提交 ec4dcf93 authored 作者: taojinlong's avatar taojinlong

feat: 清理临时文件

上级 064b0fe8
FROM registry.fit2cloud.com/fit2cloud3/fabric8-java-alpine-openjdk8-jre FROM registry.cn-qingdao.aliyuncs.com/fit2cloud3/fabric8-java-alpine-openjdk8-jre
ARG IMAGE_TAG ARG IMAGE_TAG
......
...@@ -25,6 +25,8 @@ public class DatasetTable implements Serializable { ...@@ -25,6 +25,8 @@ public class DatasetTable implements Serializable {
private String syncStatus; private String syncStatus;
private Long lastUpdateTime;
private String info; private String info;
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
......
...@@ -783,6 +783,66 @@ public class DatasetTableExample { ...@@ -783,6 +783,66 @@ public class DatasetTableExample {
addCriterion("sync_status not between", value1, value2, "syncStatus"); addCriterion("sync_status not between", value1, value2, "syncStatus");
return (Criteria) this; return (Criteria) this;
} }
// Adds criterion: last_update_time IS NULL. (MyBatis-generator style builder method.)
public Criteria andLastUpdateTimeIsNull() {
addCriterion("last_update_time is null");
return (Criteria) this;
}
// Adds criterion: last_update_time IS NOT NULL.
public Criteria andLastUpdateTimeIsNotNull() {
addCriterion("last_update_time is not null");
return (Criteria) this;
}
// Adds criterion: last_update_time = value (epoch millis; see migration default 0).
public Criteria andLastUpdateTimeEqualTo(Long value) {
addCriterion("last_update_time =", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time <> value.
public Criteria andLastUpdateTimeNotEqualTo(Long value) {
addCriterion("last_update_time <>", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time > value.
public Criteria andLastUpdateTimeGreaterThan(Long value) {
addCriterion("last_update_time >", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time >= value.
public Criteria andLastUpdateTimeGreaterThanOrEqualTo(Long value) {
addCriterion("last_update_time >=", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time < value.
public Criteria andLastUpdateTimeLessThan(Long value) {
addCriterion("last_update_time <", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time <= value.
public Criteria andLastUpdateTimeLessThanOrEqualTo(Long value) {
addCriterion("last_update_time <=", value, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time IN (values).
public Criteria andLastUpdateTimeIn(List<Long> values) {
addCriterion("last_update_time in", values, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time NOT IN (values).
public Criteria andLastUpdateTimeNotIn(List<Long> values) {
addCriterion("last_update_time not in", values, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time BETWEEN value1 AND value2 (inclusive).
public Criteria andLastUpdateTimeBetween(Long value1, Long value2) {
addCriterion("last_update_time between", value1, value2, "lastUpdateTime");
return (Criteria) this;
}
// Adds criterion: last_update_time NOT BETWEEN value1 AND value2.
public Criteria andLastUpdateTimeNotBetween(Long value1, Long value2) {
addCriterion("last_update_time not between", value1, value2, "lastUpdateTime");
return (Criteria) this;
}
} }
public static class Criteria extends GeneratedCriteria { public static class Criteria extends GeneratedCriteria {
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
<result column="create_time" jdbcType="BIGINT" property="createTime" /> <result column="create_time" jdbcType="BIGINT" property="createTime" />
<result column="qrtz_instance" jdbcType="VARCHAR" property="qrtzInstance" /> <result column="qrtz_instance" jdbcType="VARCHAR" property="qrtzInstance" />
<result column="sync_status" jdbcType="VARCHAR" property="syncStatus" /> <result column="sync_status" jdbcType="VARCHAR" property="syncStatus" />
<result column="last_update_time" jdbcType="BIGINT" property="lastUpdateTime" />
</resultMap> </resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable"> <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable">
<result column="info" jdbcType="LONGVARCHAR" property="info" /> <result column="info" jdbcType="LONGVARCHAR" property="info" />
...@@ -76,7 +77,7 @@ ...@@ -76,7 +77,7 @@
</sql> </sql>
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, qrtz_instance, id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, qrtz_instance,
sync_status sync_status, last_update_time
</sql> </sql>
<sql id="Blob_Column_List"> <sql id="Blob_Column_List">
info info
...@@ -133,11 +134,13 @@ ...@@ -133,11 +134,13 @@
insert into dataset_table (id, `name`, scene_id, insert into dataset_table (id, `name`, scene_id,
data_source_id, `type`, `mode`, data_source_id, `type`, `mode`,
create_by, create_time, qrtz_instance, create_by, create_time, qrtz_instance,
sync_status, info) sync_status, last_update_time, info
)
values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR}, values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR},
#{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER}, #{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER},
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{qrtzInstance,jdbcType=VARCHAR}, #{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{qrtzInstance,jdbcType=VARCHAR},
#{syncStatus,jdbcType=VARCHAR}, #{info,jdbcType=LONGVARCHAR}) #{syncStatus,jdbcType=VARCHAR}, #{lastUpdateTime,jdbcType=BIGINT}, #{info,jdbcType=LONGVARCHAR}
)
</insert> </insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable"> <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table insert into dataset_table
...@@ -172,6 +175,9 @@ ...@@ -172,6 +175,9 @@
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status, sync_status,
</if> </if>
<if test="lastUpdateTime != null">
last_update_time,
</if>
<if test="info != null"> <if test="info != null">
info, info,
</if> </if>
...@@ -207,6 +213,9 @@ ...@@ -207,6 +213,9 @@
<if test="syncStatus != null"> <if test="syncStatus != null">
#{syncStatus,jdbcType=VARCHAR}, #{syncStatus,jdbcType=VARCHAR},
</if> </if>
<if test="lastUpdateTime != null">
#{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="info != null"> <if test="info != null">
#{info,jdbcType=LONGVARCHAR}, #{info,jdbcType=LONGVARCHAR},
</if> </if>
...@@ -251,6 +260,9 @@ ...@@ -251,6 +260,9 @@
<if test="record.syncStatus != null"> <if test="record.syncStatus != null">
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
</if> </if>
<if test="record.lastUpdateTime != null">
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="record.info != null"> <if test="record.info != null">
info = #{record.info,jdbcType=LONGVARCHAR}, info = #{record.info,jdbcType=LONGVARCHAR},
</if> </if>
...@@ -271,6 +283,7 @@ ...@@ -271,6 +283,7 @@
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR}, qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT},
info = #{record.info,jdbcType=LONGVARCHAR} info = #{record.info,jdbcType=LONGVARCHAR}
<if test="_parameter != null"> <if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" /> <include refid="Update_By_Example_Where_Clause" />
...@@ -287,7 +300,8 @@ ...@@ -287,7 +300,8 @@
create_by = #{record.createBy,jdbcType=VARCHAR}, create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR}, qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR} sync_status = #{record.syncStatus,jdbcType=VARCHAR},
last_update_time = #{record.lastUpdateTime,jdbcType=BIGINT}
<if test="_parameter != null"> <if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" /> <include refid="Update_By_Example_Where_Clause" />
</if> </if>
...@@ -322,6 +336,9 @@ ...@@ -322,6 +336,9 @@
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
</if> </if>
<if test="lastUpdateTime != null">
last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
</if>
<if test="info != null"> <if test="info != null">
info = #{info,jdbcType=LONGVARCHAR}, info = #{info,jdbcType=LONGVARCHAR},
</if> </if>
...@@ -339,6 +356,7 @@ ...@@ -339,6 +356,7 @@
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR}, qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
last_update_time = #{lastUpdateTime,jdbcType=BIGINT},
info = #{info,jdbcType=LONGVARCHAR} info = #{info,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
</update> </update>
...@@ -352,7 +370,8 @@ ...@@ -352,7 +370,8 @@
create_by = #{createBy,jdbcType=VARCHAR}, create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR}, qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR} sync_status = #{syncStatus,jdbcType=VARCHAR},
last_update_time = #{lastUpdateTime,jdbcType=BIGINT}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
</update> </update>
</mapper> </mapper>
\ No newline at end of file
...@@ -1055,6 +1055,11 @@ public class DataSetTableService { ...@@ -1055,6 +1055,11 @@ public class DataSetTableService {
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample(); DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()).andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList())); datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()).andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample); datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample);
for (DatasetTable jobStoppeddDatasetTable : jobStoppeddDatasetTables) {
extractDataService.deleteFile("all_scope", jobStoppeddDatasetTable.getId());
extractDataService.deleteFile("incremental_add", jobStoppeddDatasetTable.getId());
extractDataService.deleteFile("incremental_delete", jobStoppeddDatasetTable.getId());
} }
}
} }
...@@ -196,14 +196,15 @@ public class ExtractDataService { ...@@ -196,14 +196,15 @@ public class ExtractDataService {
}else { }else {
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()))); generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
} }
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "all_scope"); extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId)); replaceTable(DorisTableUtils.dorisName(datasetTableId));
saveSucessLog(datasetTableTaskLog); saveSucessLog(datasetTableTaskLog);
deleteFile("all_scope", datasetTableId); deleteFile("all_scope", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}catch (Exception e){ }catch (Exception e){
saveErrorLog(datasetTableId, taskId, e); saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error); updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId))); dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
deleteFile("all_scope", datasetTableId); deleteFile("all_scope", datasetTableId);
}finally { }finally {
...@@ -220,21 +221,19 @@ public class ExtractDataService { ...@@ -220,21 +221,19 @@ public class ExtractDataService {
datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null); datasetTableTaskLog = writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, null);
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null); generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, null);
generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList()))); generateJobFile("incremental_add", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
Long execTime = System.currentTimeMillis();
extractData(datasetTable, "incremental_add"); extractData(datasetTable, "incremental_add");
saveSucessLog(datasetTableTaskLog); saveSucessLog(datasetTableTaskLog);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}else { }else {
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId); DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) { if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
return; return;
} }
DatasetTableTaskLog request = new DatasetTableTaskLog();
request.setTableId(datasetTableId); if (datasetTable.getLastUpdateTime() == 0 || datasetTable.getLastUpdateTime() == null) {
request.setStatus(JobStatus.Completed.name()); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, null);
List<DatasetTableTaskLog> datasetTableTaskLogs = dataSetTableTaskLogService.select(request);
if (CollectionUtils.isEmpty(datasetTableTaskLogs)) {
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed);
return; return;
} }
...@@ -245,8 +244,9 @@ public class ExtractDataService { ...@@ -245,8 +244,9 @@ public class ExtractDataService {
datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId); datasetTableTaskLog = getDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
} }
Long execTime = System.currentTimeMillis();
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加 if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString()) String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql); generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource)); generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
...@@ -254,7 +254,7 @@ public class ExtractDataService { ...@@ -254,7 +254,7 @@ public class ExtractDataService {
} }
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除 if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTableTaskLogs.get(0).getStartTime().toString()) String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, datasetTable.getLastUpdateTime().toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql); generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource)); generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
...@@ -263,11 +263,11 @@ public class ExtractDataService { ...@@ -263,11 +263,11 @@ public class ExtractDataService {
saveSucessLog(datasetTableTaskLog); saveSucessLog(datasetTableTaskLog);
deleteFile("incremental_add", datasetTableId); deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId); deleteFile("incremental_delete", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
} }
}catch (Exception e){ }catch (Exception e){
saveErrorLog(datasetTableId, taskId, e); saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error); updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
deleteFile("incremental_add", datasetTableId); deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId); deleteFile("incremental_delete", datasetTableId);
}finally { }finally {
...@@ -280,8 +280,11 @@ public class ExtractDataService { ...@@ -280,8 +280,11 @@ public class ExtractDataService {
} }
} }
private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed) { private void updateTableStatus(String datasetTableId, DatasetTable datasetTable, JobStatus completed, Long execTime) {
datasetTable.setSyncStatus(completed.name()); datasetTable.setSyncStatus(completed.name());
if(execTime != null){
datasetTable.setLastUpdateTime(execTime);
}
DatasetTableExample example = new DatasetTableExample(); DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andIdEqualTo(datasetTableId); example.createCriteria().andIdEqualTo(datasetTableId);
datasetTableMapper.updateByExampleSelective(datasetTable, example); datasetTableMapper.updateByExampleSelective(datasetTable, example);
...@@ -429,13 +432,13 @@ public class ExtractDataService { ...@@ -429,13 +432,13 @@ public class ExtractDataService {
JobMeta jobMeta = null; JobMeta jobMeta = null;
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); jobMeta = repository.loadJob("job_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break; break;
case "incremental_add": case "incremental_add":
jobMeta = repository.loadJob("job_add_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); jobMeta = repository.loadJob("job_add_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break; break;
case "incremental_delete": case "incremental_delete":
jobMeta = repository.loadJob("job_delete_" + datasetTable.getId(), repositoryDirectoryInterface, null, null); jobMeta = repository.loadJob("job_delete_" + DorisTableUtils.dorisName(datasetTable.getId()), repositoryDirectoryInterface, null, null);
break; break;
default: default:
break; break;
...@@ -477,7 +480,7 @@ public class ExtractDataService { ...@@ -477,7 +480,7 @@ public class ExtractDataService {
} }
private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFeilds) throws Exception { private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFeilds) throws Exception {
String dorisOutputTable = null; String outFile = null;
String jobName = null; String jobName = null;
String script = null; String script = null;
Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
...@@ -486,22 +489,22 @@ public class ExtractDataService { ...@@ -486,22 +489,22 @@ public class ExtractDataService {
String transName = null; String transName = null;
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
transName = "trans_" + datasetTable.getId(); transName = "trans_" + DorisTableUtils.dorisName(datasetTable.getId());
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); outFile = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
jobName = "job_" + datasetTable.getId(); jobName = "job_" + datasetTable.getId();
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
break; break;
case "incremental_add": case "incremental_add":
transName = "trans_add_" + datasetTable.getId(); transName = "trans_add_" + DorisTableUtils.dorisName(datasetTable.getId());
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); outFile = DorisTableUtils.dorisName(datasetTable.getId());
jobName = "job_add_" + datasetTable.getId(); jobName = "job_add_" + DorisTableUtils.dorisName(datasetTable.getId());
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), dorisOutputTable, root_path + dorisOutputTable + "." + extention); script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "APPEND", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
break; break;
case "incremental_delete": case "incremental_delete":
transName = "trans_delete_" + datasetTable.getId(); transName = "trans_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId())); outFile = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + dorisOutputTable + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + dorisOutputTable + "." + extention); script = String.format(shellScript, dorisConfigration.getUsername(), dorisConfigration.getPassword(), String.valueOf(System.currentTimeMillis()), separator, columns, "DELETE", root_path + outFile + "." + extention, dorisConfigration.getHost(), dorisConfigration.getHttpPort(), dorisConfigration.getDataBase(), DorisTableUtils.dorisName(datasetTable.getId()), root_path + outFile + "." + extention);
jobName = "job_delete_" + datasetTable.getId(); jobName = "job_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
break; break;
default: default:
break; break;
...@@ -586,7 +589,7 @@ public class ExtractDataService { ...@@ -586,7 +589,7 @@ public class ExtractDataService {
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception { private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
TransMeta transMeta = new TransMeta(); TransMeta transMeta = new TransMeta();
String dorisOutputTable = null; String outFile = null;
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
DatabaseMeta dataMeta = null; DatabaseMeta dataMeta = null;
StepMeta inputStep = null; StepMeta inputStep = null;
...@@ -640,18 +643,18 @@ public class ExtractDataService { ...@@ -640,18 +643,18 @@ public class ExtractDataService {
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
transName = "trans_" + datasetTable.getId(); transName = "trans_" + DorisTableUtils.dorisName(datasetTable.getId());
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); outFile = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
transMeta.setName(transName); transMeta.setName(transName);
break; break;
case "incremental_add": case "incremental_add":
transName = "trans_add_" + datasetTable.getId(); transName = "trans_add_" + DorisTableUtils.dorisName(datasetTable.getId());
dorisOutputTable = DorisTableUtils.dorisName(datasetTable.getId()); outFile = DorisTableUtils.dorisName(datasetTable.getId());
transMeta.setName(transName); transMeta.setName(transName);
break; break;
case "incremental_delete": case "incremental_delete":
dorisOutputTable = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId())); transName = "trans_delete_" + DorisTableUtils.dorisName(datasetTable.getId());
transName = "trans_delete_" + datasetTable.getId(); outFile = DorisTableUtils.dorisDeleteName(DorisTableUtils.dorisName(datasetTable.getId()));
transMeta.setName(transName); transMeta.setName(transName);
break; break;
default: default:
...@@ -659,7 +662,7 @@ public class ExtractDataService { ...@@ -659,7 +662,7 @@ public class ExtractDataService {
} }
outputStep = outputStep(dorisOutputTable); outputStep = outputStep(outFile);
hi1 = new TransHopMeta(inputStep, udjcStep); hi1 = new TransHopMeta(inputStep, udjcStep);
hi2 = new TransHopMeta(udjcStep, outputStep); hi2 = new TransHopMeta(udjcStep, outputStep);
transMeta.addTransHop(hi1); transMeta.addTransHop(hi1);
...@@ -779,26 +782,34 @@ public class ExtractDataService { ...@@ -779,26 +782,34 @@ public class ExtractDataService {
return userDefinedJavaClassStep; return userDefinedJavaClassStep;
} }
private void deleteFile(String type, String dataSetTableId){ public void deleteFile(String type, String dataSetTableId){
String transName = null; String transName = null;
String jobName = null; String jobName = null;
String fileName = null;
switch (type) { switch (type) {
case "all_scope": case "all_scope":
transName = "trans_" + dataSetTableId; transName = "trans_" + dataSetTableId;
jobName = "job_" + dataSetTableId; jobName = "job_" + dataSetTableId;
fileName = DorisTableUtils.dorisTmpName(dataSetTableId);
break; break;
case "incremental_add": case "incremental_add":
transName = "trans_add_" + dataSetTableId; transName = "trans_add_" + dataSetTableId;
jobName = "job_add_" + dataSetTableId; jobName = "job_add_" + dataSetTableId;
fileName = DorisTableUtils.dorisName(dataSetTableId);
break; break;
case "incremental_delete": case "incremental_delete":
transName = "trans_delete_" + dataSetTableId; transName = "trans_delete_" + dataSetTableId;
jobName = "job_delete_" + dataSetTableId; jobName = "job_delete_" + dataSetTableId;
fileName = DorisTableUtils.dorisDeleteName(dataSetTableId);
break; break;
default: default:
break; break;
} }
try{
File file = new File(root_path + fileName + "." + extention);
FileUtils.forceDelete(file);
}catch (Exception e){}
try{ try{
File file = new File(root_path + jobName + ".kjb"); File file = new File(root_path + jobName + ".kjb");
FileUtils.forceDelete(file); FileUtils.forceDelete(file);
......
-- Migration: add last_update_time to dataset_table, placed after sync_status.
-- DEFAULT 0 so pre-existing rows read as "never updated" (the service checks lastUpdateTime == 0).
-- NOTE(review): values appear to be epoch milliseconds (BIGINT(13)) — confirm against callers using System.currentTimeMillis().
ALTER TABLE `dataset_table` ADD COLUMN `last_update_time` BIGINT(13) NULL DEFAULT 0 AFTER `sync_status`;
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论