Unverified 提交 e6089dea authored 作者: taojinlong's avatar taojinlong 提交者: GitHub

Merge pull request #128 from dataease/pr@dev@oracle

feat: oracle 增量更新
...@@ -41,4 +41,6 @@ public abstract class QueryProvider { ...@@ -41,4 +41,6 @@ public abstract class QueryProvider {
public abstract String getSQLSummary(String table, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList); public abstract String getSQLSummary(String table, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList);
public abstract String getSQLSummaryAsTmp(String sql, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList); public abstract String getSQLSummaryAsTmp(String sql, List<ChartViewFieldDTO> yAxis, List<ChartCustomFilterDTO> customFilter, List<ChartExtFilterRequest> extFilterRequestList);
public abstract String wrapSql(String sql);
} }
...@@ -338,6 +338,16 @@ public class DorisQueryProvider extends QueryProvider { ...@@ -338,6 +338,16 @@ public class DorisQueryProvider extends QueryProvider {
return getSQLSummary(" (" + sql + ") AS tmp ", yAxis, customFilter, extFilterRequestList); return getSQLSummary(" (" + sql + ") AS tmp ", yAxis, customFilter, extFilterRequestList);
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") AS tmp " + " LIMIT 0";
return tmpSql;
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -346,6 +346,16 @@ public class MysqlQueryProvider extends QueryProvider { ...@@ -346,6 +346,16 @@ public class MysqlQueryProvider extends QueryProvider {
return getSQLSummary(" (" + sqlFix(sql) + ") AS tmp ", yAxis, customFilter, extFilterRequestList); return getSQLSummary(" (" + sqlFix(sql) + ") AS tmp ", yAxis, customFilter, extFilterRequestList);
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") AS tmp " + " LIMIT 0";
return tmpSql;
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -490,6 +490,16 @@ public class OracleQueryProvider extends QueryProvider { ...@@ -490,6 +490,16 @@ public class OracleQueryProvider extends QueryProvider {
return sql; return sql;
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") tmp " + " where rownum <= 0";
return tmpSql;
}
private String transDateFormat(String dateStyle, String datePattern) { private String transDateFormat(String dateStyle, String datePattern) {
String split = "-"; String split = "-";
if (StringUtils.equalsIgnoreCase(datePattern, "date_sub")) { if (StringUtils.equalsIgnoreCase(datePattern, "date_sub")) {
......
...@@ -260,6 +260,16 @@ public class SqlserverQueryProvider extends QueryProvider { ...@@ -260,6 +260,16 @@ public class SqlserverQueryProvider extends QueryProvider {
return null; return null;
} }
@Override
public String wrapSql(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") AS tmp " + " LIMIT 0";
return tmpSql;
}
public String transMysqlFilterTerm(String term) { public String transMysqlFilterTerm(String term) {
switch (term) { switch (term) {
case "eq": case "eq":
......
...@@ -721,13 +721,14 @@ public class DataSetTableService { ...@@ -721,13 +721,14 @@ public class DataSetTableService {
}); });
List<String> originNameFileds = datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList()); List<String> originNameFileds = datasetTableFields.stream().map(DatasetTableField::getOriginName).collect(Collectors.toList());
Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId()); Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
QueryProvider qp = ProviderFactory.getQueryProvider(ds.getType());
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds); datasourceRequest.setDatasource(ds);
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加 if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {// 增量添加
String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()) String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
datasourceRequest.setQuery(extractDataService.sqlFix(sql)); datasourceRequest.setQuery(qp.wrapSql(sql));
List<String> sqlFileds = new ArrayList<>(); List<String> sqlFileds = new ArrayList<>();
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed -> { datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed -> {
sqlFileds.add(filed); sqlFileds.add(filed);
...@@ -740,7 +741,7 @@ public class DataSetTableService { ...@@ -740,7 +741,7 @@ public class DataSetTableService {
if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除 if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete()) && StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete().replace(" ", ""))) {// 增量删除
String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()) String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, Long.valueOf(System.currentTimeMillis()).toString())
.replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()); .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString());
datasourceRequest.setQuery(extractDataService.sqlFix(sql)); datasourceRequest.setQuery(qp.wrapSql(sql));
List<String> sqlFileds = new ArrayList<>(); List<String> sqlFileds = new ArrayList<>();
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed -> { datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed -> {
sqlFileds.add(filed); sqlFileds.add(filed);
......
...@@ -580,10 +580,11 @@ public class ExtractDataService { ...@@ -580,10 +580,11 @@ public class ExtractDataService {
} }
private String fetchSqlField(String sql, Datasource ds) throws Exception { private String fetchSqlField(String sql, Datasource ds) throws Exception {
QueryProvider qp = ProviderFactory.getQueryProvider(ds.getType());
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType()); DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest(); DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds); datasourceRequest.setDatasource(ds);
datasourceRequest.setQuery(sqlFix(sql)); datasourceRequest.setQuery(qp.wrapSql(sql));
List<String> dorisFileds = new ArrayList<>(); List<String> dorisFileds = new ArrayList<>();
datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{ datasourceProvider.fetchResultField(datasourceRequest).stream().map(TableFiled::getFieldName).forEach(filed ->{
dorisFileds.add(DorisTableUtils.columnName(filed)); dorisFileds.add(DorisTableUtils.columnName(filed));
...@@ -591,16 +592,6 @@ public class ExtractDataService { ...@@ -591,16 +592,6 @@ public class ExtractDataService {
return String.join(",", dorisFileds); return String.join(",", dorisFileds);
} }
public String sqlFix(String sql) {
sql = sql.trim();
if (sql.lastIndexOf(";") == (sql.length() - 1)) {
sql = sql.substring(0, sql.length() - 1);
}
String tmpSql = "SELECT * FROM (" + sql + ") AS tmp " + " LIMIT 0";
return tmpSql;
}
private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception { private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
TransMeta transMeta = new TransMeta(); TransMeta transMeta = new TransMeta();
String outFile = null; String outFile = null;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论