提交 5a6ff503 authored 作者: taojinlong's avatar taojinlong

feat: 支持直连模式(elasticsearch)

上级 3de59dee
......@@ -6,7 +6,8 @@ public enum DatasourceTypes {
pg("pg", "pg", "org.postgresql.Driver", "\"", "\"", "\"", "\""),
sqlServer("sqlServer", "sqlServer", "com.microsoft.sqlserver.jdbc.SQLServerDriver", "\"", "\"", "\"", "\""),
doris("doris", "doris", "com.mysql.jdbc.Driver", "`", "`", "", ""),
oracle("oracle", "oracle", "oracle.jdbc.driver.OracleDriver", "\"", "\"", "\"", "\"");
oracle("oracle", "oracle", "oracle.jdbc.driver.OracleDriver", "\"", "\"", "\"", "\""),
es("es", "es", "", "\"", "\"", "\"", "\"");
private String feature;
private String desc;
......
package io.dataease.datasource.dto;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class EsConfigDTO {
private String url;
private String username;
private String password;
private String dataSourceType = "es";
}
package io.dataease.datasource.dto.es;
import lombok.Data;
import java.util.ArrayList;
import java.util.List;
@Data
public class EsReponse {
private List<Column>columns = new ArrayList<>();
private List<String[]>rows = new ArrayList<>();
private String cursor;
private Integer status;
private Error error;
@Data
public class Error{
private String type;
private String reason;
}
@Data
public class Column {
private String name;
private String type;
}
}
package io.dataease.datasource.dto.es;
import lombok.Data;
@Data
public class Requst {
private String query;
private Integer fetch_size = 10000;
}
package io.dataease.datasource.dto.es;
import lombok.Data;
@Data
public class RequstWithCursor extends Requst{
private String cursor;
}
package io.dataease.datasource.provider;
import com.google.gson.Gson;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import io.dataease.commons.utils.HttpClientConfig;
import io.dataease.commons.utils.HttpClientUtil;
import io.dataease.datasource.dto.*;
import io.dataease.datasource.dto.es.EsReponse;
import io.dataease.datasource.dto.es.Requst;
import io.dataease.datasource.dto.es.RequstWithCursor;
import io.dataease.datasource.request.DatasourceRequest;
import io.dataease.exception.DataEaseException;
import io.dataease.provider.es.EsQueryProvider;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHeaders;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.*;
@Service("es")
public class EsProvider extends DatasourceProvider {
private static Map<String, ComboPooledDataSource> jdbcConnection = new HashMap<>();
private static int initPoolSize = 5;
private static int maxConnections = 200;
/**
* 增加缓存机制 key 由 'provider_sql_' dsr.datasource.id dsr.table dsr.query共4部分组成,命中则使用缓存直接返回不再执行sql逻辑
* @param dsr
* @return
* @throws Exception
*/
/**
* 这里使用声明式缓存不是很妥当
* 改为chartViewService中使用编程式缓存
@Cacheable(
value = JdbcConstants.JDBC_PROVIDER_KEY,
key = "'provider_sql_' + #dsr.datasource.id + '_' + #dsr.table + '_' + #dsr.query",
condition = "#dsr.pageSize == null || #dsr.pageSize == 0L"
)
*/
@Override
public List<String[]> getData(DatasourceRequest dsr) throws Exception {
List<String[]> list = new LinkedList<>();
try {
EsConfigDTO esConfigDTO = new Gson().fromJson(dsr.getDatasource().getConfiguration(), EsConfigDTO.class);
HttpClientConfig httpClientConfig = new HttpClientConfig();
String auth = esConfigDTO.getUsername() + ":" + esConfigDTO.getPassword();
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
httpClientConfig.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + new String(encodedAuth));
Requst requst = new Requst();
requst.setQuery(dsr.getQuery());
requst.setFetch_size(dsr.getFetchSize());
String url = esConfigDTO.getUrl().endsWith("/") ? esConfigDTO.getUrl() + "_sql?format=json" : esConfigDTO.getUrl() + "/" + "_sql?format=json";
String response = HttpClientUtil.post(url, new Gson().toJson(requst), httpClientConfig);
EsReponse esReponse = new Gson().fromJson(response, EsReponse.class);
list.addAll(fetchResult(esReponse));
if(dsr.isPageable()){
Integer realSize = dsr.getPage() * dsr.getPageSize() < list.size() ? dsr.getPage() * dsr.getPageSize(): list.size();
list = list.subList((dsr.getPage() - 1) * dsr.getPageSize(), realSize);
}
if(!dsr.isPreviewData()){
while (StringUtils.isNotEmpty(esReponse.getCursor())) {
RequstWithCursor requstWithCursor = new RequstWithCursor();
requstWithCursor.setQuery(dsr.getQuery());
requstWithCursor.setFetch_size(dsr.getFetchSize());
requstWithCursor.setCursor(esReponse.getCursor());
response = HttpClientUtil.post(url, new Gson().toJson(requstWithCursor), httpClientConfig);
esReponse = new Gson().fromJson(response, EsReponse.class);
list.addAll(fetchResult(esReponse));
}
}
} catch (Exception e) {
DataEaseException.throwException(e);
}
return list;
}
@Override
public List<String[]> fetchResult(DatasourceRequest datasourceRequest) throws Exception {
List<String[]> list = new LinkedList<>();
try {
String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "_sql?format=json");
list = fetchResult(response);
} catch (Exception e) {
DataEaseException.throwException(e);
}
return list;
}
private List<String[]> fetchResult(String response) throws Exception {
EsReponse esReponse = new Gson().fromJson(response, EsReponse.class);
return fetchResult(esReponse);
}
private List<String[]> fetchResult(EsReponse esReponse) throws Exception {
List<String[]> list = new LinkedList<>();
if(esReponse.getError() != null){
throw new Exception(esReponse.getError().getReason());
}
list.addAll(esReponse.getRows());
return list;
}
@Override
public List<TableFiled> fetchResultField(DatasourceRequest datasourceRequest) throws Exception {
List<TableFiled> tableFileds = new ArrayList<>();
try {
String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "_sql?format=json");
tableFileds = fetchResultField(response);
} catch (Exception e) {
DataEaseException.throwException(e);
}
return tableFileds;
}
private List<TableFiled> fetchResultField(String response) throws Exception {
List<TableFiled> fieldList = new ArrayList<>();
EsReponse esReponse = new Gson().fromJson(response, EsReponse.class);
if(esReponse.getError() != null){
throw new Exception(esReponse.getError().getReason());
}
for (String[] row : esReponse.getRows()) {
TableFiled field = new TableFiled();
field.setFieldName(row[0]);
field.setRemarks(row[0]);
field.setFieldType(row[2]);
field.setFieldSize(EsQueryProvider.transFieldTypeSize(row[2]));
fieldList.add(field);
}
return fieldList;
}
@Override
public Map<String, List> fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception {
Map<String, List> result = new HashMap<>();
try {
String response = exexQuery(datasourceRequest, datasourceRequest.getQuery(), "_sql?format=json");
result.put("dataList", fetchResult(response));
result.put("fieldList", fetchResultField(response));
} catch (Exception e) {
DataEaseException.throwException(e);
}
return result;
}
@Override
public void handleDatasource(DatasourceRequest datasourceRequest, String type) throws Exception {
}
@Override
public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception {
List<String> tables = new ArrayList<>();
try {
String response = exexQuery(datasourceRequest, "show tables", "_sql?format=json");
tables = fetchTables(response);
} catch (Exception e) {
DataEaseException.throwException(e);
}
return tables;
}
private List<String> fetchTables(String response) throws Exception {
List<String> tables = new ArrayList<>();
EsReponse esReponse = new Gson().fromJson(response, EsReponse.class);
if(esReponse.getError() != null){
throw new Exception(esReponse.getError().getReason());
}
for (String[] row : esReponse.getRows()) {
if(row.length == 3 && row[1].equalsIgnoreCase("TABLE") && row[2].equalsIgnoreCase("INDEX")){
tables.add(row[0]);
}
}
return tables;
}
@Override
public List<String> getSchema(DatasourceRequest datasourceRequest) throws Exception {
return new ArrayList<>();
}
@Override
public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
List<TableFiled> tableFileds = new ArrayList<>();
try {
String response = exexQuery(datasourceRequest, "desc " + datasourceRequest.getTable(), "_sql?format=json");
tableFileds = fetchResultField(response);
} catch (Exception e) {
DataEaseException.throwException(e);
}
return tableFileds;
}
@Override
public void checkStatus(DatasourceRequest datasourceRequest) throws Exception {
getTables(datasourceRequest);
}
private String exexQuery(DatasourceRequest datasourceRequest, String sql, String uri){
EsConfigDTO esConfigDTO = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), EsConfigDTO.class);
HttpClientConfig httpClientConfig = new HttpClientConfig();
String auth = esConfigDTO.getUsername() + ":" + esConfigDTO.getPassword();
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
httpClientConfig.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + new String(encodedAuth));
Requst requst = new Requst();
requst.setQuery(sql);
requst.setFetch_size(datasourceRequest.getFetchSize());
String url = esConfigDTO.getUrl().endsWith("/") ? esConfigDTO.getUrl() + uri : esConfigDTO.getUrl() + "/" + uri;
String response = HttpClientUtil.post(url, new Gson().toJson(requst), httpClientConfig);
return response;
}
}
......@@ -30,6 +30,8 @@ public class ProviderFactory implements ApplicationContextAware {
return context.getBean("jdbc", DatasourceProvider.class);
case pg:
return context.getBean("jdbc", DatasourceProvider.class);
case es:
return context.getBean("es", DatasourceProvider.class);
default:
return context.getBean("jdbc", DatasourceProvider.class);
}
......@@ -48,6 +50,8 @@ public class ProviderFactory implements ApplicationContextAware {
return context.getBean("pgQuery", QueryProvider.class);
case oracle:
return context.getBean("oracleQuery", QueryProvider.class);
case es:
return context.getBean("esQuery", QueryProvider.class);
default:
return context.getBean("mysqlQuery", QueryProvider.class);
}
......
......@@ -11,8 +11,12 @@ public class DatasourceRequest {
protected String query;
protected String table;
protected Datasource datasource;
private Long pageSize;
private Long startPage;
private Integer pageSize;
private Integer page;
private Integer realSize;
private Integer fetchSize = 10000;
private boolean pageable = false;
private boolean previewData = false;
}
package io.dataease.provider.es;
import io.dataease.provider.SQLConstants;
import static io.dataease.datasource.constants.DatasourceTypes.es;
/**
* @Author gin
* @Date 2021/7/8 7:22 下午
*/
public class EsSqlLConstants extends SQLConstants {
public static final String KEYWORD_TABLE = es.getKeywordPrefix() + "%s" + es.getKeywordSuffix();
public static final String KEYWORD_FIX = "%s." + es.getKeywordPrefix() + "%s" + es.getKeywordSuffix();
public static final String UNIX_TIMESTAMP = "UNIX_TIMESTAMP(%s)";
public static final String DATETIME_FORMAT = "DATETIME_FORMAT(%s,'%s')";
public static final String CAST = "CAST(%s AS %s)";
public static final String DEFAULT_DATE_FORMAT = "YYYY-MM-dd HH:mm:ss";
public static final String DEFAULT_INT_FORMAT = "DECIMAL(20,0)";
public static final String DEFAULT_FLOAT_FORMAT = "DECIMAL(20,2)";
public static final String WHERE_VALUE_NULL = "(NULL,'')";
public static final String WHERE_VALUE_VALUE = "'%s'";
public static final String AGG_COUNT = "COUNT(*)";
public static final String AGG_FIELD = "%s(%s)";
public static final String WHERE_BETWEEN = "'%s' AND '%s'";
public static final String BRACKETS = "(%s)";
}
......@@ -470,14 +470,22 @@ public class DataSetTableService {
QueryProvider qp = ProviderFactory.getQueryProvider(ds.getType());
datasourceRequest.setQuery(qp.createQuerySQLWithPage(table, fields, page, pageSize, realSize, false, ds));
map.put("sql", datasourceRequest.getQuery());
datasourceRequest.setPage(page);
datasourceRequest.setFetchSize(Integer.parseInt(dataSetTableRequest.getRow()));
datasourceRequest.setPageSize(pageSize);
datasourceRequest.setRealSize(realSize);
datasourceRequest.setPreviewData(true);
try {
datasourceRequest.setPageable(true);
data.addAll(datasourceProvider.getData(datasourceRequest));
} catch (Exception e) {
e.printStackTrace();
DEException.throwException(e.getMessage());
}
try {
datasourceRequest.setQuery(qp.createQueryTableWithLimit(table, fields, Integer.valueOf(dataSetTableRequest.getRow()), false, ds));
datasourceRequest.setPageable(false);
dataSetPreviewPage.setTotal(datasourceProvider.getData(datasourceRequest).size());
} catch (Exception e) {
e.printStackTrace();
......
......@@ -1044,6 +1044,7 @@ export default {
please_input_user_name: 'Please enter user name',
please_input_password: 'Please enter Password',
please_input_host: 'Please enter host',
please_input_url: 'Please enter url adress',
please_input_port: 'Please enter port',
modify: 'Edit data Source',
validate_success: 'Verification successful',
......
......@@ -1043,6 +1043,7 @@ export default {
please_input_user_name: '請輸入用戶名',
please_input_password: '請輸入密碼',
please_input_host: '請輸入主機',
please_input_url: '請輸入URL地址',
please_input_port: '請輸入',
modify: '編輯數據源',
validate_success: '校驗成功',
......
......@@ -1044,6 +1044,7 @@ export default {
please_input_user_name: '请输入用户名',
please_input_password: '请输入密码',
please_input_host: '请输入主机',
please_input_url: '请输入URL地址',
please_input_port: '请输入端口',
modify: '编辑数据源',
validate_success: '校验成功',
......
......@@ -27,6 +27,9 @@
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.host')" prop="configuration.host">
<el-input v-model="form.configuration.host" autocomplete="off" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='es'" :label="$t('datasource.url')" prop="configuration.url">
<el-input v-model="form.configuration.url" placeholder="请输入 Elasticsearch 地址,如: http://es_host:es_port" autocomplete="off" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.data_base')" prop="configuration.dataBase">
<el-input v-model="form.configuration.dataBase" autocomplete="off" />
</el-form-item>
......@@ -36,10 +39,10 @@
<el-radio v-model="form.configuration.connectionType" label="serviceName">{{ $t('datasource.oracle_service_name') }}</el-radio>
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.user_name')" prop="configuration.username">
<el-form-item :label="$t('datasource.user_name')" prop="configuration.username">
<el-input v-model="form.configuration.username" autocomplete="off" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.password')" prop="configuration.password">
<el-form-item :label="$t('datasource.password')" prop="configuration.password">
<el-input v-model="form.configuration.password" autocomplete="off" show-password />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.port')" prop="configuration.port">
......@@ -61,25 +64,25 @@
/>
</el-select>
</el-form-item>
<el-collapse>
<el-collapse v-if="form.configuration.dataSourceType=='jdbc'">
<el-collapse-item :title="$t('datasource.priority')" name="1">
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.initial_pool_size')" prop="configuration.initialPoolSize">
<el-form-item :label="$t('datasource.initial_pool_size')" prop="configuration.initialPoolSize">
<el-input v-model="form.configuration.initialPoolSize" autocomplete="off" type="number" min="0" size="small" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.min_pool_size')" prop="configuration.minPoolSize">
<el-form-item :label="$t('datasource.min_pool_size')" prop="configuration.minPoolSize">
<el-input v-model="form.configuration.minPoolSize" autocomplete="off" type="number" min="0" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.max_pool_size')" prop="configuration.maxPoolSize">
<el-form-item :label="$t('datasource.max_pool_size')" prop="configuration.maxPoolSize">
<el-input v-model="form.configuration.maxPoolSize" autocomplete="off" type="number" min="0" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.max_idle_time')" prop="configuration.maxIdleTime">
<el-form-item :label="$t('datasource.max_idle_time')" prop="configuration.maxIdleTime">
<el-input v-model="form.configuration.maxIdleTime" autocomplete="off" type="number" min="0" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.acquire_increment')" prop="configuration.acquireIncrement">
<el-form-item :label="$t('datasource.acquire_increment')" prop="configuration.acquireIncrement">
<el-input v-model="form.configuration.acquireIncrement" autocomplete="off" type="number" min="0" />
</el-form-item>
<el-form-item v-if="form.configuration.dataSourceType=='jdbc'" :label="$t('datasource.connect_timeout')" prop="configuration.connectTimeout">
<el-form-item :label="$t('datasource.connect_timeout')" prop="configuration.connectTimeout">
<el-input v-model="form.configuration.connectTimeout" autocomplete="off" type="number" min="0" />
</el-form-item>
......@@ -135,6 +138,7 @@ export default {
'configuration.username': [{ required: true, message: this.$t('datasource.please_input_user_name'), trigger: 'blur' }],
'configuration.password': [{ required: true, message: this.$t('datasource.please_input_password'), trigger: 'change' }],
'configuration.host': [{ required: true, message: this.$t('datasource.please_input_host'), trigger: 'change' }],
'configuration.url': [{ required: true, message: this.$t('datasource.please_input_url'), trigger: 'change' }],
'configuration.port': [{ required: true, message: this.$t('datasource.please_input_port'), trigger: 'change' }],
'configuration.initialPoolSize': [{ required: true, message: this.$t('datasource.please_input_initial_pool_size'), trigger: 'change' }],
'configuration.minPoolSize': [{ required: true, message: this.$t('datasource.please_input_min_pool_size'), trigger: 'change' }],
......@@ -146,7 +150,8 @@ export default {
allTypes: [{ name: 'mysql', label: 'MySQL', type: 'jdbc' },
{ name: 'oracle', label: 'Oracle', type: 'jdbc' },
{ name: 'sqlServer', label: 'SQL Server', type: 'jdbc' },
{ name: 'pg', label: 'PostgreSQL', type: 'jdbc' }],
{ name: 'pg', label: 'PostgreSQL', type: 'jdbc' },
{ name: 'es', label: 'Elasticsearch', type: 'es' }],
schemas: [],
canEdit: false,
originConfiguration: {}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论