提交 696549c4 authored 作者: taojinlong's avatar taojinlong

feat: 优化 oracle 增量更新

上级 620cb123
...@@ -43,4 +43,8 @@ public abstract class QueryProvider { ...@@ -43,4 +43,8 @@ public abstract class QueryProvider {
public abstract String getSQLSummaryAsTmp(String sql, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList); public abstract String getSQLSummaryAsTmp(String sql, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList);
public abstract String wrapSql(String sql); public abstract String wrapSql(String sql);
public abstract String createRawQuerySQL(String table, List<DatasetTableField> fields);
public abstract String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields);
} }
...@@ -348,6 +348,28 @@ public class DorisQueryProvider extends QueryProvider { ...@@ -348,6 +348,28 @@ public class DorisQueryProvider extends QueryProvider {
return tmpSql; return tmpSql;
} }
@Override
public String createRawQuerySQL(String table, List<DatasetTableField> fields){
String[] array = fields.stream().map(f -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("`").append(f.getOriginName()).append("` AS ").append(f.getDataeaseName());
return stringBuilder.toString();
}).toArray(String[]::new);
return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table);
}
@Override
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields);
}
private String sqlFix(String sql) {
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
return sql;
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -102,8 +102,6 @@ public class MysqlQueryProvider extends QueryProvider { ...@@ -102,8 +102,6 @@ public class MysqlQueryProvider extends QueryProvider {
} }
return stringBuilder.toString(); return stringBuilder.toString();
}).toArray(String[]::new); }).toArray(String[]::new);
// return MessageFormat.format("SELECT {0} FROM {1} ORDER BY " + (fields.size() > 0 ? fields.get(0).getOriginName() : "null"), StringUtils.join(array, ","), table);
return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table); return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table);
} }
...@@ -356,6 +354,21 @@ public class MysqlQueryProvider extends QueryProvider { ...@@ -356,6 +354,21 @@ public class MysqlQueryProvider extends QueryProvider {
return tmpSql; return tmpSql;
} }
@Override
public String createRawQuerySQL(String table, List<DatasetTableField> fields){
String[] array = fields.stream().map(f -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("`").append(f.getOriginName()).append("` AS ").append(f.getDataeaseName());
return stringBuilder.toString();
}).toArray(String[]::new);
return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table);
}
@Override
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields);
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -380,6 +380,31 @@ public class OracleQueryProvider extends QueryProvider { ...@@ -380,6 +380,31 @@ public class OracleQueryProvider extends QueryProvider {
return getSQLSummary(" (" + sqlFix(sql) + ") tmp ", yAxis, customFilter, extFilterRequestList); return getSQLSummary(" (" + sqlFix(sql) + ") tmp ", yAxis, customFilter, extFilterRequestList);
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") tmp " + " where rownum <= 0";
return tmpSql;
}
@Override
public String createRawQuerySQL(String table, List<DatasetTableField> fields){
String[] array = fields.stream().map(f -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(" ").append(f.getOriginName()).append(" AS ").append(f.getDataeaseName());
return stringBuilder.toString();
}).toArray(String[]::new);
return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table);
}
@Override
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
return createRawQuerySQL(" (" + sqlFix(sql) + ") tmp ", fields);
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
...@@ -490,16 +515,6 @@ public class OracleQueryProvider extends QueryProvider { ...@@ -490,16 +515,6 @@ public class OracleQueryProvider extends QueryProvider {
return sql; return sql;
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") tmp " + " where rownum <= 0";
return tmpSql;
}
private String transDateFormat(String dateStyle, String datePattern) { private String transDateFormat(String dateStyle, String datePattern) {
String split = "-"; String split = "-";
if (StringUtils.equalsIgnoreCase(datePattern, "date_sub")) { if (StringUtils.equalsIgnoreCase(datePattern, "date_sub")) {
......
...@@ -270,6 +270,21 @@ public class SqlserverQueryProvider extends QueryProvider { ...@@ -270,6 +270,21 @@ public class SqlserverQueryProvider extends QueryProvider {
return tmpSql; return tmpSql;
} }
@Override
public String createRawQuerySQL(String table, List<DatasetTableField> fields){
String[] array = fields.stream().map(f -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("`").append(f.getOriginName()).append("` AS ").append(f.getDataeaseName());
return stringBuilder.toString();
}).toArray(String[]::new);
return MessageFormat.format("SELECT {0} FROM {1} ORDER BY null", StringUtils.join(array, ","), table);
}
@Override
public String createRawQuerySQLAsTmp(String sql, List<DatasetTableField> fields) {
return createRawQuerySQL(" (" + sqlFix(sql) + ") AS tmp ", fields);
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -206,13 +206,13 @@ public class ExtractDataService { ...@@ -206,13 +206,13 @@ public class ExtractDataService {
extractData(datasetTable, "all_scope"); extractData(datasetTable, "all_scope");
replaceTable(DorisTableUtils.dorisName(datasetTableId)); replaceTable(DorisTableUtils.dorisName(datasetTableId));
saveSucessLog(datasetTableTaskLog); saveSucessLog(datasetTableTaskLog);
deleteFile("all_scope", datasetTableId); // deleteFile("all_scope", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
}catch (Exception e){ }catch (Exception e){
saveErrorLog(datasetTableId, taskId, e); saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null); updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId))); dropDorisTable(DorisTableUtils.dorisTmpName(DorisTableUtils.dorisName(datasetTableId)));
deleteFile("all_scope", datasetTableId); // deleteFile("all_scope", datasetTableId);
}finally { }finally {
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString()); datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
...@@ -267,15 +267,15 @@ public class ExtractDataService { ...@@ -267,15 +267,15 @@ public class ExtractDataService {
extractData(datasetTable, "incremental_delete"); extractData(datasetTable, "incremental_delete");
} }
saveSucessLog(datasetTableTaskLog); saveSucessLog(datasetTableTaskLog);
deleteFile("incremental_add", datasetTableId); // deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId); // deleteFile("incremental_delete", datasetTableId);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime); updateTableStatus(datasetTableId, datasetTable, JobStatus.Completed, execTime);
} }
}catch (Exception e){ }catch (Exception e){
saveErrorLog(datasetTableId, taskId, e); saveErrorLog(datasetTableId, taskId, e);
updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null); updateTableStatus(datasetTableId, datasetTable, JobStatus.Error, null);
deleteFile("incremental_add", datasetTableId); // deleteFile("incremental_add", datasetTableId);
deleteFile("incremental_delete", datasetTableId); // deleteFile("incremental_delete", datasetTableId);
}finally { }finally {
if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) { if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString()); datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
...@@ -610,15 +610,7 @@ public class ExtractDataService { ...@@ -610,15 +610,7 @@ public class ExtractDataService {
dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword()); dataMeta = new DatabaseMeta("db", "MYSQL", "Native", mysqlConfigration.getHost(), mysqlConfigration.getDataBase(), mysqlConfigration.getPort().toString(), mysqlConfigration.getUsername(), mysqlConfigration.getPassword());
dataMeta.addExtraOption("MYSQL","characterEncoding", "UTF-8"); dataMeta.addExtraOption("MYSQL","characterEncoding", "UTF-8");
transMeta.addDatabase(dataMeta); transMeta.addDatabase(dataMeta);
if (extractType.equalsIgnoreCase("all_scope")) { selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
if(datasetTable.getType().equalsIgnoreCase("sql")){
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
}else {
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createQuerySQL(tableName, datasetTableFields);
}
}
inputStep = inputStep(transMeta, selectSQL); inputStep = inputStep(transMeta, selectSQL);
udjcStep = udjc(datasetTableFields, false); udjcStep = udjc(datasetTableFields, false);
break; break;
...@@ -626,15 +618,7 @@ public class ExtractDataService { ...@@ -626,15 +618,7 @@ public class ExtractDataService {
SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasource.getConfiguration(), SqlServerConfigration.class); SqlServerConfigration sqlServerConfigration = new Gson().fromJson(datasource.getConfiguration(), SqlServerConfigration.class);
dataMeta = new DatabaseMeta("db", "MSSQLNATIVE", "Native", sqlServerConfigration.getHost(), sqlServerConfigration.getDataBase(), sqlServerConfigration.getPort().toString(), sqlServerConfigration.getUsername(), sqlServerConfigration.getPassword()); dataMeta = new DatabaseMeta("db", "MSSQLNATIVE", "Native", sqlServerConfigration.getHost(), sqlServerConfigration.getDataBase(), sqlServerConfigration.getPort().toString(), sqlServerConfigration.getUsername(), sqlServerConfigration.getPassword());
transMeta.addDatabase(dataMeta); transMeta.addDatabase(dataMeta);
if (extractType.equalsIgnoreCase("all_scope")) { selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
if(datasetTable.getType().equalsIgnoreCase("sql")){
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
}else {
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createQuerySQL(tableName, datasetTableFields);
}
}
inputStep = inputStep(transMeta, selectSQL); inputStep = inputStep(transMeta, selectSQL);
udjcStep = udjc(datasetTableFields, false); udjcStep = udjc(datasetTableFields, false);
break; break;
...@@ -647,15 +631,8 @@ public class ExtractDataService { ...@@ -647,15 +631,8 @@ public class ExtractDataService {
dataMeta = new DatabaseMeta("db", "ORACLE", "Native", oracleConfigration.getHost(), oracleConfigration.getDataBase(), oracleConfigration.getPort().toString(), oracleConfigration.getUsername(), oracleConfigration.getPassword()); dataMeta = new DatabaseMeta("db", "ORACLE", "Native", oracleConfigration.getHost(), oracleConfigration.getDataBase(), oracleConfigration.getPort().toString(), oracleConfigration.getUsername(), oracleConfigration.getPassword());
} }
transMeta.addDatabase(dataMeta); transMeta.addDatabase(dataMeta);
if (extractType.equalsIgnoreCase("all_scope")) {
if(datasetTable.getType().equalsIgnoreCase("sql")){ selectSQL = getSelectSQL(extractType, datasetTable, datasource, datasetTableFields, selectSQL);
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
}else {
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createQuerySQL(tableName, datasetTableFields);
}
}
inputStep = inputStep(transMeta, selectSQL); inputStep = inputStep(transMeta, selectSQL);
udjcStep = udjc(datasetTableFields, false); udjcStep = udjc(datasetTableFields, false);
break; break;
...@@ -702,6 +679,25 @@ public class ExtractDataService { ...@@ -702,6 +679,25 @@ public class ExtractDataService {
FileUtils.writeStringToFile(file, transXml, "UTF-8"); FileUtils.writeStringToFile(file, transXml, "UTF-8");
} }
private String getSelectSQL(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) {
if (extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase("db")) {
String tableName = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createRawQuerySQL(tableName, datasetTableFields);
}
if(extractType.equalsIgnoreCase("all_scope") && datasetTable.getType().equalsIgnoreCase("sql")){
selectSQL = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getSql();
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
}
if(!extractType.equalsIgnoreCase("all_scope")){
QueryProvider qp = ProviderFactory.getQueryProvider(datasource.getType());
selectSQL = qp.createRawQuerySQLAsTmp(selectSQL, datasetTableFields);
}
return selectSQL;
}
private StepMeta inputStep(TransMeta transMeta, String selectSQL) { private StepMeta inputStep(TransMeta transMeta, String selectSQL) {
TableInputMeta tableInput = new TableInputMeta(); TableInputMeta tableInput = new TableInputMeta();
DatabaseMeta database = transMeta.findDatabase("db"); DatabaseMeta database = transMeta.findDatabase("db");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论