提交 7f48bf4f authored 作者: taojinlong's avatar taojinlong

fix: 检测同步任务

上级 8f88821b
...@@ -21,6 +21,8 @@ public class DatasetTable implements Serializable { ...@@ -21,6 +21,8 @@ public class DatasetTable implements Serializable {
private Long createTime; private Long createTime;
private String qrtzInstance;
private String syncStatus; private String syncStatus;
private String info; private String info;
......
...@@ -644,6 +644,76 @@ public class DatasetTableExample { ...@@ -644,6 +644,76 @@ public class DatasetTableExample {
return (Criteria) this; return (Criteria) this;
} }
public Criteria andQrtzInstanceIsNull() {
addCriterion("qrtz_instance is null");
return (Criteria) this;
}
public Criteria andQrtzInstanceIsNotNull() {
addCriterion("qrtz_instance is not null");
return (Criteria) this;
}
public Criteria andQrtzInstanceEqualTo(String value) {
addCriterion("qrtz_instance =", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotEqualTo(String value) {
addCriterion("qrtz_instance <>", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceGreaterThan(String value) {
addCriterion("qrtz_instance >", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceGreaterThanOrEqualTo(String value) {
addCriterion("qrtz_instance >=", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLessThan(String value) {
addCriterion("qrtz_instance <", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLessThanOrEqualTo(String value) {
addCriterion("qrtz_instance <=", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceLike(String value) {
addCriterion("qrtz_instance like", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotLike(String value) {
addCriterion("qrtz_instance not like", value, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceIn(List<String> values) {
addCriterion("qrtz_instance in", values, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotIn(List<String> values) {
addCriterion("qrtz_instance not in", values, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceBetween(String value1, String value2) {
addCriterion("qrtz_instance between", value1, value2, "qrtzInstance");
return (Criteria) this;
}
public Criteria andQrtzInstanceNotBetween(String value1, String value2) {
addCriterion("qrtz_instance not between", value1, value2, "qrtzInstance");
return (Criteria) this;
}
public Criteria andSyncStatusIsNull() { public Criteria andSyncStatusIsNull() {
addCriterion("sync_status is null"); addCriterion("sync_status is null");
return (Criteria) this; return (Criteria) this;
......
package io.dataease.base.domain;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@Data
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class QrtzSchedulerState extends QrtzSchedulerStateKey implements Serializable {
private Long lastCheckinTime;
private Long checkinInterval;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package io.dataease.base.domain;
import java.io.Serializable;
import lombok.Data;
@Data
public class QrtzSchedulerStateKey implements Serializable {
private String schedName;
private String instanceName;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
...@@ -10,6 +10,7 @@ ...@@ -10,6 +10,7 @@
<result column="mode" jdbcType="INTEGER" property="mode" /> <result column="mode" jdbcType="INTEGER" property="mode" />
<result column="create_by" jdbcType="VARCHAR" property="createBy" /> <result column="create_by" jdbcType="VARCHAR" property="createBy" />
<result column="create_time" jdbcType="BIGINT" property="createTime" /> <result column="create_time" jdbcType="BIGINT" property="createTime" />
<result column="qrtz_instance" jdbcType="VARCHAR" property="qrtzInstance" />
<result column="sync_status" jdbcType="VARCHAR" property="syncStatus" /> <result column="sync_status" jdbcType="VARCHAR" property="syncStatus" />
</resultMap> </resultMap>
<resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable"> <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="io.dataease.base.domain.DatasetTable">
...@@ -74,7 +75,8 @@ ...@@ -74,7 +75,8 @@
</where> </where>
</sql> </sql>
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, sync_status id, `name`, scene_id, data_source_id, `type`, `mode`, create_by, create_time, qrtz_instance,
sync_status
</sql> </sql>
<sql id="Blob_Column_List"> <sql id="Blob_Column_List">
info info
...@@ -130,12 +132,12 @@ ...@@ -130,12 +132,12 @@
<insert id="insert" parameterType="io.dataease.base.domain.DatasetTable"> <insert id="insert" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table (id, `name`, scene_id, insert into dataset_table (id, `name`, scene_id,
data_source_id, `type`, `mode`, data_source_id, `type`, `mode`,
create_by, create_time, sync_status, create_by, create_time, qrtz_instance,
info) sync_status, info)
values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR}, values (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR}, #{sceneId,jdbcType=VARCHAR},
#{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER}, #{dataSourceId,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{mode,jdbcType=INTEGER},
#{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{syncStatus,jdbcType=VARCHAR}, #{createBy,jdbcType=VARCHAR}, #{createTime,jdbcType=BIGINT}, #{qrtzInstance,jdbcType=VARCHAR},
#{info,jdbcType=LONGVARCHAR}) #{syncStatus,jdbcType=VARCHAR}, #{info,jdbcType=LONGVARCHAR})
</insert> </insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable"> <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTable">
insert into dataset_table insert into dataset_table
...@@ -164,6 +166,9 @@ ...@@ -164,6 +166,9 @@
<if test="createTime != null"> <if test="createTime != null">
create_time, create_time,
</if> </if>
<if test="qrtzInstance != null">
qrtz_instance,
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status, sync_status,
</if> </if>
...@@ -196,6 +201,9 @@ ...@@ -196,6 +201,9 @@
<if test="createTime != null"> <if test="createTime != null">
#{createTime,jdbcType=BIGINT}, #{createTime,jdbcType=BIGINT},
</if> </if>
<if test="qrtzInstance != null">
#{qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
#{syncStatus,jdbcType=VARCHAR}, #{syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -237,6 +245,9 @@ ...@@ -237,6 +245,9 @@
<if test="record.createTime != null"> <if test="record.createTime != null">
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
</if> </if>
<if test="record.qrtzInstance != null">
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="record.syncStatus != null"> <if test="record.syncStatus != null">
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -258,6 +269,7 @@ ...@@ -258,6 +269,7 @@
`mode` = #{record.mode,jdbcType=INTEGER}, `mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR}, create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR}, sync_status = #{record.syncStatus,jdbcType=VARCHAR},
info = #{record.info,jdbcType=LONGVARCHAR} info = #{record.info,jdbcType=LONGVARCHAR}
<if test="_parameter != null"> <if test="_parameter != null">
...@@ -274,6 +286,7 @@ ...@@ -274,6 +286,7 @@
`mode` = #{record.mode,jdbcType=INTEGER}, `mode` = #{record.mode,jdbcType=INTEGER},
create_by = #{record.createBy,jdbcType=VARCHAR}, create_by = #{record.createBy,jdbcType=VARCHAR},
create_time = #{record.createTime,jdbcType=BIGINT}, create_time = #{record.createTime,jdbcType=BIGINT},
qrtz_instance = #{record.qrtzInstance,jdbcType=VARCHAR},
sync_status = #{record.syncStatus,jdbcType=VARCHAR} sync_status = #{record.syncStatus,jdbcType=VARCHAR}
<if test="_parameter != null"> <if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" /> <include refid="Update_By_Example_Where_Clause" />
...@@ -303,6 +316,9 @@ ...@@ -303,6 +316,9 @@
<if test="createTime != null"> <if test="createTime != null">
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
</if> </if>
<if test="qrtzInstance != null">
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
</if>
<if test="syncStatus != null"> <if test="syncStatus != null">
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
</if> </if>
...@@ -321,6 +337,7 @@ ...@@ -321,6 +337,7 @@
`mode` = #{mode,jdbcType=INTEGER}, `mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR}, create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR}, sync_status = #{syncStatus,jdbcType=VARCHAR},
info = #{info,jdbcType=LONGVARCHAR} info = #{info,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
...@@ -334,6 +351,7 @@ ...@@ -334,6 +351,7 @@
`mode` = #{mode,jdbcType=INTEGER}, `mode` = #{mode,jdbcType=INTEGER},
create_by = #{createBy,jdbcType=VARCHAR}, create_by = #{createBy,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=BIGINT}, create_time = #{createTime,jdbcType=BIGINT},
qrtz_instance = #{qrtzInstance,jdbcType=VARCHAR},
sync_status = #{syncStatus,jdbcType=VARCHAR} sync_status = #{syncStatus,jdbcType=VARCHAR}
where id = #{id,jdbcType=VARCHAR} where id = #{id,jdbcType=VARCHAR}
</update> </update>
......
package io.dataease.base.mapper;
import io.dataease.base.domain.QrtzSchedulerState;
import io.dataease.base.domain.QrtzSchedulerStateExample;
import io.dataease.base.domain.QrtzSchedulerStateKey;
import java.util.List;
import org.apache.ibatis.annotations.Param;
public interface QrtzSchedulerStateMapper {
long countByExample(QrtzSchedulerStateExample example);
int deleteByExample(QrtzSchedulerStateExample example);
int deleteByPrimaryKey(QrtzSchedulerStateKey key);
int insert(QrtzSchedulerState record);
int insertSelective(QrtzSchedulerState record);
List<QrtzSchedulerState> selectByExample(QrtzSchedulerStateExample example);
QrtzSchedulerState selectByPrimaryKey(QrtzSchedulerStateKey key);
int updateByExampleSelective(@Param("record") QrtzSchedulerState record, @Param("example") QrtzSchedulerStateExample example);
int updateByExample(@Param("record") QrtzSchedulerState record, @Param("example") QrtzSchedulerStateExample example);
int updateByPrimaryKeySelective(QrtzSchedulerState record);
int updateByPrimaryKey(QrtzSchedulerState record);
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="io.dataease.base.mapper.QrtzSchedulerStateMapper">
<resultMap id="BaseResultMap" type="io.dataease.base.domain.QrtzSchedulerState">
<id column="SCHED_NAME" jdbcType="VARCHAR" property="schedName" />
<id column="INSTANCE_NAME" jdbcType="VARCHAR" property="instanceName" />
<result column="LAST_CHECKIN_TIME" jdbcType="BIGINT" property="lastCheckinTime" />
<result column="CHECKIN_INTERVAL" jdbcType="BIGINT" property="checkinInterval" />
</resultMap>
<sql id="Example_Where_Clause">
<where>
<foreach collection="oredCriteria" item="criteria" separator="or">
<if test="criteria.valid">
<trim prefix="(" prefixOverrides="and" suffix=")">
<foreach collection="criteria.criteria" item="criterion">
<choose>
<when test="criterion.noValue">
and ${criterion.condition}
</when>
<when test="criterion.singleValue">
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue">
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue">
and ${criterion.condition}
<foreach close=")" collection="criterion.value" item="listItem" open="(" separator=",">
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<sql id="Update_By_Example_Where_Clause">
<where>
<foreach collection="example.oredCriteria" item="criteria" separator="or">
<if test="criteria.valid">
<trim prefix="(" prefixOverrides="and" suffix=")">
<foreach collection="criteria.criteria" item="criterion">
<choose>
<when test="criterion.noValue">
and ${criterion.condition}
</when>
<when test="criterion.singleValue">
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue">
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue">
and ${criterion.condition}
<foreach close=")" collection="criterion.value" item="listItem" open="(" separator=",">
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<sql id="Base_Column_List">
SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME, CHECKIN_INTERVAL
</sql>
<select id="selectByExample" parameterType="io.dataease.base.domain.QrtzSchedulerStateExample" resultMap="BaseResultMap">
select
<if test="distinct">
distinct
</if>
<include refid="Base_Column_List" />
from qrtz_scheduler_state
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
<if test="orderByClause != null">
order by ${orderByClause}
</if>
</select>
<select id="selectByPrimaryKey" parameterType="io.dataease.base.domain.QrtzSchedulerStateKey" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from qrtz_scheduler_state
where SCHED_NAME = #{schedName,jdbcType=VARCHAR}
and INSTANCE_NAME = #{instanceName,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="io.dataease.base.domain.QrtzSchedulerStateKey">
delete from qrtz_scheduler_state
where SCHED_NAME = #{schedName,jdbcType=VARCHAR}
and INSTANCE_NAME = #{instanceName,jdbcType=VARCHAR}
</delete>
<delete id="deleteByExample" parameterType="io.dataease.base.domain.QrtzSchedulerStateExample">
delete from qrtz_scheduler_state
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
</delete>
<insert id="insert" parameterType="io.dataease.base.domain.QrtzSchedulerState">
insert into qrtz_scheduler_state (SCHED_NAME, INSTANCE_NAME, LAST_CHECKIN_TIME,
CHECKIN_INTERVAL)
values (#{schedName,jdbcType=VARCHAR}, #{instanceName,jdbcType=VARCHAR}, #{lastCheckinTime,jdbcType=BIGINT},
#{checkinInterval,jdbcType=BIGINT})
</insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.QrtzSchedulerState">
insert into qrtz_scheduler_state
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="schedName != null">
SCHED_NAME,
</if>
<if test="instanceName != null">
INSTANCE_NAME,
</if>
<if test="lastCheckinTime != null">
LAST_CHECKIN_TIME,
</if>
<if test="checkinInterval != null">
CHECKIN_INTERVAL,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="schedName != null">
#{schedName,jdbcType=VARCHAR},
</if>
<if test="instanceName != null">
#{instanceName,jdbcType=VARCHAR},
</if>
<if test="lastCheckinTime != null">
#{lastCheckinTime,jdbcType=BIGINT},
</if>
<if test="checkinInterval != null">
#{checkinInterval,jdbcType=BIGINT},
</if>
</trim>
</insert>
<select id="countByExample" parameterType="io.dataease.base.domain.QrtzSchedulerStateExample" resultType="java.lang.Long">
select count(*) from qrtz_scheduler_state
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
</select>
<update id="updateByExampleSelective" parameterType="map">
update qrtz_scheduler_state
<set>
<if test="record.schedName != null">
SCHED_NAME = #{record.schedName,jdbcType=VARCHAR},
</if>
<if test="record.instanceName != null">
INSTANCE_NAME = #{record.instanceName,jdbcType=VARCHAR},
</if>
<if test="record.lastCheckinTime != null">
LAST_CHECKIN_TIME = #{record.lastCheckinTime,jdbcType=BIGINT},
</if>
<if test="record.checkinInterval != null">
CHECKIN_INTERVAL = #{record.checkinInterval,jdbcType=BIGINT},
</if>
</set>
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
<update id="updateByExample" parameterType="map">
update qrtz_scheduler_state
set SCHED_NAME = #{record.schedName,jdbcType=VARCHAR},
INSTANCE_NAME = #{record.instanceName,jdbcType=VARCHAR},
LAST_CHECKIN_TIME = #{record.lastCheckinTime,jdbcType=BIGINT},
CHECKIN_INTERVAL = #{record.checkinInterval,jdbcType=BIGINT}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
<update id="updateByPrimaryKeySelective" parameterType="io.dataease.base.domain.QrtzSchedulerState">
update qrtz_scheduler_state
<set>
<if test="lastCheckinTime != null">
LAST_CHECKIN_TIME = #{lastCheckinTime,jdbcType=BIGINT},
</if>
<if test="checkinInterval != null">
CHECKIN_INTERVAL = #{checkinInterval,jdbcType=BIGINT},
</if>
</set>
where SCHED_NAME = #{schedName,jdbcType=VARCHAR}
and INSTANCE_NAME = #{instanceName,jdbcType=VARCHAR}
</update>
<update id="updateByPrimaryKey" parameterType="io.dataease.base.domain.QrtzSchedulerState">
update qrtz_scheduler_state
set LAST_CHECKIN_TIME = #{lastCheckinTime,jdbcType=BIGINT},
CHECKIN_INTERVAL = #{checkinInterval,jdbcType=BIGINT}
where SCHED_NAME = #{schedName,jdbcType=VARCHAR}
and INSTANCE_NAME = #{instanceName,jdbcType=VARCHAR}
</update>
</mapper>
\ No newline at end of file
...@@ -145,7 +145,7 @@ public class DatasourceService { ...@@ -145,7 +145,7 @@ public class DatasourceService {
DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource); datasourceRequest.setDatasource(datasource);
datasourceProvider.initDataSource(datasourceRequest); datasourceProvider.initDataSource(datasourceRequest);
LogUtil.error("Succsss to init datasource connection pool: " + datasource.getName()); LogUtil.info("Succsss to init datasource connection pool: " + datasource.getName());
}catch (Exception e){ }catch (Exception e){
LogUtil.error("Failed to init datasource connection pool: " + datasource.getName(), e); LogUtil.error("Failed to init datasource connection pool: " + datasource.getName(), e);
} }
......
package io.dataease.job.sechedule; package io.dataease.job.sechedule;
import com.google.gson.Gson;
import io.dataease.commons.utils.LogUtil; import io.dataease.commons.utils.LogUtil;
import org.quartz.*; import org.quartz.*;
......
...@@ -15,7 +15,7 @@ public class ExtractDataJob extends DeScheduleJob{ ...@@ -15,7 +15,7 @@ public class ExtractDataJob extends DeScheduleJob{
@Override @Override
void businessExecute(JobExecutionContext context) { void businessExecute(JobExecutionContext context) {
extractDataService.extractData(datasetTableId, taskId, updateType); extractDataService.extractData(datasetTableId, taskId, updateType, context);
} }
} }
package io.dataease.service.dataset; package io.dataease.service.dataset;
import com.fit2cloud.quartz.anno.QuartzScheduled;
import com.google.gson.Gson; import com.google.gson.Gson;
import io.dataease.base.domain.*; import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasetTableIncrementalConfigMapper; import io.dataease.base.mapper.*;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.base.mapper.ext.ExtDataSetTableMapper; import io.dataease.base.mapper.ext.ExtDataSetTableMapper;
import io.dataease.commons.constants.JobStatus; import io.dataease.commons.constants.JobStatus;
import io.dataease.commons.utils.*; import io.dataease.commons.utils.*;
...@@ -75,6 +74,10 @@ public class DataSetTableService { ...@@ -75,6 +74,10 @@ public class DataSetTableService {
private DataSetTableUnionService dataSetTableUnionService; private DataSetTableUnionService dataSetTableUnionService;
@Resource @Resource
private DataSetTableTaskLogService dataSetTableTaskLogService; private DataSetTableTaskLogService dataSetTableTaskLogService;
@Resource
private QrtzSchedulerStateMapper qrtzSchedulerStateMapper;
@Resource
private DatasetTableTaskLogMapper datasetTableTaskLogMapper;
@Value("${upload.file.path}") @Value("${upload.file.path}")
private String path; private String path;
...@@ -106,7 +109,7 @@ public class DataSetTableService { ...@@ -106,7 +109,7 @@ public class DataSetTableService {
saveTableField(datasetTable); saveTableField(datasetTable);
if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) { if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
commonThreadPool.addTask(() -> { commonThreadPool.addTask(() -> {
extractDataService.extractData(datasetTable.getId(), null, "all_scope"); extractDataService.extractData(datasetTable.getId(), null, "all_scope", null);
}); });
} }
} }
...@@ -867,22 +870,48 @@ public class DataSetTableService { ...@@ -867,22 +870,48 @@ public class DataSetTableService {
} }
public Boolean checkDorisTableIsExists(String id) throws Exception { public Boolean checkDorisTableIsExists(String id) throws Exception {
// Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource"); Datasource dorisDatasource = (Datasource) CommonBeanFactory.getBean("DorisDatasource");
// JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class); JdbcProvider jdbcProvider = CommonBeanFactory.getBean(JdbcProvider.class);
// DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
// datasourceRequest.setDatasource(dorisDatasource); datasourceRequest.setDatasource(dorisDatasource);
// QueryProvider qp = ProviderFactory.getQueryProvider(dorisDatasource.getType()); QueryProvider qp = ProviderFactory.getQueryProvider(dorisDatasource.getType());
// datasourceRequest.setQuery(qp.searchTable(DorisTableUtils.dorisName(id))); datasourceRequest.setQuery(qp.searchTable(DorisTableUtils.dorisName(id)));
// List<String[]> data = jdbcProvider.getData(datasourceRequest); List<String[]> data = jdbcProvider.getData(datasourceRequest);
// return CollectionUtils.isNotEmpty(data); return CollectionUtils.isNotEmpty(data);
return true;
} }
@QuartzScheduled(cron = "0 0/3 * * * ?")
public void updateDatasetTableStatus(){ public void updateDatasetTableStatus(){
DatasetTable record = new DatasetTable(); List<QrtzSchedulerState> qrtzSchedulerStates = qrtzSchedulerStateMapper.selectByExample(null);
record.setSyncStatus(JobStatus.Completed.name()); List<String> activeQrtzInstances = qrtzSchedulerStates.stream().filter(qrtzSchedulerState -> qrtzSchedulerState.getLastCheckinTime() + qrtzSchedulerState.getCheckinInterval() + 1000 > System.currentTimeMillis()).map(QrtzSchedulerStateKey::getInstanceName).collect(Collectors.toList());
List<DatasetTable> jobStoppeddDatasetTables = new ArrayList<>();
DatasetTableExample example = new DatasetTableExample(); DatasetTableExample example = new DatasetTableExample();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()); example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name());
datasetTableMapper.selectByExample(example).forEach(datasetTable -> {
if(StringUtils.isEmpty(datasetTable.getQrtzInstance()) || !activeQrtzInstances.contains(datasetTable.getQrtzInstance().substring(0, datasetTable.getQrtzInstance().length() - 13))){
jobStoppeddDatasetTables.add(datasetTable);
}
});
if(CollectionUtils.isEmpty(jobStoppeddDatasetTables)){
return;
}
DatasetTable record = new DatasetTable();
record.setSyncStatus(JobStatus.Completed.name());
example.clear();
example.createCriteria().andSyncStatusEqualTo(JobStatus.Underway.name()).andIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableMapper.updateByExampleSelective(record, example); datasetTableMapper.updateByExampleSelective(record, example);
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
datasetTableTaskLog.setStatus(JobStatus.Error.name());
datasetTableTaskLog.setInfo("Job stopped due to system error.");
DatasetTableTaskLogExample datasetTableTaskLogExample = new DatasetTableTaskLogExample();
datasetTableTaskLogExample.createCriteria().andStatusEqualTo(JobStatus.Underway.name()).andTableIdIn(jobStoppeddDatasetTables.stream().map(DatasetTable::getId).collect(Collectors.toList()));
datasetTableTaskLogMapper.updateByExampleSelective(datasetTableTaskLog, datasetTableTaskLogExample);
} }
} }
...@@ -63,6 +63,7 @@ import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta; ...@@ -63,6 +63,7 @@ import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta; import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
import org.pentaho.di.www.SlaveServerJobStatus; import org.pentaho.di.www.SlaveServerJobStatus;
import org.quartz.JobExecutionContext;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy; import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
...@@ -183,7 +184,7 @@ public class ExtractDataService { ...@@ -183,7 +184,7 @@ public class ExtractDataService {
return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0; return datasetTableMapper.updateByExampleSelective(datasetTable, example) == 0;
} }
public void extractData(String datasetTableId, String taskId, String type) { public void extractData(String datasetTableId, String taskId, String type, JobExecutionContext context) {
DatasetTable datasetTable = dataSetTableService.get(datasetTableId); DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
if(updateSyncStatus(datasetTable)){ if(updateSyncStatus(datasetTable)){
LogUtil.info("Skip synchronization task for table : " + datasetTableId); LogUtil.info("Skip synchronization task for table : " + datasetTableId);
...@@ -193,6 +194,10 @@ public class ExtractDataService { ...@@ -193,6 +194,10 @@ public class ExtractDataService {
UpdateType updateType = UpdateType.valueOf(type); UpdateType updateType = UpdateType.valueOf(type);
Datasource datasource = new Datasource(); Datasource datasource = new Datasource();
try { try {
if(context != null){
datasetTable.setQrtzInstance(context.getFireInstanceId());
datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
}
if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) { if (StringUtils.isNotEmpty(datasetTable.getDataSourceId())) {
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
} else { } else {
......
ALTER TABLE `dataease`.`dataset_table` ADD COLUMN `sync_status` VARCHAR(45) NULL AFTER `create_time`; ALTER TABLE `dataset_table` ADD COLUMN `sync_status` VARCHAR(45) NULL AFTER `create_time`;
ALTER TABLE `dataset_table` ADD COLUMN `qrtz_instance` VARCHAR(1024) NULL AFTER `create_time`;
...@@ -498,6 +498,7 @@ export default { ...@@ -498,6 +498,7 @@ export default {
this.update_task = false this.update_task = false
this.resetTaskForm() this.resetTaskForm()
this.listTask() this.listTask()
this.listTaskLog()
}) })
}, },
deleteTask(task) { deleteTask(task) {
...@@ -514,6 +515,7 @@ export default { ...@@ -514,6 +515,7 @@ export default {
}) })
this.resetTaskForm() this.resetTaskForm()
this.listTask() this.listTask()
this.listTaskLog()
}) })
}).catch(() => { }).catch(() => {
}) })
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论