提交 e97297fd authored 作者: TaoJinlong's avatar TaoJinlong

feat: 增量更新数据

上级 5f823af9
......@@ -326,6 +326,11 @@
<artifactId>hbase-common</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
......
package io.dataease.base.domain;
import java.io.Serializable;
import lombok.Data;
@Data
public class DatasetTableIncrementalConfig implements Serializable {
private String id;
private String tableId;
private String incrementalDelete;
private String incrementalAdd;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package io.dataease.base.mapper;
import io.dataease.base.domain.DatasetTableIncrementalConfig;
import io.dataease.base.domain.DatasetTableIncrementalConfigExample;
import java.util.List;
import org.apache.ibatis.annotations.Param;
public interface DatasetTableIncrementalConfigMapper {
long countByExample(DatasetTableIncrementalConfigExample example);
int deleteByExample(DatasetTableIncrementalConfigExample example);
int insert(DatasetTableIncrementalConfig record);
int insertSelective(DatasetTableIncrementalConfig record);
List<DatasetTableIncrementalConfig> selectByExample(DatasetTableIncrementalConfigExample example);
int updateByExampleSelective(@Param("record") DatasetTableIncrementalConfig record, @Param("example") DatasetTableIncrementalConfigExample example);
int updateByExample(@Param("record") DatasetTableIncrementalConfig record, @Param("example") DatasetTableIncrementalConfigExample example);
}
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="io.dataease.base.mapper.DatasetTableIncrementalConfigMapper">
<resultMap id="BaseResultMap" type="io.dataease.base.domain.DatasetTableIncrementalConfig">
<result column="id" jdbcType="VARCHAR" property="id" />
<result column="table_id" jdbcType="VARCHAR" property="tableId" />
<result column="incremental_delete" jdbcType="VARCHAR" property="incrementalDelete" />
<result column="incremental_add" jdbcType="VARCHAR" property="incrementalAdd" />
</resultMap>
<sql id="Example_Where_Clause">
<where>
<foreach collection="oredCriteria" item="criteria" separator="or">
<if test="criteria.valid">
<trim prefix="(" prefixOverrides="and" suffix=")">
<foreach collection="criteria.criteria" item="criterion">
<choose>
<when test="criterion.noValue">
and ${criterion.condition}
</when>
<when test="criterion.singleValue">
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue">
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue">
and ${criterion.condition}
<foreach close=")" collection="criterion.value" item="listItem" open="(" separator=",">
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<sql id="Update_By_Example_Where_Clause">
<where>
<foreach collection="example.oredCriteria" item="criteria" separator="or">
<if test="criteria.valid">
<trim prefix="(" prefixOverrides="and" suffix=")">
<foreach collection="criteria.criteria" item="criterion">
<choose>
<when test="criterion.noValue">
and ${criterion.condition}
</when>
<when test="criterion.singleValue">
and ${criterion.condition} #{criterion.value}
</when>
<when test="criterion.betweenValue">
and ${criterion.condition} #{criterion.value} and #{criterion.secondValue}
</when>
<when test="criterion.listValue">
and ${criterion.condition}
<foreach close=")" collection="criterion.value" item="listItem" open="(" separator=",">
#{listItem}
</foreach>
</when>
</choose>
</foreach>
</trim>
</if>
</foreach>
</where>
</sql>
<sql id="Base_Column_List">
id, table_id, incremental_delete, incremental_add
</sql>
<select id="selectByExample" parameterType="io.dataease.base.domain.DatasetTableIncrementalConfigExample" resultMap="BaseResultMap">
select
<if test="distinct">
distinct
</if>
<include refid="Base_Column_List" />
from dataset_table_incremental_config
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
<if test="orderByClause != null">
order by ${orderByClause}
</if>
</select>
<delete id="deleteByExample" parameterType="io.dataease.base.domain.DatasetTableIncrementalConfigExample">
delete from dataset_table_incremental_config
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
</delete>
<insert id="insert" parameterType="io.dataease.base.domain.DatasetTableIncrementalConfig">
insert into dataset_table_incremental_config (id, table_id, incremental_delete,
incremental_add)
values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{incrementalDelete,jdbcType=VARCHAR},
#{incrementalAdd,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTableIncrementalConfig">
insert into dataset_table_incremental_config
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
</if>
<if test="tableId != null">
table_id,
</if>
<if test="incrementalDelete != null">
incremental_delete,
</if>
<if test="incrementalAdd != null">
incremental_add,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=VARCHAR},
</if>
<if test="tableId != null">
#{tableId,jdbcType=VARCHAR},
</if>
<if test="incrementalDelete != null">
#{incrementalDelete,jdbcType=VARCHAR},
</if>
<if test="incrementalAdd != null">
#{incrementalAdd,jdbcType=VARCHAR},
</if>
</trim>
</insert>
<select id="countByExample" parameterType="io.dataease.base.domain.DatasetTableIncrementalConfigExample" resultType="java.lang.Long">
select count(*) from dataset_table_incremental_config
<if test="_parameter != null">
<include refid="Example_Where_Clause" />
</if>
</select>
<update id="updateByExampleSelective" parameterType="map">
update dataset_table_incremental_config
<set>
<if test="record.id != null">
id = #{record.id,jdbcType=VARCHAR},
</if>
<if test="record.tableId != null">
table_id = #{record.tableId,jdbcType=VARCHAR},
</if>
<if test="record.incrementalDelete != null">
incremental_delete = #{record.incrementalDelete,jdbcType=VARCHAR},
</if>
<if test="record.incrementalAdd != null">
incremental_add = #{record.incrementalAdd,jdbcType=VARCHAR},
</if>
</set>
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
<update id="updateByExample" parameterType="map">
update dataset_table_incremental_config
set id = #{record.id,jdbcType=VARCHAR},
table_id = #{record.tableId,jdbcType=VARCHAR},
incremental_delete = #{record.incrementalDelete,jdbcType=VARCHAR},
incremental_add = #{record.incrementalAdd,jdbcType=VARCHAR}
<if test="_parameter != null">
<include refid="Update_By_Example_Where_Clause" />
</if>
</update>
</mapper>
\ No newline at end of file
package io.dataease.commons.constants;
public enum ScheduleType {
CRON, SIMPLE
CRON, SIMPLE, SIMPLE_COMPLETE
}
package io.dataease.commons.constants;
public enum UpdateType {
all_scope, add_scope
}
......@@ -2,6 +2,7 @@ package io.dataease.controller.dataset;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.DatasetTableIncrementalConfig;
import io.dataease.controller.request.dataset.DataSetTableRequest;
import io.dataease.datasource.dto.TableFiled;
import io.dataease.service.dataset.DataSetTableService;
......@@ -70,4 +71,15 @@ public class DataSetTableController {
public Map<String, Object> getSQLPreview(@RequestBody DataSetTableRequest dataSetTableRequest) throws Exception {
return dataSetTableService.getSQLPreview(dataSetTableRequest);
}
@PostMapping("incrementalConfig")
public DatasetTableIncrementalConfig incrementalConfig(@RequestBody DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception {
return dataSetTableService.incrementalConfig(datasetTableIncrementalConfig);
}
@PostMapping("save/incrementalConfig")
public void saveIncrementalConfig(@RequestBody DatasetTableIncrementalConfig datasetTableIncrementalConfig) throws Exception {
dataSetTableService.saveIncrementalConfig(datasetTableIncrementalConfig);
}
}
......@@ -8,6 +8,7 @@ public abstract class DeScheduleJob implements Job {
protected String datasetTableId;
protected String expression;
protected String taskId;
protected String updateType;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
......@@ -16,6 +17,7 @@ public abstract class DeScheduleJob implements Job {
this.datasetTableId = jobDataMap.getString("datasetTableId");
this.expression = jobDataMap.getString("expression");
this.taskId = jobDataMap.getString("taskId");
this.updateType = jobDataMap.getString("updateType");
LogUtil.info(jobKey.getGroup() + " Running: " + datasetTableId);
LogUtil.info("CronExpression: " + expression);
......
......@@ -16,7 +16,7 @@ public class ExtractDataJob extends DeScheduleJob{
@Override
void businessExecute(JobExecutionContext context) {
extractDataService.extractData(datasetTableId, taskId);
extractDataService.extractData(datasetTableId, taskId, updateType);
}
}
......@@ -369,11 +369,12 @@ public class ScheduleManager {
addOrUpdateCronJob(jobKey, triggerKey, jobClass, cron, startTime, endTime, null);
}
public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId) {
public JobDataMap getDefaultJobDataMap(String resourceId, String expression, String taskId, String updateType) {
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("datasetTableId", resourceId);
jobDataMap.put("taskId", taskId);
jobDataMap.put("expression", expression);
jobDataMap.put("updateType", updateType);
return jobDataMap;
}
......
package io.dataease.service;
import io.dataease.base.domain.DatasetTableTask;
import io.dataease.commons.constants.ScheduleType;
import io.dataease.job.sechedule.ExtractDataJob;
import io.dataease.job.sechedule.ScheduleManager;
import org.apache.commons.lang3.StringUtils;
......@@ -21,12 +22,13 @@ public class ScheduleService {
private ScheduleManager scheduleManager;
public void addSchedule(DatasetTableTask datasetTableTask) throws Exception {
if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "0")) {
if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), ScheduleType.SIMPLE.toString())) {
scheduleManager.addOrUpdateSingleJob(new JobKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
ExtractDataJob.class,
new Date(datasetTableTask.getStartTime()), scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
} else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), "1")) {
new Date(datasetTableTask.getStartTime()),
scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId(), datasetTableTask.getType()));
} else if (StringUtils.equalsIgnoreCase(datasetTableTask.getRate(), ScheduleType.CRON.toString())) {
Date endTime;
if (datasetTableTask.getEndTime() == null || datasetTableTask.getEndTime() == 0) {
endTime = null;
......@@ -38,7 +40,7 @@ public class ScheduleService {
new TriggerKey(datasetTableTask.getId(), datasetTableTask.getTableId()),
ExtractDataJob.class,
datasetTableTask.getCron(), new Date(datasetTableTask.getStartTime()), endTime,
scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId()));
scheduleManager.getDefaultJobDataMap(datasetTableTask.getTableId(), datasetTableTask.getCron(), datasetTableTask.getId(), datasetTableTask.getType()));
}
}
......
......@@ -2,10 +2,8 @@ package io.dataease.service.dataset;
import com.google.gson.Gson;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.Datasource;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasetTableIncrementalConfigMapper;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.commons.utils.BeanUtils;
......@@ -40,6 +38,8 @@ public class DataSetTableService {
private DataSetTableFieldsService dataSetTableFieldsService;
@Resource
private DataSetTableTaskService dataSetTableTaskService;
@Resource
private DatasetTableIncrementalConfigMapper datasetTableIncrementalConfigMapper;
public void batchInsert(List<DatasetTable> datasetTable) throws Exception {
for (DatasetTable table : datasetTable) {
......@@ -261,6 +261,20 @@ public class DataSetTableService {
return data;
}
public List<String[]> getDataSetDataBySql(String datasourceId, String table, String sql) {
List<String[]> data = new ArrayList<>();
Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
datasourceRequest.setQuery(sql);
try {
return datasourceProvider.getData(datasourceRequest);
} catch (Exception e) {
}
return data;
}
public void saveTableField(DatasetTable datasetTable) throws Exception {
Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
DataSetTableRequest dataSetTableRequest = new DataSetTableRequest();
......@@ -349,4 +363,35 @@ public class DataSetTableService {
return 0;
}
}
public DatasetTableIncrementalConfig incrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){
if(StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())){return new DatasetTableIncrementalConfig();}
DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample();
example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId());
List<DatasetTableIncrementalConfig> configs = datasetTableIncrementalConfigMapper.selectByExample(example);
if(CollectionUtils.isNotEmpty(configs)){
return configs.get(0);
}else {
return new DatasetTableIncrementalConfig();
}
}
public DatasetTableIncrementalConfig incrementalConfig(String datasetTableId){
DatasetTableIncrementalConfig datasetTableIncrementalConfig = new DatasetTableIncrementalConfig();
datasetTableIncrementalConfig.setTableId(datasetTableId);
return incrementalConfig(datasetTableIncrementalConfig);
}
public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){
if(StringUtils.isEmpty(datasetTableIncrementalConfig.getId())){
datasetTableIncrementalConfig.setId(UUID.randomUUID().toString());
datasetTableIncrementalConfigMapper.insertSelective(datasetTableIncrementalConfig);
}else{
DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample();
example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId());
datasetTableIncrementalConfigMapper.updateByExample(datasetTableIncrementalConfig, example);
}
}
}
......@@ -70,6 +70,10 @@ public class DataSetTableTaskService {
return datasetTableTaskMapper.selectByPrimaryKey(id);
}
public void update(DatasetTableTask datasetTableTask) {
datasetTableTaskMapper.updateByPrimaryKey(datasetTableTask);
}
public List<DatasetTableTask> list(DatasetTableTask datasetTableTask) {
DatasetTableTaskExample datasetTableTaskExample = new DatasetTableTaskExample();
DatasetTableTaskExample.Criteria criteria = datasetTableTaskExample.createCriteria();
......
DROP TABLE IF EXISTS `dataset_table_incremental_config`;
CREATE TABLE IF NOT EXISTS `dataset_table_incremental_config`
(
`id` varchar(50) NOT NULL COMMENT 'ID',
`table_id` varchar(50) NOT NULL COMMENT '表ID',
`incremental_delete` longtext COMMENT '详细信息',
`incremental_add` longtext COMMENT '详细信息',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
\ No newline at end of file
......@@ -37,11 +37,11 @@
</commentGenerator>
<!-- jdbc连接信息 --> <!-- EduLoanManage EduTestDataBase -->
<!--<jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://192.168.20.180:3306/fit2cloud"-->
<!--userId="root" password="Fit2cloud2015!" />-->
<jdbcConnection driverClass="com.mysql.cj.jdbc.Driver"
connectionURL="${spring.datasource.url}&amp;nullCatalogMeansCurrent=true"
userId="${spring.datasource.username}" password="${spring.datasource.password}"/>
<jdbcConnection driverClass="com.mysql.jdbc.Driver" connectionURL="jdbc:mysql://62.234.205.170:3306/dataease"
userId="root" password="Password123@mysql" />
<!-- <jdbcConnection driverClass="com.mysql.cj.jdbc.Driver"-->
<!-- connectionURL="${spring.datasource.url}&amp;nullCatalogMeansCurrent=true"-->
<!-- userId="${spring.datasource.username}" password="${spring.datasource.password}"/>-->
<!-- javaTypeResolver式类型转换的信息 -->
<javaTypeResolver>
......@@ -64,8 +64,13 @@
<!--要生成的数据库表 -->
<!-- <table tableName="datasource"/>-->
<table tableName="panel_group"/>
<table tableName="dataset_table_incremental_config" >
<!--以下为添加内容 -->
<columnOverride column="incremental_delete" javaType="java.lang.String" jdbcType="VARCHAR" />
<columnOverride column="incremental_add" javaType="java.lang.String" jdbcType="VARCHAR" />
</table>
</context>
......
......@@ -671,7 +671,12 @@ export default {
add_sql_table: '添加SQL',
preview: '预览',
pls_input_name: '请输入名称',
connect_mode: '连接模式'
connect_mode: '连接模式',
incremental_update_type: '增量更新方式:',
incremental_add: '增量添加:',
incremental_delete: '增量删除:',
last_update_time: '上次更新时间:',
current_update_time: '当前更新时间:'
},
datasource: {
create: '新建数据连接',
......@@ -689,6 +694,7 @@ export default {
please_input_port: '请输入端口',
modify: '编辑数据连接',
validate_success: '校验成功',
validate: '校验',
delete: '删除组织',
delete_confirm: '删除该组织会关联删除该组织下的所有资源(如:相关工作空间,项目,测试用例等),确定要删除吗?',
input_name: '请输入名称',
......
......@@ -11,7 +11,7 @@
@search="search"
>
<template #buttons>
<fu-table-button icon="el-icon-circle-plus-outline" :label="$t('datasource.create')" @click="create" />
<fu-table-button v-permission="['datasource:add']" icon="el-icon-circle-plus-outline" :label="$t('datasource.create')" @click="create" />
</template>
<!-- <el-table-column type="selection" fix /> -->
......@@ -62,7 +62,7 @@
<el-form-item v-show="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.data_base')" prop="configuration.dataBase" :rules="{required: true, message: $t('datasource.please_input_data_base'), trigger: 'blur'}">
<el-input v-model="form.configuration.dataBase" autocomplete="off" />
</el-form-item>
<el-form-item v-show="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.user_name')" prop="configuration.username">
<el-form-item v-show="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.user_name')" prop="configuration.username" :rules="{required: true, message: $t('datasource.please_input_user_name'), trigger: 'blur'}">
<el-input v-model="form.configuration.username" autocomplete="off" />
</el-form-item>
<el-form-item v-show="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.password')" prop="configuration.password" :rules="{required: true, message: $t('datasource.please_input_password'), trigger: 'change'}">
......@@ -79,7 +79,8 @@
<div slot="footer" class="dialog-footer">
<el-button type="text" @click="dialogVisible = false">{{ $t('commons.cancel') }}</el-button>
<el-button type="primary" @click="saveDatasource('createDatasource')">确认</el-button>
<el-button type="primary" @click="validaDatasource('createDatasource')">{{ $t('commons.validate') }}</el-button>
<el-button type="primary" @click="saveDatasource('createDatasource')">{{ $t('commons.confirm') }}</el-button>
</div>
</el-dialog>
......@@ -198,13 +199,12 @@ export default {
this.$success(this.$t('commons.save_success'))
this.search()
this.dialogVisible = false
})
});
} else {
return false
}
})
},
validaDatasource(datasourceForm) {
this.$refs[datasourceForm].validate(valid => {
if (valid) {
......@@ -230,7 +230,6 @@ export default {
const result = {}
if (condition && condition.quick) {
for (const [key, value] of Object.entries(condition)) {
// console.log(`${key}`)
if (`${key}` === 'quick') {
const v_new = Object.assign({}, value)
v_new['field'] = 'name'
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论