提交 260d09a1 authored 作者: taojinlong's avatar taojinlong

feat: 抽取 excel 数据

上级 ac938c28
...@@ -16,6 +16,8 @@ public class DatasetTableField implements Serializable { ...@@ -16,6 +16,8 @@ public class DatasetTableField implements Serializable {
private String name; private String name;
private String dataeaseName;
private String type; private String type;
private Integer size; private Integer size;
......
...@@ -384,6 +384,76 @@ public class DatasetTableFieldExample { ...@@ -384,6 +384,76 @@ public class DatasetTableFieldExample {
return (Criteria) this; return (Criteria) this;
} }
public Criteria andDataeaseNameIsNull() {
addCriterion("dataease_name is null");
return (Criteria) this;
}
public Criteria andDataeaseNameIsNotNull() {
addCriterion("dataease_name is not null");
return (Criteria) this;
}
public Criteria andDataeaseNameEqualTo(String value) {
addCriterion("dataease_name =", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameNotEqualTo(String value) {
addCriterion("dataease_name <>", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameGreaterThan(String value) {
addCriterion("dataease_name >", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameGreaterThanOrEqualTo(String value) {
addCriterion("dataease_name >=", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameLessThan(String value) {
addCriterion("dataease_name <", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameLessThanOrEqualTo(String value) {
addCriterion("dataease_name <=", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameLike(String value) {
addCriterion("dataease_name like", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameNotLike(String value) {
addCriterion("dataease_name not like", value, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameIn(List<String> values) {
addCriterion("dataease_name in", values, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameNotIn(List<String> values) {
addCriterion("dataease_name not in", values, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameBetween(String value1, String value2) {
addCriterion("dataease_name between", value1, value2, "dataeaseName");
return (Criteria) this;
}
public Criteria andDataeaseNameNotBetween(String value1, String value2) {
addCriterion("dataease_name not between", value1, value2, "dataeaseName");
return (Criteria) this;
}
public Criteria andTypeIsNull() { public Criteria andTypeIsNull() {
addCriterion("`type` is null"); addCriterion("`type` is null");
return (Criteria) this; return (Criteria) this;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
<result column="table_id" jdbcType="VARCHAR" property="tableId" /> <result column="table_id" jdbcType="VARCHAR" property="tableId" />
<result column="origin_name" jdbcType="VARCHAR" property="originName" /> <result column="origin_name" jdbcType="VARCHAR" property="originName" />
<result column="name" jdbcType="VARCHAR" property="name" /> <result column="name" jdbcType="VARCHAR" property="name" />
<result column="dataease_name" jdbcType="VARCHAR" property="dataeaseName" />
<result column="type" jdbcType="VARCHAR" property="type" /> <result column="type" jdbcType="VARCHAR" property="type" />
<result column="size" jdbcType="INTEGER" property="size" /> <result column="size" jdbcType="INTEGER" property="size" />
<result column="de_type" jdbcType="INTEGER" property="deType" /> <result column="de_type" jdbcType="INTEGER" property="deType" />
...@@ -72,8 +73,8 @@ ...@@ -72,8 +73,8 @@
</where> </where>
</sql> </sql>
<sql id="Base_Column_List"> <sql id="Base_Column_List">
id, table_id, origin_name, `name`, `type`, `size`, de_type, `checked`, column_index, id, table_id, origin_name, `name`, dataease_name, `type`, `size`, de_type, `checked`,
last_sync_time column_index, last_sync_time
</sql> </sql>
<select id="selectByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultMap="BaseResultMap"> <select id="selectByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultMap="BaseResultMap">
select select
...@@ -107,13 +108,13 @@ ...@@ -107,13 +108,13 @@
</delete> </delete>
<insert id="insert" parameterType="io.dataease.base.domain.DatasetTableField"> <insert id="insert" parameterType="io.dataease.base.domain.DatasetTableField">
insert into dataset_table_field (id, table_id, origin_name, insert into dataset_table_field (id, table_id, origin_name,
`name`, `type`, `size`, de_type, `name`, dataease_name, `type`,
`checked`, column_index, last_sync_time `size`, de_type, `checked`,
) column_index, last_sync_time)
values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{originName,jdbcType=VARCHAR}, values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{originName,jdbcType=VARCHAR},
#{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{size,jdbcType=INTEGER}, #{deType,jdbcType=INTEGER}, #{name,jdbcType=VARCHAR}, #{dataeaseName,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
#{checked,jdbcType=BIT}, #{columnIndex,jdbcType=INTEGER}, #{lastSyncTime,jdbcType=BIGINT} #{size,jdbcType=INTEGER}, #{deType,jdbcType=INTEGER}, #{checked,jdbcType=BIT},
) #{columnIndex,jdbcType=INTEGER}, #{lastSyncTime,jdbcType=BIGINT})
</insert> </insert>
<insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTableField"> <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTableField">
insert into dataset_table_field insert into dataset_table_field
...@@ -130,6 +131,9 @@ ...@@ -130,6 +131,9 @@
<if test="name != null"> <if test="name != null">
`name`, `name`,
</if> </if>
<if test="dataeaseName != null">
dataease_name,
</if>
<if test="type != null"> <if test="type != null">
`type`, `type`,
</if> </if>
...@@ -162,6 +166,9 @@ ...@@ -162,6 +166,9 @@
<if test="name != null"> <if test="name != null">
#{name,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR},
</if> </if>
<if test="dataeaseName != null">
#{dataeaseName,jdbcType=VARCHAR},
</if>
<if test="type != null"> <if test="type != null">
#{type,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR},
</if> </if>
...@@ -203,6 +210,9 @@ ...@@ -203,6 +210,9 @@
<if test="record.name != null"> <if test="record.name != null">
`name` = #{record.name,jdbcType=VARCHAR}, `name` = #{record.name,jdbcType=VARCHAR},
</if> </if>
<if test="record.dataeaseName != null">
dataease_name = #{record.dataeaseName,jdbcType=VARCHAR},
</if>
<if test="record.type != null"> <if test="record.type != null">
`type` = #{record.type,jdbcType=VARCHAR}, `type` = #{record.type,jdbcType=VARCHAR},
</if> </if>
...@@ -232,6 +242,7 @@ ...@@ -232,6 +242,7 @@
table_id = #{record.tableId,jdbcType=VARCHAR}, table_id = #{record.tableId,jdbcType=VARCHAR},
origin_name = #{record.originName,jdbcType=VARCHAR}, origin_name = #{record.originName,jdbcType=VARCHAR},
`name` = #{record.name,jdbcType=VARCHAR}, `name` = #{record.name,jdbcType=VARCHAR},
dataease_name = #{record.dataeaseName,jdbcType=VARCHAR},
`type` = #{record.type,jdbcType=VARCHAR}, `type` = #{record.type,jdbcType=VARCHAR},
`size` = #{record.size,jdbcType=INTEGER}, `size` = #{record.size,jdbcType=INTEGER},
de_type = #{record.deType,jdbcType=INTEGER}, de_type = #{record.deType,jdbcType=INTEGER},
...@@ -254,6 +265,9 @@ ...@@ -254,6 +265,9 @@
<if test="name != null"> <if test="name != null">
`name` = #{name,jdbcType=VARCHAR}, `name` = #{name,jdbcType=VARCHAR},
</if> </if>
<if test="dataeaseName != null">
dataease_name = #{dataeaseName,jdbcType=VARCHAR},
</if>
<if test="type != null"> <if test="type != null">
`type` = #{type,jdbcType=VARCHAR}, `type` = #{type,jdbcType=VARCHAR},
</if> </if>
...@@ -280,6 +294,7 @@ ...@@ -280,6 +294,7 @@
set table_id = #{tableId,jdbcType=VARCHAR}, set table_id = #{tableId,jdbcType=VARCHAR},
origin_name = #{originName,jdbcType=VARCHAR}, origin_name = #{originName,jdbcType=VARCHAR},
`name` = #{name,jdbcType=VARCHAR}, `name` = #{name,jdbcType=VARCHAR},
dataease_name = #{dataeaseName,jdbcType=VARCHAR},
`type` = #{type,jdbcType=VARCHAR}, `type` = #{type,jdbcType=VARCHAR},
`size` = #{size,jdbcType=INTEGER}, `size` = #{size,jdbcType=INTEGER},
de_type = #{deType,jdbcType=INTEGER}, de_type = #{deType,jdbcType=INTEGER},
......
package io.dataease.commons.utils;
import java.util.List;
import java.io.File;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Md5Utils {
private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
private static final String UTF_8 = "UTF-8";
public static String md5(String src) {
return md5(src, UTF_8);
}
public static String md5(String src, String charset) {
try {
byte[] strTemp = charset == null || charset.equals("") ? src.getBytes() : src.getBytes(charset);
MessageDigest mdTemp = MessageDigest.getInstance("MD5");
mdTemp.update(strTemp);
byte[] md = mdTemp.digest();
int j = md.length;
char[] str = new char[j * 2];
int k = 0;
for (byte byte0 : md) {
str[k++] = HEX_DIGITS[byte0 >>> 4 & 0xf];
str[k++] = HEX_DIGITS[byte0 & 0xf];
}
return new String(str);
} catch (Exception e) {
throw new RuntimeException("MD5 encrypt error:", e);
}
}
}
package io.dataease.datasource.constants; package io.dataease.datasource.constants;
public enum DatasourceTypes { public enum DatasourceTypes {
mysql, sqlServer mysql, sqlServer, excel
} }
...@@ -8,6 +8,8 @@ import io.dataease.base.mapper.DatasetTableMapper; ...@@ -8,6 +8,8 @@ import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.base.mapper.DatasourceMapper; import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.commons.utils.AuthUtils; import io.dataease.commons.utils.AuthUtils;
import io.dataease.commons.utils.BeanUtils; import io.dataease.commons.utils.BeanUtils;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.commons.utils.Md5Utils;
import io.dataease.controller.request.dataset.DataSetTableRequest; import io.dataease.controller.request.dataset.DataSetTableRequest;
import io.dataease.datasource.constants.DatasourceTypes; import io.dataease.datasource.constants.DatasourceTypes;
import io.dataease.datasource.dto.TableFiled; import io.dataease.datasource.dto.TableFiled;
...@@ -54,6 +56,10 @@ public class DataSetTableService { ...@@ -54,6 +56,10 @@ public class DataSetTableService {
@Resource @Resource
private DataSetTableTaskService dataSetTableTaskService; private DataSetTableTaskService dataSetTableTaskService;
@Resource @Resource
private CommonThreadPool commonThreadPool;
@Resource
private ExtractDataService extractDataService;
@Resource
private DatasetTableIncrementalConfigMapper datasetTableIncrementalConfigMapper; private DatasetTableIncrementalConfigMapper datasetTableIncrementalConfigMapper;
@Value("${upload.file.path}") @Value("${upload.file.path}")
private String path; private String path;
...@@ -79,6 +85,9 @@ public class DataSetTableService { ...@@ -79,6 +85,9 @@ public class DataSetTableService {
// 添加表成功后,获取当前表字段和类型,抽象到dataease数据库 // 添加表成功后,获取当前表字段和类型,抽象到dataease数据库
if (insert == 1) { if (insert == 1) {
saveTableField(datasetTable); saveTableField(datasetTable);
commonThreadPool.addTask(()->{
extractDataService.extractData(datasetTable.getId(), null, "all_scope");
});
} }
} else { } else {
int update = datasetTableMapper.updateByPrimaryKeySelective(datasetTable); int update = datasetTableMapper.updateByPrimaryKeySelective(datasetTable);
...@@ -330,6 +339,11 @@ public class DataSetTableService { ...@@ -330,6 +339,11 @@ public class DataSetTableService {
datasetTableField.setTableId(datasetTable.getId()); datasetTableField.setTableId(datasetTable.getId());
datasetTableField.setOriginName(filed.getFieldName()); datasetTableField.setOriginName(filed.getFieldName());
datasetTableField.setName(filed.getRemarks()); datasetTableField.setName(filed.getRemarks());
if(StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")){
datasetTableField.setDataeaseName("C_" + Md5Utils.md5(filed.getFieldName()));
}else {
datasetTableField.setDataeaseName(filed.getFieldName());
}
datasetTableField.setType(filed.getFieldType()); datasetTableField.setType(filed.getFieldType());
if (ObjectUtils.isEmpty(ds)) { if (ObjectUtils.isEmpty(ds)) {
datasetTableField.setDeType(transFieldType(filed.getFieldType())); datasetTableField.setDeType(transFieldType(filed.getFieldType()));
...@@ -560,6 +574,7 @@ public class DataSetTableService { ...@@ -560,6 +574,7 @@ public class DataSetTableService {
tableFiled.setFieldName(readCell(row.getCell(j))); tableFiled.setFieldName(readCell(row.getCell(j)));
tableFiled.setRemarks(readCell(row.getCell(j))); tableFiled.setRemarks(readCell(row.getCell(j)));
tableFiled.setFieldType("TEXT"); tableFiled.setFieldType("TEXT");
tableFiled.setFieldSize(1024);
fields.add(tableFiled); fields.add(tableFiled);
} else { } else {
r[j] = readCell(row.getCell(j)); r[j] = readCell(row.getCell(j));
......
...@@ -39,6 +39,9 @@ import org.pentaho.di.repository.filerep.KettleFileRepository; ...@@ -39,6 +39,9 @@ import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.excelinput.ExcelInputField;
import org.pentaho.di.trans.steps.excelinput.ExcelInputMeta;
import org.pentaho.di.trans.steps.excelinput.SpreadSheetType;
import org.pentaho.di.trans.steps.sql.ExecSQLMeta; import org.pentaho.di.trans.steps.sql.ExecSQLMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.textfileoutput.TextFileField; import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
...@@ -87,7 +90,7 @@ public class ExtractDataService { ...@@ -87,7 +90,7 @@ public class ExtractDataService {
@Value("${carte.passwd:cluster}") @Value("${carte.passwd:cluster}")
private String passwd; private String passwd;
private static String creatTableSql = "CREATE TABLE IF NOT EXISTS TABLE_NAME" + private static String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
"Column_Fields" + "Column_Fields" +
"UNIQUE KEY(dataease_uuid)\n" + "UNIQUE KEY(dataease_uuid)\n" +
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" + "DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
...@@ -97,28 +100,28 @@ public class ExtractDataService { ...@@ -97,28 +100,28 @@ public class ExtractDataService {
"rm -rf %s\n"; "rm -rf %s\n";
private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){ private String createDorisTablColumnSql( List<DatasetTableField> datasetTableFields){
String Column_Fields = "dataease_uuid varchar(50),"; String Column_Fields = "dataease_uuid varchar(50), `";
for (DatasetTableField datasetTableField : datasetTableFields) { for (DatasetTableField datasetTableField : datasetTableFields) {
Column_Fields = Column_Fields + datasetTableField.getOriginName() + " "; Column_Fields = Column_Fields + datasetTableField.getDataeaseName() + "` ";
switch (datasetTableField.getDeType()){ switch (datasetTableField.getDeType()){
case 0: case 0:
Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",`";
break; break;
case 1: case 1:
Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",`";
break; break;
case 2: case 2:
Column_Fields = Column_Fields + "bigint(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; Column_Fields = Column_Fields + "bigint(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",`";
break; break;
case 3: case 3:
Column_Fields = Column_Fields + "DOUBLE" + ","; Column_Fields = Column_Fields + "DOUBLE" + ",`";
break; break;
default: default:
Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ","; Column_Fields = Column_Fields + "varchar(lenth)".replace("lenth", String.valueOf(datasetTableField.getSize())) + ",";
break; break;
} }
} }
Column_Fields = Column_Fields.substring(0, Column_Fields.length() -1 ); Column_Fields = Column_Fields.substring(0, Column_Fields.length() -2);
Column_Fields = "(" + Column_Fields + ")\n"; Column_Fields = "(" + Column_Fields + ")\n";
return Column_Fields; return Column_Fields;
} }
...@@ -129,7 +132,6 @@ public class ExtractDataService { ...@@ -129,7 +132,6 @@ public class ExtractDataService {
DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(dorisDatasource); datasourceRequest.setDatasource(dorisDatasource);
datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql)); datasourceRequest.setQuery(creatTableSql.replace("TABLE_NAME", dorisTableName).replace("Column_Fields", dorisTablColumnSql));
System.out.println(datasourceRequest.getQuery());
jdbcProvider.exec(datasourceRequest); jdbcProvider.exec(datasourceRequest);
} }
...@@ -145,21 +147,30 @@ public class ExtractDataService { ...@@ -145,21 +147,30 @@ public class ExtractDataService {
public void extractData(String datasetTableId, String taskId, String type) { public void extractData(String datasetTableId, String taskId, String type) {
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog(); DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type); UpdateType updateType = UpdateType.valueOf(type);
DatasetTable datasetTable = null;
Datasource datasource = new Datasource();
try { try {
DatasetTable datasetTable = dataSetTableService.get(datasetTableId); datasetTable = dataSetTableService.get(datasetTableId);
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); if(StringUtils.isNotEmpty(datasetTable.getDataSourceId())){
datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
}else {
datasource.setType(datasetTable.getType());
}
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build()); List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
datasetTableFields.sort((o1, o2) -> { datasetTableFields.sort((o1, o2) -> {
if (o1.getOriginName() == null) { if (o1.getColumnIndex() == null) {
return -1; return -1;
} }
if (o2.getOriginName() == null) { if (o2.getColumnIndex() == null) {
return 1; return 1;
} }
return o1.getOriginName().compareTo(o2.getOriginName()); return o1.getColumnIndex().compareTo(o2.getColumnIndex());
}); });
for (DatasetTableField datasetTableField : datasetTableFields) {
System.out.println(new Gson().toJson(datasetTableField));
}
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields); String dorisTablColumnSql = createDorisTablColumnSql(datasetTableFields);
switch (updateType) { switch (updateType) {
// 全量更新 // 全量更新
...@@ -168,8 +179,8 @@ public class ExtractDataService { ...@@ -168,8 +179,8 @@ public class ExtractDataService {
// TODO before: check doris table column type // TODO before: check doris table column type
createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisName(datasetTableId), dorisTablColumnSql);
createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql); createDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)), dorisTablColumnSql);
generateTransFile("all_scope", datasetTable, datasource, tableName, datasetTableFields, null); generateTransFile("all_scope", datasetTable, datasource, datasetTableFields, null);
generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()))); generateJobFile("all_scope", datasetTable, String.join(",", datasetTableFields.stream().map(DatasetTableField::getDataeaseName).collect(Collectors.toList())));
extractData(datasetTable, "all_scope"); extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId)); replaceTable(DorisTableUtils.dorisName(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name()); datasetTableTaskLog.setStatus(JobStatus.Completed.name());
...@@ -196,9 +207,7 @@ public class ExtractDataService { ...@@ -196,9 +207,7 @@ public class ExtractDataService {
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
//TODO sql column generateTransFile("incremental_add", datasetTable, datasource, datasetTableFields, sql);
generateTransFile("incremental_add", datasetTable, datasource, tableName, datasetTableFields, sql);
generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource)); generateJobFile("incremental_add", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_add"); extractData(datasetTable, "incremental_add");
} }
...@@ -207,9 +216,7 @@ public class ExtractDataService { ...@@ -207,9 +216,7 @@ public class ExtractDataService {
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) { if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString() String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
//TODO sql column generateTransFile("incremental_delete", datasetTable, datasource, datasetTableFields, sql);
;
generateTransFile("incremental_delete", datasetTable, datasource, tableName, datasetTableFields, sql);
generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource)); generateJobFile("incremental_delete", datasetTable, fetchSqlField(sql, datasource));
extractData(datasetTable, "incremental_delete"); extractData(datasetTable, "incremental_delete");
} }
...@@ -395,37 +402,40 @@ public class ExtractDataService { ...@@ -395,37 +402,40 @@ public class ExtractDataService {
return String.join(",", datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).collect(Collectors.toList())); return String.join(",", datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).collect(Collectors.toList()));
} }
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception { private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
TransMeta transMeta = new TransMeta(); TransMeta transMeta = new TransMeta();
String dorisOutputTable = null; String dorisOutputTable = null;
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType()); DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
DatabaseMeta dataMeta = null; DatabaseMeta dataMeta = null;
StepMeta inputStep = null;
StepMeta outputStep = null;
StepMeta udjcStep = null;
TransHopMeta hi1 = null;
TransHopMeta hi2 = null;
String transName = null;
switch (datasourceType) { switch (datasourceType) {
case mysql: case mysql:
MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class); MysqlConfigration mysqlConfigration = new Gson().fromJson(datasource.getConfiguration(), MysqlConfigration.class);
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword()); dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
transMeta.addDatabase(dataMeta); transMeta.addDatabase(dataMeta);
if(extractType.equalsIgnoreCase("all_scope")){
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), tableName, datasetTableFields.stream().map(DatasetTableField::getDataeaseName).toArray(String[]::new));
}
inputStep = inputStep(transMeta, selectSQL);
break; break;
case excel:
String filePath = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getData();
inputStep = excelInputStep(filePath, datasetTableFields);
default: default:
break; break;
} }
Datasource dorisDatasource = (Datasource)CommonBeanFactory.getBean("DorisDatasource");
MysqlConfigration dorisConfigration = new Gson().fromJson(dorisDatasource.getConfiguration(), MysqlConfigration.class);
DatabaseMeta dorisDataMeta = new DatabaseMeta("doris", "MYSQL", "Native", dorisConfigration.getHost(), dorisConfigration.getDataBase(), dorisConfigration.getPort().toString(), dorisConfigration.getUsername(), dorisConfigration.getPassword());
transMeta.addDatabase(dorisDataMeta);
StepMeta inputStep = null;
StepMeta outputStep = null;
StepMeta udjcStep = null;
TransHopMeta hi1 = null;
TransHopMeta hi2 = null;
String transName = null;
switch (extractType) { switch (extractType) {
case "all_scope": case "all_scope":
transName = "trans_" + datasetTable.getId(); transName = "trans_" + datasetTable.getId();
dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId())); dorisOutputTable = DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTable.getId()));
selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
transMeta.setName(transName); transMeta.setName(transName);
break; break;
case "incremental_add": case "incremental_add":
...@@ -442,7 +452,6 @@ public class ExtractDataService { ...@@ -442,7 +452,6 @@ public class ExtractDataService {
break; break;
} }
inputStep = inputStep(transMeta, selectSQL);
udjcStep = udjc(datasetTableFields); udjcStep = udjc(datasetTableFields);
outputStep = outputStep(dorisOutputTable); outputStep = outputStep(dorisOutputTable);
hi1 = new TransHopMeta(inputStep, udjcStep); hi1 = new TransHopMeta(inputStep, udjcStep);
...@@ -469,6 +478,27 @@ public class ExtractDataService { ...@@ -469,6 +478,27 @@ public class ExtractDataService {
return fromStep; return fromStep;
} }
private StepMeta excelInputStep(String filePath, List<DatasetTableField> datasetTableFields){
ExcelInputMeta excelInputMeta = new ExcelInputMeta();
excelInputMeta.setSpreadSheetType(SpreadSheetType.SAX_POI);
excelInputMeta.setPassword("Encrypted");
excelInputMeta.setFileName(new String[]{filePath});
excelInputMeta.setStartsWithHeader(true);
ExcelInputField[] fields = new ExcelInputField[datasetTableFields.size()];
for (int i=0; i<datasetTableFields.size();i++) {
ExcelInputField field = new ExcelInputField();
field.setName(datasetTableFields.get(i).getOriginName());
field.setType("String");
fields[i] = field;
}
excelInputMeta.setField(fields);
StepMeta fromStep = new StepMeta("ExcelInput", "Data Input", excelInputMeta);
fromStep.setDraw(true);
fromStep.setLocation(100, 100);
return fromStep;
}
private StepMeta outputStep(String dorisOutputTable){ private StepMeta outputStep(String dorisOutputTable){
TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta(); TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta();
textFileOutputMeta.setEncoding("UTF-8"); textFileOutputMeta.setEncoding("UTF-8");
...@@ -502,20 +532,6 @@ public class ExtractDataService { ...@@ -502,20 +532,6 @@ public class ExtractDataService {
return userDefinedJavaClassStep; return userDefinedJavaClassStep;
} }
private StepMeta execSqlStep(TransMeta transMeta, String dorisOutputTable, List<DatasetTableField>datasetTableFields){
ExecSQLMeta execSQLMeta = new ExecSQLMeta();
DatabaseMeta dorisDatabaseMeta = transMeta.findDatabase("doris");
execSQLMeta.setDatabaseMeta(dorisDatabaseMeta);
String deleteSql = "delete from DORIS_TABLE where dataease_uuid='?';".replace("DORIS_TABLE", dorisOutputTable);
execSQLMeta.setSql(deleteSql);
execSQLMeta.setExecutedEachInputRow(true);
execSQLMeta.setArguments(new String[]{"dataease_uuid"});
StepMeta execSQLStep = new StepMeta("ExecSQL", "ExecSQL", execSQLMeta);
execSQLStep.setLocation(600, 100);
execSQLStep.setDraw(true);
return execSQLStep;
}
private static String code = "import org.pentaho.di.core.row.ValueMetaInterface;\n" + private static String code = "import org.pentaho.di.core.row.ValueMetaInterface;\n" +
"import java.util.List;\n" + "import java.util.List;\n" +
"import java.io.File;\n" + "import java.io.File;\n" +
......
ALTER TABLE `dataset_table_field` ADD COLUMN `dataease_name` varchar(255) NOT NULL COMMENT '字段名' AFTER `name`;
ALTER TABLE dataset_table_task_log CHANGE COLUMN `task_id` `task_id` VARCHAR(50) NULL COMMENT '任务ID' ;
\ No newline at end of file
...@@ -67,7 +67,7 @@ ...@@ -67,7 +67,7 @@
<!-- <table tableName="datasource"/>--> <!-- <table tableName="datasource"/>-->
<!-- <table tableName="sys_dict"/>--> <!-- <table tableName="sys_dict"/>-->
<!-- <table tableName="sys_dict_item"/>--> <!-- <table tableName="sys_dict_item"/>-->
<table tableName="chart_view"/> <table tableName="dataset_table_field"/>
<!-- <table tableName="panel_design"/>--> <!-- <table tableName="panel_design"/>-->
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论