提交 e5f5ab90 authored 作者: taojinlong's avatar taojinlong

fix: 同时创建多个定时同步的数据集,会出现部分数据集更新信息显示成功但是预览数据为空

上级 657265cc
...@@ -120,6 +120,19 @@ public class ExtractDataService { ...@@ -120,6 +120,19 @@ public class ExtractDataService {
" exit 1\n" + " exit 1\n" +
"fi\n"; "fi\n";
private static final String shellScriptForDeleteFile = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" +
"rm -rf %s \n" +
"if [ $? -eq 0 ] ; then\n" +
" failstatus=$(echo $result | grep '\"Status\": \"Fail\"')\n" +
" if [ \"x${failstatus}\" != \"x\" ];then" +
" echo $result\n" +
" exit 1\n" +
" fi\n" +
"else\n" +
" echo $result\n" +
" exit 1\n" +
"fi\n";
public synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask, Long startTime) { public synchronized boolean existSyncTask(DatasetTable datasetTable, DatasetTableTask datasetTableTask, Long startTime) {
datasetTable.setSyncStatus(JobStatus.Underway.name()); datasetTable.setSyncStatus(JobStatus.Underway.name());
DatasetTableExample example = new DatasetTableExample(); DatasetTableExample example = new DatasetTableExample();
...@@ -445,14 +458,20 @@ public class ExtractDataService { ...@@ -445,14 +458,20 @@ public class ExtractDataService {
String dataFile = null; String dataFile = null;
String script = null; String script = null;
String streamLoadScript = "";
if(kettleFilesKeep){
streamLoadScript = shellScript;
}else {
streamLoadScript = shellScriptForDeleteFile;
}
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
dataFile = root_path + TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())) + "." + extention; dataFile = root_path + TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())) + "." + extention;
script = String.format(shellScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", dataFile, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tmpName(TableUtils.tableName(datasetTable.getId()))); script = String.format(streamLoadScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", dataFile, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())));
break; break;
default: default:
dataFile = root_path + TableUtils.addName(TableUtils.tableName(datasetTable.getId())) + "." + extention; dataFile = root_path + TableUtils.addName(TableUtils.tableName(datasetTable.getId())) + "." + extention;
script = String.format(shellScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", dataFile, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId())); script = String.format(streamLoadScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", dataFile, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId()));
break; break;
} }
...@@ -744,7 +763,6 @@ public class ExtractDataService { ...@@ -744,7 +763,6 @@ public class ExtractDataService {
DataEaseException.throwException(transStatus.getLoggingString()); DataEaseException.throwException(transStatus.getLoggingString());
return; return;
} }
executing = true; executing = true;
String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null); String lastCarteObjectId = Job.sendToSlaveServer(jobMeta, jobExecutionConfiguration, repository, null);
SlaveServerJobStatus jobStatus = null; SlaveServerJobStatus jobStatus = null;
...@@ -759,7 +777,7 @@ public class ExtractDataService { ...@@ -759,7 +777,7 @@ public class ExtractDataService {
if (jobStatus.getStatusDescription().equals("Finished")) { if (jobStatus.getStatusDescription().equals("Finished")) {
return; return;
} else { } else {
DataEaseException.throwException((jobStatus.getLoggingString())); DataEaseException.throwException(jobStatus.getLoggingString());
} }
} }
...@@ -773,20 +791,26 @@ public class ExtractDataService { ...@@ -773,20 +791,26 @@ public class ExtractDataService {
Datasource dorisDatasource = engineService.getDeEngine(); Datasource dorisDatasource = engineService.getDeEngine();
DorisConfiguration dorisConfiguration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfiguration.class); DorisConfiguration dorisConfiguration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfiguration.class);
String columns = columnFields + ",dataease_uuid"; String columns = columnFields + ",dataease_uuid";
String streamLoadScript = "";
if(kettleFilesKeep){
streamLoadScript = shellScript;
}else {
streamLoadScript = shellScriptForDeleteFile;
}
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
outFile = TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())); outFile = TableUtils.tmpName(TableUtils.tableName(datasetTable.getId()));
jobName = "job_" + TableUtils.tableName(datasetTable.getId()); jobName = "job_" + TableUtils.tableName(datasetTable.getId());
script = String.format(shellScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())), root_path + outFile + "." + extention); script = String.format(streamLoadScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), datasetTable.getId() + System.currentTimeMillis(), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tmpName(TableUtils.tableName(datasetTable.getId())), root_path + outFile + "." + extention);
break; break;
case "incremental_add": case "incremental_add":
outFile = TableUtils.addName(datasetTable.getId()); outFile = TableUtils.addName(datasetTable.getId());
jobName = "job_add_" + TableUtils.tableName(datasetTable.getId()); jobName = "job_add_" + TableUtils.tableName(datasetTable.getId());
script = String.format(shellScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId()), root_path + outFile + "." + extention); script = String.format(streamLoadScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), datasetTable.getId() + System.currentTimeMillis(), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId()), root_path + outFile + "." + extention);
break; break;
case "incremental_delete": case "incremental_delete":
outFile = TableUtils.deleteName(TableUtils.tableName(datasetTable.getId())); outFile = TableUtils.deleteName(TableUtils.tableName(datasetTable.getId()));
script = String.format(shellScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), System.currentTimeMillis(), separator, columns, "DELETE", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId()), root_path + outFile + "." + extention); script = String.format(streamLoadScript, dorisConfiguration.getUsername(), dorisConfiguration.getPassword(), datasetTable.getId() + System.currentTimeMillis(), separator, columns, "DELETE", root_path + outFile + "." + extention, dorisConfiguration.getHost(), dorisConfiguration.getHttpPort(), dorisConfiguration.getDataBase(), TableUtils.tableName(datasetTable.getId()), root_path + outFile + "." + extention);
jobName = "job_delete_" + TableUtils.tableName(datasetTable.getId()); jobName = "job_delete_" + TableUtils.tableName(datasetTable.getId());
break; break;
default: default:
......
...@@ -277,6 +277,7 @@ export default { ...@@ -277,6 +277,7 @@ export default {
this.form = JSON.parse(JSON.stringify(row)) this.form = JSON.parse(JSON.stringify(row))
this.originConfiguration = this.form.configuration this.originConfiguration = this.form.configuration
if (row.type === 'api') { if (row.type === 'api') {
} else { } else {
this.form.configuration = JSON.parse(this.form.configuration) this.form.configuration = JSON.parse(this.form.configuration)
} }
...@@ -494,7 +495,9 @@ export default { ...@@ -494,7 +495,9 @@ export default {
changeType() { changeType() {
for (let i = 0; i < this.dsTypes.length; i++) { for (let i = 0; i < this.dsTypes.length; i++) {
if (this.dsTypes[i].type === this.form.type) { if (this.dsTypes[i].type === this.form.type) {
this.form.configuration.extraParams = this.dsTypes[i].extraParams if(row.type !== 'api'){
this.form.configuration.extraParams = this.dsTypes[i].extraParams
}
this.datasourceType = this.dsTypes[i] this.datasourceType = this.dsTypes[i]
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论