提交 28635839 authored 作者: taojinlong's avatar taojinlong

feat: excel 數據集 替換、追加

上级 d44bbddb
...@@ -129,6 +129,7 @@ public class ExtractDataService { ...@@ -129,6 +129,7 @@ public class ExtractDataService {
" exit 1\n" + " exit 1\n" +
"fi\n" + "fi\n" +
"rm -rf %s\n"; "rm -rf %s\n";
private String createDorisTablColumnSql(List<DatasetTableField> datasetTableFields) { private String createDorisTablColumnSql(List<DatasetTableField> datasetTableFields) {
String Column_Fields = "dataease_uuid varchar(50), `"; String Column_Fields = "dataease_uuid varchar(50), `";
for (DatasetTableField datasetTableField : datasetTableFields) { for (DatasetTableField datasetTableField : datasetTableFields) {
...@@ -244,7 +245,11 @@ public class ExtractDataService { ...@@ -244,7 +245,11 @@ public class ExtractDataService {
switch (updateType) { switch (updateType) {
// 全量更新 // 全量更新
case all_scope: case all_scope:
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); if(datasource.getType().equalsIgnoreCase("excel")){
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
}else {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
}
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null); generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
...@@ -258,35 +263,43 @@ public class ExtractDataService { ...@@ -258,35 +263,43 @@ public class ExtractDataService {
// 增量更新 // 增量更新
case add_scope: case add_scope:
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId); if(datasource.getType().equalsIgnoreCase("excel")){
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) { datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
return; generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
} generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableId);
request.setStatus(JobStatus.Completed.name());
List<DataSetTaskLogDTO> dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request);
if (CollectionUtils.isEmpty(dataSetTaskLogDTOS)) {
return;
}
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
// 增量添加
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add"); extractData(datasetTable, "incremental_add");
} return;
}else {
// 增量删除 DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) { if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() return;
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); }
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql); DatasetTableTaskLog request = new DatasetTableTaskLog();
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource)); request.setTableId(datasetTableId);
extractData(datasetTable, "incremental_delete"); request.setStatus(JobStatus.Completed.name());
List<DataSetTaskLogDTO> dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request);
if (CollectionUtils.isEmpty(dataSetTaskLogDTOS)) {
return;
}
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
// 增量添加
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add");
}
// 增量删除
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_delete");
}
} }
datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis()); datasetTableTaskLog.setEndTime(System.currentTimeMillis());
...@@ -358,6 +371,9 @@ public class ExtractDataService { ...@@ -358,6 +371,9 @@ public class ExtractDataService {
if(CollectionUtils.isNotEmpty(datasetTableTaskLogs)){ if(CollectionUtils.isNotEmpty(datasetTableTaskLogs)){
return datasetTableTaskLogs.get(0); return datasetTableTaskLogs.get(0);
} }
try {
Thread.sleep(1000);
}catch (Exception ignore){}
} }
datasetTableTaskLog.setStartTime(System.currentTimeMillis()); datasetTableTaskLog.setStartTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog); dataSetTableTaskLogService.save(datasetTableTaskLog);
...@@ -365,8 +381,6 @@ public class ExtractDataService { ...@@ -365,8 +381,6 @@ public class ExtractDataService {
} }
private void extractData(DatasetTable datasetTable, String extractType) throws Exception { private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class); KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree(); RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
JobMeta jobMeta = null; JobMeta jobMeta = null;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论