提交 92549c13 authored 作者: taojinlong's avatar taojinlong

fix: 检测同步任务

上级 8f88821b
...@@ -21,6 +21,8 @@ public class DatasetTable implements Serializable { ...@@ -21,6 +21,8 @@ public class DatasetTable implements Serializable {
private Long createTime; private Long createTime;
private String qrtzInstance;
private String syncStatus; private String syncStatus;
private String info; private String info;
......
...@@ -644,6 +644,76 @@ public class DatasetTableExample { ...@@ -644,6 +644,76 @@ public class DatasetTableExample {
return (Criteria) this; return (Criteria) this;
} }
public Criteria andQrtzInstanceIsNull() {
addCriterion("qrtz_instance is null");
return (Criteria) this;
}
public Criteria andQrtzInstanceIsNotNull() {
addCriterion("qrtz_instance is not null");
return (Criteria) this;
}
public Criteria andQrtzInstanceEqualTo(String value) {
addCriterion("qrtz_instance =", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotEqualTo(String value) {
addCriterion("qrtz_instance <>", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceGreaterThan(String value) {
addCriterion("qrtz_instance >", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceGreaterThanOrEqualTo(String value) {
addCriterion("qrtz_instance >=", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLessThan(String value) {
addCriterion("qrtz_instance <", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLessThanOrEqualTo(String value) {
addCriterion("qrtz_instance <=", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLike(String value) {
addCriterion("qrtz_instance like", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotLike(String value) {
addCriterion("qrtz_instance not like", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceIn(List<String> values) {
addCriterion("qrtz_instance in", values, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotIn(List<String> values) {
addCriterion("qrtz_instance not in", values, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceBetween(String value1, String value2) {
addCriterion("qrtz_instance between", value1, value2, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotBetween(String value1, String value2) {
addCriterion("qrtz_instance not between", value1, value2, "qrtzInstance");
return (Criteria) this;
}
public Criteria andSyncStatusIsNull() { public Criteria andSyncStatusIsNull() {
addCriterion("sync_status is null"); addCriterion("sync_status is null");
return (Criteria) this; return (Criteria) this;
......
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
<result column="mode" jdbcType="INTEGER" property="mode" /> <result column="mode" jdbcType="INTEGER" property="mode" />
<result column="create_by" jdbcType="VARCHAR" property="createBy" /> <result column="create_by" jdbcType="VARCHAR" property="createBy" />
<result column="create_time" jdbcType="BIGINT" property="createTime" /> <result column="create_time" jdbcType="BIGINT" property="createTime" />
<result column="qrtz_instance" jdbcType="VARCHAR" property="qrtzInstance" />
<result column="sync_status" jdbcType="VARCHAR" property="syncStatus" /> <result column="sync_status" jdbcType="VARCHAR" property="syncStatus" />
</resultMap> </resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable"> <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable">
...@@ -74,7 +75,8 @@ ...@@ -74,7 +75,8 @@
</where> </where>
</sql> </sql>
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, sync_status id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, qrtz_instance,
sync_status
</sql> </sql>
<sql id="Blob_Column_List"> <sql id="Blob_Column_List">
info info
...@@ -130,12 +132,12 @@ ...@@ -130,12 +132,12 @@
<insert id="insert" parameterType="io.dataease.base.domain.DatasetTable"> <insert id="insert" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table (id, `name`, scene_id, insert into dataset_table (id, `name`, scene_id,
data_source_id, `type`, `mode`, data_source_id, `type`, `mode`,
create_by, create_time, sync_status, create_by, create_time, qrtz_instance,
info) sync_status, info)
values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR}, values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR},
#{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER}, #{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER},
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{syncStatus,jdbcType=VARCHAR}, #{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{qrtzInstance,jdbcType=VARCHAR},
#{info,jdbcType=LONGVARCHAR}) #{syncStatus,jdbcType=VARCHAR}, #{info,jdbcType=LONGVARCHAR})
</insert> </insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable"> <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table insert into dataset_table
...@@ -164,6 +166,9 @@ ...@@ -164,6 +166,9 @@
<if test="createTime != null"> <if test="createTime != null">
create_time, create_time,
</if> </if>
<if test="qrtzInstance != null">
qrtz_instance,
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status, sync_status,
</if> </if>
...@@ -196,6 +201,9 @@ ...@@ -196,6 +201,9 @@
<if test="createTime != null"> <if test="createTime != null">
#{createTime,jdbcType=BIGINT}, #{createTime,jdbcType=BIGINT},
</if> </if>
<if test="qrtzInstance != null">
#{qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
#{syncStatus,jdbcType=VARCHAR}, #{syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -237,6 +245,9 @@ ...@@ -237,6 +245,9 @@
<if test="record.createTime != null"> <if test="record.createTime != null">
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
</if> </if>
<if test="record.qrtzInstance != null">
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="record.syncStatus != null"> <if test="record.syncStatus != null">
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -258,6 +269,7 @@ ...@@ -258,6 +269,7 @@
`mode` = #{record.mode,jdbcType=INTEGER}, `mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR}, create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
info = #{record.info,jdbcType=LONGVARCHAR} info = #{record.info,jdbcType=LONGVARCHAR}
<if test="_parameter != null"> <if test="_parameter != null">
...@@ -274,6 +286,7 @@ ...@@ -274,6 +286,7 @@
`mode` = #{record.mode,jdbcType=INTEGER}, `mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR}, create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR} sync_status = #{record.syncStatus,jdbcType=VARCHAR}
<if test="_parameter != null"> <if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" /> <include refid="Update_By_Example_Where_Clause" />
...@@ -303,6 +316,9 @@ ...@@ -303,6 +316,9 @@
<if test="createTime != null"> <if test="createTime != null">
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
</if> </if>
<if test="qrtzInstance != null">
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -321,6 +337,7 @@ ...@@ -321,6 +337,7 @@
`mode` = #{mode,jdbcType=INTEGER}, `mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR}, create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
info = #{info,jdbcType=LONGVARCHAR} info = #{info,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
...@@ -334,6 +351,7 @@ ...@@ -334,6 +351,7 @@
`mode` = #{mode,jdbcType=INTEGER}, `mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR}, create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR} sync_status = #{syncStatus,jdbcType=VARCHAR}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
</update> </update>
......
...@@ -145,7 +145,7 @@ public class DatasourceService { ...@@ -145,7 +145,7 @@ public class DatasourceService {
DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource); datasourceRequest.setDatasource(datasource);
datasourceProvider.initDataSource(datasourceRequest); datasourceProvider.initDataSource(datasourceRequest);
LogUtil.error("Succsss to init datasource connection pool: " + datasource.getName()); LogUtil.info("Succsss to init datasource connection pool: " + datasource.getName());
}catch (Exception e){ }catch (Exception e){
LogUtil.error("Failed to init datasource connection pool: " + datasource.getName(), e); LogUtil.error("Failed to init datasource connection pool: " + datasource.getName(), e);
} }
......
package io.dataease.job.sechedule; package io.dataease.job.sechedule;
import com.google.gson.Gson;
import io.dataease.commons.utils.LogUtil; import io.dataease.commons.utils.LogUtil;
import org.quartz.*; import org.quartz.*;
......
...@@ -15,7 +15,7 @@ public class ExtractDataJob extends DeScheduleJob{ ...@@ -15,7 +15,7 @@ public class ExtractDataJob extends DeScheduleJob{
@Override @Override
void businessExecute(JobExecutionContext context) { void businessExecute(JobExecutionContext context) {
extractDataService.extractData(datasetTableId, taskId, updateType); extractDataService.extractData(datasetTableId, taskId, updateType, context);
} }
} }
package io.dataease.service.dataset; package io.dataease.service.dataset;
import com.fit2cloud.quartz.anno.QuartzScheduled;
import com.google.gson.Gson; import com.google.gson.Gson;
import io.dataease.base.domain.*; import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasetTableIncrementalConfigMapper; import io.dataease.base.mapper.*;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.base.mapper.ext.ExtDataSetTableMapper; import io.dataease.base.mapper.ext.ExtDataSetTableMapper;
import io.dataease.commons.constants.JobStatus; import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.utils.*; import io.dataease.commons.utils.*;
...@@ -75,6 +74,10 @@ public class DataSetTableService { ...@@ -75,6 +74,10 @@ public class DataSetTableService {
private DataSetTableUnionService dataSetTableUnionService; private DataSetTableUnionService dataSetTableUnionService;
@Resource @Resource
private DataSetTableTaskLogService dataSetTableTaskLogService; private DataSetTableTaskLogService dataSetTableTaskLogService;
@Resource
private QrtzSchedulerStateMapper qrtzSchedulerStateMapper;
@Resource
private DatasetTableTaskLogMapper datasetTableTaskLogMapper;
@Value("${upload.file.path}") @Value("${upload.file.path}")
private String path; private String path;
...@@ -106,7 +109,7 @@ public class DataSetTableService { ...@@ -106,7 +109,7 @@ public class DataSetTableService {
saveTableField(datasetTable); saveTableField(datasetTable);
if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) { if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
commonThreadPool.addTask(() -> { commonThreadPool.addTask(() -> {
extractDataService.extractData(datasetTable.getId(), null, "all_scope"); extractDataService.extractData(datasetTable.getId(), null, "all_scope", null);
}); });
} }
} }
...@@ -867,22 +870,48 @@ public class DataSetTableService { ...@@ -867,22 +870,48 @@ public class DataSetTableService {
} }
public Boolean checkDorisTableIsExists(String id) throws Exception { public Boolean checkDorisTableIsExists(String id) throws Exception {
// Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
// JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class); JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
// DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
// datasourceRequest.setDatasource(dorisDatasource); datasourceRequest.setDatasource(dorisDatasource);
// QueryProvider qp = ProviderFactory.getQueryProvider(dorisDatasource.getType()); QueryProvider qp = ProviderFactory.getQueryProvider(dorisDatasource.getType());
// datasourceRequest.setQuery(qp.searchTable(DorisTableUtils.dorisName(id))); datasourceRequest.setQuery(qp.searchTable(DorisTableUtils.dorisName(id)));
// List<String[]> data = jdbcProvider.getData(datasourceRequest); List<String[]> data = jdbcProvider.getData(datasourceRequest);
// return CollectionUtils.isNotEmpty(data); return CollectionUtils.isNotEmpty(data);
return true;
} }
@QuartzScheduled(cron = "0 0/3 * * * ?")
public void updateDatasetTableStatus(){ public void updateDatasetTableStatus(){
DatasetTable record = new DatasetTable(); List<QrtzSchedulerState> qrtzSchedulerStates = qrtzSchedulerStateMapper.selectByExample(null);
record.setSyncStatus(JobStatus.Completed.name()); List<String> activeQrtzInstances = qrtzSchedulerStates.stream().filter(qrtzSchedulerState -> qrtzSchedulerState.getLastCheckinTime() + qrtzSchedulerState.getCheckinInterval() + 1000 > System.currentTimeMillis()).map(QrtzSchedulerStateKey::getInstanceName).collect(Collectors.toList());
List<DatasetTable> jobStoppeddDatasetTables = new ArrayList<>();
DatasetTableExample example = new DatasetTableExample(); DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()); example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name());
datasetTableMapper.selectByExample(example).forEach(datasetTable -> {
if(StringUtils.isEmpty(datasetTable.getQrtzInstance()) || !activeQrtzInstances.contains(datasetTable.getQrtzInstance().substring(0, datasetTable.getQrtzInstance().length() - 13))){
jobStoppeddDatasetTables.add(datasetTable);
}
});
if(CollectionUtils.isEmpty(jobStoppeddDatasetTables)){
return;
}
DatasetTable record = new DatasetTable();
record.setSyncStatus(JobStatus.Completed.name());
example.clear();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()).andIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableMapper.updateByExampleSelective(record, example); datasetTableMapper.updateByExampleSelective(record, example);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setStatus(JobStatus.Error.name());
datasetTableTaskLog.setInfo("Job stopped due to system error.");
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()).andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample);
} }
} }
...@@ -63,6 +63,7 @@ import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta; ...@@ -63,6 +63,7 @@ import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
import org.pentaho.di.www.SlaveServerJobStatus; import org.pentaho.di.www.SlaveServerJobStatus;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -183,7 +184,7 @@ public class ExtractDataService { ...@@ -183,7 +184,7 @@ public class ExtractDataService {
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0; return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
} }
public void extractData(String datasetTableId, String taskId, String type) { public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
DatasetTable datasetTable = dataSetTableService.get(datasetTableId); DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
if(updateSyncStatus(datasetTable)){ if(updateSyncStatus(datasetTable)){
LogUtil.info("Skip synchronization task for table : " + datasetTableId); LogUtil.info("Skip synchronization task for table : " + datasetTableId);
...@@ -193,6 +194,10 @@ public class ExtractDataService { ...@@ -193,6 +194,10 @@ public class ExtractDataService {
UpdateType updateType = UpdateType.valueOf(type); UpdateType updateType = UpdateType.valueOf(type);
Datasource datasource = new Datasource(); Datasource datasource = new Datasource();
try { try {
if(context != null){
datasetTable.setQrtzInstance(context.getFireInstanceId());
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
}
if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) { if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) {
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
} else { } else {
......
ALTER TABLE `dataease`.`dataset_table` ADD COLUMN `sync_status` VARCHAR(45) NULL AFTER `create_time`; ALTER TABLE `dataset_table` ADD COLUMN `sync_status` VARCHAR(45) NULL AFTER `create_time`;
ALTER TABLE `dataset_table` ADD COLUMN `qrtz_instance` VARCHAR(1024) NULL AFTER `create_time`;
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论