提交 883eb91a authored 作者: taojinlong's avatar taojinlong

fix: 删除数据

上级 57c55515
...@@ -12,7 +12,10 @@ import io.dataease.commons.utils.LogUtil; ...@@ -12,7 +12,10 @@ import io.dataease.commons.utils.LogUtil;
import io.dataease.datasource.constants.DatasourceTypes; import io.dataease.datasource.constants.DatasourceTypes;
import io.dataease.datasource.dto.DorisConfigration; import io.dataease.datasource.dto.DorisConfigration;
import io.dataease.datasource.dto.MysqlConfigration; import io.dataease.datasource.dto.MysqlConfigration;
import io.dataease.datasource.dto.TableFiled;
import io.dataease.datasource.provider.DatasourceProvider;
import io.dataease.datasource.provider.JdbcProvider; import io.dataease.datasource.provider.JdbcProvider;
import io.dataease.datasource.provider.ProviderFactory;
import io.dataease.datasource.request.DatasourceRequest; import io.dataease.datasource.request.DatasourceRequest;
import io.dataease.dto.dataset.DataSetTaskLogDTO; import io.dataease.dto.dataset.DataSetTaskLogDTO;
import io.dataease.dto.dataset.DataTableInfoDTO; import io.dataease.dto.dataset.DataTableInfoDTO;
...@@ -91,8 +94,7 @@ public class ExtractDataService { ...@@ -91,8 +94,7 @@ public class ExtractDataService {
"PROPERTIES(\"replication_num\" = \"1\");"; "PROPERTIES(\"replication_num\" = \"1\");";
private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" + private static String shellScript = "curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load\n" +
// "rm -rf %s\n" + "rm -rf %s\n";
"return $?";
private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){ private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){
String Column_Fields = "dataease_uuid varchar(50),"; String Column_Fields = "dataease_uuid varchar(50),";
...@@ -167,7 +169,7 @@ public class ExtractDataService { ...@@ -167,7 +169,7 @@ public class ExtractDataService {
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null); generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null);
generateJobFile("all_scope", datasetTable, datasetTableFields); generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())));
extractData(datasetTable, "all_scope"); extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId)); replaceTable(DorisTableUtils.dorisName(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setStatus(JobStatus.Completed.name());
...@@ -194,8 +196,10 @@ public class ExtractDataService { ...@@ -194,8 +196,10 @@ public class ExtractDataService {
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
//TODO sql column
generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql); generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, datasetTableFields); generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add"); extractData(datasetTable, "incremental_add");
} }
...@@ -203,8 +207,10 @@ public class ExtractDataService { ...@@ -203,8 +207,10 @@ public class ExtractDataService {
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
//TODO sql column
;
generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql); generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, datasetTableFields); generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_delete"); extractData(datasetTable, "incremental_delete");
} }
datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setStatus(JobStatus.Completed.name());
...@@ -289,44 +295,32 @@ public class ExtractDataService { ...@@ -289,44 +295,32 @@ public class ExtractDataService {
return remoteSlaveServer; return remoteSlaveServer;
} }
private void generateJobFile(String extractType, DatasetTable datasetTable, List<DatasetTableField> datasetTableFields) throws Exception { private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFeilds) throws Exception {
String dorisOutputTable = null; String dorisOutputTable = null;
String jobName = null; String jobName = null;
String script = null; String script = null;
Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource"); Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class); DorisConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), DorisConfigration.class);
String columns = String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList())) + ",dataease_uuid"; String columns = columnFeilds + ",dataease_uuid";
switch (extractType) {
case "all_scope":
jobName = "job_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break;
case "incremental_add":
jobName = "job_add_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break;
case "incremental_delete":
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
jobName = "job_delete_" + datasetTable.getId();
break;
default:
break;
}
String transName = null; String transName = null;
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
transName = "trans_" + datasetTable.getId(); transName = "trans_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
jobName = "job_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND",root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break; break;
case "incremental_add": case "incremental_add":
transName = "trans_add_" + datasetTable.getId(); transName = "trans_add_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId());
jobName = "job_add_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention);
break; break;
case "incremental_delete": case "incremental_delete":
transName = "trans_delete_" + datasetTable.getId(); transName = "trans_delete_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(),dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + dorisOutputTable + "." + extention);
jobName = "job_delete_" + datasetTable.getId();
break; break;
default: default:
break; break;
...@@ -382,16 +376,25 @@ public class ExtractDataService { ...@@ -382,16 +376,25 @@ public class ExtractDataService {
greenHop.setEvaluation(true); greenHop.setEvaluation(true);
jobMeta.addJobHop(greenHop); jobMeta.addJobHop(greenHop);
if(!extractType.equals("incremental_delete")){
}
String jobXml = jobMeta.getXML(); String jobXml = jobMeta.getXML();
File file = new File(root_path + jobName + ".kjb"); File file = new File(root_path + jobName + ".kjb");
FileUtils.writeStringToFile(file, jobXml, "UTF-8"); FileUtils.writeStringToFile(file, jobXml, "UTF-8");
} }
private String fetchSqlField(String sql, Datasource ds)throws Exception{
String tmpSql = sql;
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
if(tmpSql.trim().endsWith(";")){
tmpSql = tmpSql.substring(0, tmpSql.length() -1 ) + " limit 0";
}else {
tmpSql = tmpSql + " limit 0";
}
datasourceRequest.setQuery(tmpSql);
return String.join(",", datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).collect(Collectors.toList()));
}
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception { private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
TransMeta transMeta = new TransMeta(); TransMeta transMeta = new TransMeta();
String dorisOutputTable = null; String dorisOutputTable = null;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论