Unverified 提交 ec28949d authored 作者: taojinlong's avatar taojinlong 提交者: GitHub

Merge pull request #1966 from dataease/pr@dev@cluster

Merge branch 'dev' into pr@dev@cluster
......@@ -33,7 +33,7 @@ import java.util.*;
public class ExcelXlsxReader extends DefaultHandler {
/**
* 自定义获取表格某些信
* 自定义获取表格某些信
*/
public Map map = new TreeMap<String,String>();
/**
......
......@@ -10,6 +10,7 @@ import io.dataease.controller.request.dataset.DataSetGroupRequest;
import io.dataease.dto.dataset.DataSetGroupDTO;
import io.dataease.service.dataset.DataSetGroupService;
import io.dataease.service.dataset.ExtractDataService;
import io.dataease.service.kettle.KettleService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.shiro.authz.annotation.Logical;
......@@ -32,6 +33,8 @@ public class DataSetGroupController {
private DataSetGroupService dataSetGroupService;
@Resource
private ExtractDataService extractDataService;
@Resource
private KettleService kettleService;
@DePermissions(value = {
@DePermission(type = DePermissionType.DATASET, value = "id"),
......@@ -71,6 +74,6 @@ public class DataSetGroupController {
@ApiIgnore
@PostMapping("/isKettleRunning")
public boolean isKettleRunning() {
return extractDataService.isKettleRunning();
return kettleService.isKettleRunning();
}
}
......@@ -42,4 +42,6 @@ public class EngineController {
return engineService.save(engine);
}
}
package io.dataease.controller.engine;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import io.dataease.auth.annotation.DePermission;
import io.dataease.base.domain.DeEngine;
import io.dataease.commons.constants.DePermissionType;
import io.dataease.commons.constants.ResourceAuthLevel;
import io.dataease.commons.utils.PageUtils;
import io.dataease.commons.utils.Pager;
import io.dataease.controller.ResultHolder;
import io.dataease.dto.KettleDTO;
import io.dataease.plugins.common.entity.XpackConditionEntity;
import io.dataease.plugins.common.entity.XpackGridRequest;
import io.dataease.plugins.config.SpringContextUtil;
import io.dataease.plugins.xpack.auth.dto.request.DataSetColumnPermissionsDTO;
import io.dataease.plugins.xpack.auth.service.ColumnPermissionService;
import io.dataease.service.kettle.KettleService;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.*;
import springfox.documentation.annotations.ApiIgnore;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@ApiIgnore
@RequestMapping("kettle")
@RestController
public class KettleController {
@Resource
private KettleService kettleService;
@ApiIgnore
@PostMapping("save")
public ResultHolder save(@RequestBody DeEngine engine) throws Exception{
return kettleService.save(engine);
}
@ApiIgnore
@PostMapping("validate")
public void validate(@RequestBody KettleDTO kettleDTO) throws Exception{
kettleService.validate(kettleDTO);
}
@ApiIgnore
@PostMapping("validate/{id}")
public ResultHolder validate(@PathVariable String id) throws Exception{
return kettleService.validate(id);
}
@PostMapping("/pageList/{goPage}/{pageSize}")
public Pager<List<DeEngine>> pageList( @PathVariable int goPage, @PathVariable int pageSize) {
Page<Object> page = PageHelper.startPage(goPage, pageSize, true);
return PageUtils.setPageInfo(page, kettleService.pageList());
}
@ApiIgnore
@DeleteMapping("delete/{id}")
public void delete(@PathVariable String id) throws Exception{
kettleService.delete(id);
}
}
package io.dataease.dto;
import lombok.Data;
@Data
public class KettleDTO {
private String carte;
private String port;
private String user;
private String passwd;
}
......@@ -7,5 +7,8 @@ import lombok.Setter;
@Setter
public class DorisConfiguration extends MysqlConfiguration {
private Integer httpPort;
private Integer httpPort = 8030;
private Integer replicationNum = 1;
private Integer bucketNum = 10;
}
......@@ -3,6 +3,7 @@ package io.dataease.job.sechedule;
import com.fit2cloud.quartz.anno.QuartzScheduled;
import io.dataease.service.datasource.DatasourceService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.kettle.KettleService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
......@@ -13,6 +14,8 @@ public class Schedular {
private DataSetTableService dataSetTableService;
@Resource
private DatasourceService datasourceService;
@Resource
private KettleService kettleService;
@QuartzScheduled(cron = "0 0/3 * * * ?")
public void updateDatasetTableStatus() {
......@@ -24,4 +27,9 @@ public class Schedular {
datasourceService.updateDatasourceStatus();
}
@QuartzScheduled(cron = "0 0/30 * * * ?")
public void updateKettleStatus() {
kettleService.updateKettleStatus();
}
}
package io.dataease.provider;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.Datasource;
import java.util.List;
......@@ -17,7 +18,7 @@ public abstract class DDLProvider {
public abstract String replaceTable(String name);
public abstract String createTableSql(String name, List<DatasetTableField> datasetTableFields);
public abstract String createTableSql(String name, List<DatasetTableField> datasetTableFields, Datasource engine);
public abstract String insertSql(String name, List<String[]> dataList, int page, int pageNumber);
}
package io.dataease.provider;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.Datasource;
import io.dataease.commons.utils.Md5Utils;
import java.util.Arrays;
......@@ -28,7 +29,7 @@ public class DDLProviderImpl extends DDLProvider {
}
@Override
public String createTableSql(String name, List<DatasetTableField> datasetTableFields) {
public String createTableSql(String name, List<DatasetTableField> datasetTableFields, Datasource engine) {
return null;
}
......
......@@ -487,7 +487,6 @@ public class JdbcProvider extends DatasourceProvider {
break;
case impala:
ImpalaConfiguration impalaConfiguration = new Gson().fromJson(datasourceRequest.getDatasource().getConfiguration(), ImpalaConfiguration.class);
System.out.println(new Gson().toJson(impalaConfiguration));
username = impalaConfiguration.getUsername();
password = impalaConfiguration.getPassword();
driver = impalaConfiguration.getDriver();
......
package io.dataease.provider.engine.doris;
import com.google.gson.Gson;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.Datasource;
import io.dataease.commons.utils.TableUtils;
import io.dataease.dto.datasource.DorisConfiguration;
import io.dataease.dto.datasource.JdbcConfiguration;
import io.dataease.dto.datasource.MysqlConfiguration;
import io.dataease.provider.DDLProviderImpl;
import org.springframework.stereotype.Service;
......@@ -16,8 +21,8 @@ public class DorisDDLProvider extends DDLProviderImpl {
private static final String creatTableSql = "CREATE TABLE IF NOT EXISTS `TABLE_NAME`" +
"Column_Fields" +
"UNIQUE KEY(dataease_uuid)\n" +
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS 10\n" +
"PROPERTIES(\"replication_num\" = \"1\");";
"DISTRIBUTED BY HASH(dataease_uuid) BUCKETS BUCKETS_NUM\n" +
"PROPERTIES(\"replication_num\" = \"ReplicationNum\");";
@Override
public String createView(String name, String viewSQL) {
......@@ -41,9 +46,12 @@ public class DorisDDLProvider extends DDLProviderImpl {
}
@Override
public String createTableSql(String tableName, List<DatasetTableField> datasetTableFields) {
public String createTableSql(String tableName, List<DatasetTableField> datasetTableFields, Datasource engine) {
DorisConfiguration dorisConfiguration = new Gson().fromJson(engine.getConfiguration(), DorisConfiguration.class);
String dorisTableColumnSql = createDorisTableColumnSql(datasetTableFields);
return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql);
return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql)
.replace("BUCKETS_NUM", dorisConfiguration.getBucketNum().toString())
.replace("ReplicationNum", dorisConfiguration.getReplicationNum().toString());
}
private String createDorisTableColumnSql(final List<DatasetTableField> datasetTableFields) {
......
package io.dataease.provider.engine.mysql;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.Datasource;
import io.dataease.commons.utils.TableUtils;
import io.dataease.provider.DDLProviderImpl;
import org.springframework.stereotype.Service;
......@@ -43,7 +44,7 @@ public class MysqlDDLProvider extends DDLProviderImpl {
}
@Override
public String createTableSql(String tableName, List<DatasetTableField> datasetTableFields) {
public String createTableSql(String tableName, List<DatasetTableField> datasetTableFields, Datasource engine) {
String dorisTableColumnSql = createDorisTableColumnSql(datasetTableFields);
return creatTableSql.replace("TABLE_NAME", tableName).replace("Column_Fields", dorisTableColumnSql);
}
......
......@@ -43,7 +43,6 @@ public class MongoQueryProvider extends QueryProvider {
@Override
public Integer transFieldType(String field) {
System.out.println(field);
field = field.toUpperCase();
switch (field) {
case "CHAR":
......
......@@ -25,6 +25,7 @@ import io.dataease.exception.DataEaseException;
import io.dataease.listener.util.CacheUtils;
import io.dataease.provider.QueryProvider;
import io.dataease.service.engine.EngineService;
import io.dataease.service.kettle.KettleService;
import io.dataease.service.message.DeMsgutil;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
......@@ -98,6 +99,8 @@ public class ExtractDataService {
private ExtChartViewMapper extChartViewMapper;
@Resource
private EngineService engineService;
@Resource
private KettleService kettleService;
private static final String lastUpdateTime = "${__last_update_time__}";
private static final String currentUpdateTime = "${__current_update_time__}";
......@@ -107,14 +110,6 @@ public class ExtractDataService {
@Value("${kettle.files.keep:false}")
private boolean kettleFilesKeep;
@Value("${carte.host:127.0.0.1}")
private String carte;
@Value("${carte.port:8080}")
private String port;
@Value("${carte.user:cluster}")
private String user;
@Value("${carte.passwd:cluster}")
private String passwd;
private static final String shellScript = "result=`curl --location-trusted -u %s:%s -H \"label:%s\" -H \"column_separator:%s\" -H \"columns:%s\" -H \"merge_type: %s\" -T %s -XPUT http://%s:%s/api/%s/%s/_stream_load`\n" +
"if [ $? -eq 0 ] ; then\n" +
......@@ -605,7 +600,7 @@ public class ExtractDataService {
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(engine);
DDLProvider ddlProvider = ProviderFactory.getDDLProvider(engine.getType());
datasourceRequest.setQuery(ddlProvider.createTableSql(tableName, datasetTableFields));
datasourceRequest.setQuery(ddlProvider.createTableSql(tableName, datasetTableFields, engine));
jdbcProvider.exec(datasourceRequest);
}
......@@ -730,7 +725,7 @@ public class ExtractDataService {
break;
}
SlaveServer remoteSlaveServer = getSlaveServer();
SlaveServer remoteSlaveServer = kettleService.getSlaveServer();
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
jobExecutionConfiguration.setRemoteServer(remoteSlaveServer);
jobExecutionConfiguration.setRepository(repository);
......@@ -738,7 +733,6 @@ public class ExtractDataService {
TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();
transExecutionConfiguration.setRepository(repository);
transExecutionConfiguration.setRemoteServer(remoteSlaveServer);
String lastTranceId = Trans.sendToSlaveServer(transMeta, transExecutionConfiguration, repository, null);
SlaveServerTransStatus transStatus = null;
boolean executing = true;
......@@ -772,15 +766,6 @@ public class ExtractDataService {
}
}
private SlaveServer getSlaveServer() {
SlaveServer remoteSlaveServer = new SlaveServer();
remoteSlaveServer.setHostname(carte);// 设置远程IP
remoteSlaveServer.setPort(port);// 端口
remoteSlaveServer.setUsername(user);
remoteSlaveServer.setPassword(passwd);
return remoteSlaveServer;
}
private void generateJobFile(String extractType, DatasetTable datasetTable, String columnFields) throws Exception {
if (engineService.isSimpleMode()) {
return;
......@@ -1251,33 +1236,6 @@ public class ExtractDataService {
}
}
public boolean isKettleRunning() {
try {
if (!InetAddress.getByName(carte).isReachable(1000)) {
return false;
}
} catch (Exception e) {
return false;
}
HttpGet getMethod = new HttpGet("http://" + carte + ":" + port);
HttpClientManager.HttpClientBuilderFacade clientBuilder = HttpClientManager.getInstance().createBuilder();
clientBuilder.setConnectionTimeout(1);
clientBuilder.setCredentials(user, passwd);
try {
CloseableHttpClient httpClient = clientBuilder.build();
HttpResponse httpResponse = httpClient.execute(getMethod);
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (statusCode != -1 && statusCode < 400) {
httpResponse.getEntity().getContent().close();
return true;
} else {
return false;
}
} catch (Exception e) {
return false;
}
}
private final static String handleBinaryType = " \t\tif(\"FIELD\".equalsIgnoreCase(filed)){\n" +
" get(Fields.Out, filed).setValue(r, \"\");\n" +
" get(Fields.Out, filed).getValueMeta().setType(2);\n" +
......
......@@ -59,6 +59,11 @@ public class DatasourceService {
@DeCleaner(DePermissionType.DATASOURCE)
public Datasource addDatasource(Datasource datasource) throws Exception{
try{
DatasourceTypes datasourceType = DatasourceTypes.valueOf(datasource.getType());
}catch (Exception e){
throw e;
}
checkName(datasource);
long currentTimeMillis = System.currentTimeMillis();
datasource.setId(UUID.randomUUID().toString());
......
package io.dataease.service.engine;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import io.dataease.base.domain.Datasource;
import io.dataease.base.domain.DeEngine;
import io.dataease.base.domain.DeEngineExample;
import io.dataease.base.mapper.DeEngineMapper;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.commons.utils.HttpClientConfig;
import io.dataease.commons.utils.HttpClientUtil;
import io.dataease.controller.ResultHolder;
import io.dataease.controller.request.datasource.DatasourceRequest;
import io.dataease.dto.DatasourceDTO;
import io.dataease.dto.datasource.DorisConfiguration;
import io.dataease.listener.util.CacheUtils;
import io.dataease.provider.ProviderFactory;
import io.dataease.provider.datasource.DatasourceProvider;
import io.dataease.service.datasource.DatasourceService;
......@@ -19,7 +26,9 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Base64;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@Service
......@@ -31,35 +40,39 @@ public class EngineService {
private DeEngineMapper deEngineMapper;
@Resource
private DatasourceService datasource;
static private Datasource ds = null;
public Boolean isLocalMode(){
public Boolean isLocalMode() {
return env.getProperty("engine_mode", "local").equalsIgnoreCase("local");
}
public Boolean isSimpleMode(){
public Boolean isSimpleMode() {
return env.getProperty("engine_mode", "local").equalsIgnoreCase("simple");
}
public Boolean isClusterMode(){
public Boolean isClusterMode() {
return env.getProperty("engine_mode", "local").equalsIgnoreCase("cluster");
}
public String mode(){
public String mode() {
return env.getProperty("engine_mode", "local");
}
public DeEngine info(){
List<DeEngine> deEngines = deEngineMapper.selectByExampleWithBLOBs(new DeEngineExample());
if(CollectionUtils.isEmpty(deEngines)){
public DeEngine info() {
DeEngineExample deEngineExample = new DeEngineExample();
if (isClusterMode()) {
deEngineExample.createCriteria().andTypeEqualTo("engine_doris");
} else {
deEngineExample.createCriteria().andTypeEqualTo("engine_mysql");
}
List<DeEngine> deEngines = deEngineMapper.selectByExampleWithBLOBs(deEngineExample);
if (CollectionUtils.isEmpty(deEngines)) {
return new DeEngine();
}
return deEngines.get(0);
}
public ResultHolder validate(DatasourceDTO datasource) throws Exception {
if(StringUtils.isEmpty(datasource.getType()) || StringUtils.isEmpty(datasource.getConfiguration())){
if (StringUtils.isEmpty(datasource.getType()) || StringUtils.isEmpty(datasource.getConfiguration())) {
throw new Exception("未完整设置数据引擎");
}
try {
......@@ -67,39 +80,70 @@ public class EngineService {
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(datasource);
datasourceProvider.checkStatus(datasourceRequest);
return ResultHolder.success(datasource);
}catch (Exception e){
return ResultHolder.error("Datasource is invalid: " + e.getMessage());
} catch (Exception e) {
return ResultHolder.error("Engine is invalid: " + e.getMessage());
}
if (datasource.getType().equalsIgnoreCase("engine_doris")) {
DorisConfiguration dorisConfiguration = new Gson().fromJson(datasource.getConfiguration(), DorisConfiguration.class);
HttpClientConfig httpClientConfig = new HttpClientConfig();
String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((dorisConfiguration.getUsername()
+ ":" + dorisConfiguration.getPassword()).getBytes());
httpClientConfig.addHeader("Authorization", authValue);
String response;
try {
response = HttpClientUtil.get("http://" + dorisConfiguration.getHost() + ":" + dorisConfiguration.getHttpPort() + "/api/backends", httpClientConfig);
}catch (Exception e){
return ResultHolder.error("Engine is invalid: " + e.getMessage());
}
JSONArray backends = Optional.ofNullable(JSONObject.parseObject(response).getJSONObject("data")).orElse(new JSONObject()).getJSONArray("backends");
if(CollectionUtils.isEmpty(backends)){
return ResultHolder.error("Engine is invalid: no backends found.");
}
Integer alives = 0;
for (int i = 0; i < backends.size(); i++) {
JSONObject kv = backends.getJSONObject(i);
if (kv.getBoolean("is_alive")) {
alives ++;
}
}
if(alives < dorisConfiguration.getReplicationNum()){
return ResultHolder.error("Engine params is invalid: 副本数量不能大于节点数量.");
}
}
return ResultHolder.success(datasource);
}
public ResultHolder save(DeEngine engine) throws Exception {
if(StringUtils.isEmpty(engine.getId())){
if (StringUtils.isEmpty(engine.getId())) {
engine.setId(UUID.randomUUID().toString());
deEngineMapper.insert(engine);
}else {
} else {
deEngineMapper.updateByPrimaryKeyWithBLOBs(engine);
datasource.handleConnectionPool(getDeEngine(), "delete");
}
datasource.handleConnectionPool(this.ds, "delete");
setDs(engine);
datasource.handleConnectionPool(this.ds, "add");
datasource.handleConnectionPool(getDeEngine(), "add");
return ResultHolder.success(engine);
}
private void setDs(DeEngine engine){
if(this.ds == null){
this.ds = new Datasource();
BeanUtils.copyBean(this.ds, engine);
}else {
BeanUtils.copyBean(this.ds, engine);
}
private void setDs(DeEngine engine) {
Datasource datasource = new Datasource();
BeanUtils.copyBean(datasource, engine);
CacheUtils.put("ENGINE", "engine", datasource, null, null);
}
public Datasource getDeEngine() throws Exception{
if (this.ds != null) {
return this.ds;
public Datasource getDeEngine() throws Exception {
Object catcheEngine = CacheUtils.get("ENGINE", "engine");
if (catcheEngine != null) {
return (Datasource) catcheEngine;
}
if(isLocalMode()){
if (isLocalMode()) {
JSONObject jsonObject = new JSONObject();
jsonObject.put("dataSourceType", "jdbc");
jsonObject.put("dataBase", env.getProperty("doris.db", "doris"));
......@@ -116,19 +160,14 @@ public class EngineService {
engine.setType("engine_doris");
engine.setConfiguration(jsonObject.toJSONString());
setDs(engine);
}else {
} else {
List<DeEngine> deEngines = deEngineMapper.selectByExampleWithBLOBs(new DeEngineExample());
if(CollectionUtils.isEmpty(deEngines)){
if (CollectionUtils.isEmpty(deEngines)) {
throw new Exception("未设置数据引擎");
}
setDs(deEngines.get(0));
}
// if(isSimpleMode()){
//
// }
//TODO cluster mode
return this.ds;
return getDeEngine();
}
......
package io.dataease.service.kettle;
import com.google.gson.Gson;
import io.dataease.base.domain.DeEngine;
import io.dataease.base.domain.DeEngineExample;
import io.dataease.base.mapper.DeEngineMapper;
import io.dataease.commons.utils.HttpClientConfig;
import io.dataease.commons.utils.HttpClientUtil;
import io.dataease.controller.ResultHolder;
import io.dataease.dto.KettleDTO;
import io.dataease.service.engine.EngineService;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.checkerframework.checker.units.qual.K;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.util.HttpClientManager;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.util.Base64;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
@Service
public class KettleService {
@Resource
private Environment env;
@Resource
private DeEngineMapper deEngineMapper;
@Resource
private EngineService engineService;
public ResultHolder save(DeEngine kettle) throws Exception {
try {
validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class));
kettle.setStatus("Success");
}catch (Exception e){
kettle.setStatus("Error");
}
if (StringUtils.isEmpty(kettle.getId())) {
kettle.setId(UUID.randomUUID().toString());
kettle.setType("kettle");
deEngineMapper.insert(kettle);
} else {
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
}
return ResultHolder.success(kettle);
}
public void delete(String id){
deEngineMapper.deleteByPrimaryKey(id);
}
public void validate(KettleDTO kettleDTO) throws Exception {
HttpClientConfig httpClientConfig = new HttpClientConfig();
String authValue = "Basic " + Base64.getUrlEncoder().encodeToString((kettleDTO.getUser()
+ ":" + kettleDTO.getPasswd()).getBytes());
httpClientConfig.addHeader("Authorization", authValue);
String response = HttpClientUtil.get("http://" + kettleDTO.getCarte() + ":" + kettleDTO.getPort() + "/kettle/status/", httpClientConfig);
}
public ResultHolder validate(String id) {
DeEngine kettle = deEngineMapper.selectByPrimaryKey(id);
try {
validate(new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class));
kettle.setStatus("Success");
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
return ResultHolder.success(kettle);
}catch (Exception e){
kettle.setStatus("Error");
deEngineMapper.updateByPrimaryKeyWithBLOBs(kettle);
return ResultHolder.error(e.getMessage());
}
}
public List<DeEngine> pageList(){
DeEngineExample deEngineExample = new DeEngineExample();
deEngineExample.createCriteria().andTypeEqualTo("kettle");
return deEngineMapper.selectByExampleWithBLOBs(deEngineExample);
}
public void updateKettleStatus(){
if(!engineService.isClusterMode()){
return;
}
List<DeEngine>kettles = pageList();
kettles.forEach(kettle -> {
validate(kettle.getId());
});
}
public SlaveServer getSlaveServer() throws Exception{
SlaveServer remoteSlaveServer = new SlaveServer();
if(engineService.isLocalMode()){
remoteSlaveServer.setHostname(env.getProperty("carte.host", "127.0.0.1"));
remoteSlaveServer.setPort(env.getProperty("carte.port", "8080"));
remoteSlaveServer.setUsername(env.getProperty("carte.user", "cluster"));
remoteSlaveServer.setPassword(env.getProperty("carte.passwd", "cluster"));
}else {
List<DeEngine> kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success"))
.collect(Collectors.toList());
if(CollectionUtils.isEmpty(kettles)){
throw new Exception("No valid kettle service.");
}
DeEngine kettle = kettles.get(new Random().nextInt(kettles.size()));
KettleDTO kettleDTO = new Gson().fromJson(kettle.getConfiguration(), KettleDTO.class);
remoteSlaveServer.setHostname(kettleDTO.getCarte());
remoteSlaveServer.setPort(kettleDTO.getPort());
remoteSlaveServer.setUsername(kettleDTO.getUser());
remoteSlaveServer.setPort(kettleDTO.getPasswd());
}
return remoteSlaveServer;
}
public boolean isKettleRunning() {
if(engineService.isLocalMode()){
try {
KettleDTO kettleDTO = new KettleDTO();
kettleDTO.setCarte(env.getProperty("carte.host", "127.0.0.1"));
kettleDTO.setPort(env.getProperty("carte.port", "8080"));
kettleDTO.setUser(env.getProperty("carte.user", "cluster"));
kettleDTO.setPasswd(env.getProperty("carte.passwd", "cluster"));
validate(kettleDTO);
return true;
}catch (Exception e){
return false;
}
}
if(engineService.isClusterMode()){
List<DeEngine> kettles = pageList().stream().filter(kettle -> kettle.getStatus() != null && kettle.getStatus().equalsIgnoreCase("Success"))
.collect(Collectors.toList());
if(CollectionUtils.isEmpty(kettles)){
return false;
}else {
return true;
}
}
return false;
}
}
import request from '@/utils/request'
import {validateDs} from "@/api/system/datasource";
export function validate(data) {
return request({
url: '/kettle/validate',
method: 'post',
loading: true,
data
})
}
export function validateById(id) {
return request({
url: '/kettle/validate/' + id,
method: 'post',
loading: true
})
}
export function save(data) {
return request({
url: '/kettle/save',
method: 'post',
loading: true,
data
})
}
export function deleteKettle(id) {
return request({
url: '/delete/' + id,
method: 'delete',
loading: true
})
}
export function pageList(url, data) {
return request({
url: url,
method: 'post',
loading: true,
data
})
}
......@@ -1329,6 +1329,13 @@ export default {
min_pool_size: 'Minimum of connections',
max_pool_size: 'Maximum connection',
max_idle_time: 'Maximum idle (seconds)',
doris_host: 'Doris Address',
query_port: 'Query Port',
http_port: 'Http Port',
bucket_num: 'Bucket number',
replication_num: 'Replication number',
please_input_bucket_num: 'Please enter Bucket number',
please_input_replication_num: 'Please enter Replication number',
acquire_increment: 'Growth number',
connect_timeout: 'Connection timeout (seconds)',
please_input_initial_pool_size: 'Please enter the number of initial connections',
......@@ -1928,5 +1935,13 @@ export default {
email: 'Email:',
tel: 'Tel:',
web: 'Web:'
},
kettle: {
add: 'Add Kettle',
status: 'Status',
carte: 'Kettle Address',
port: 'Port',
user: 'User',
passwd: 'Password'
}
}
......@@ -1330,6 +1330,13 @@ export default {
min_pool_size: '最小連接數',
max_pool_size: '最大連接數',
max_idle_time: '最大空閑(秒)',
doris_host: 'Doris 地址',
query_port: 'Query Port',
http_port: 'Http Port',
bucket_num: 'Bucket 數量',
replication_num: '副本數量',
please_input_bucket_num: '請輸入 Bucket 數量',
please_input_replication_num: '請輸入副本數量',
acquire_increment: '增長數',
connect_timeout: '連接超時(秒)',
please_input_initial_pool_size: '請輸入初始連接數',
......@@ -1938,5 +1945,13 @@ export default {
email: '郵箱:',
tel: '電話:',
web: '網址:'
},
kettle: {
add: '添加 Kettle 服務',
status: '狀態',
carte: 'Kettle 地址',
port: '端口',
user: '用戶名',
passwd: '密碼'
}
}
......@@ -279,7 +279,7 @@ export default {
id: 'ID',
millisecond: '毫秒',
cannot_be_null: '不能为空',
required: '{0}是必填的',
required: '必填',
already_exists: '名称不能重复',
modifier: '修改人',
validate: '校验',
......@@ -1302,6 +1302,9 @@ export default {
user_name: '用户名',
password: '密码',
host: '主机名/IP地址',
doris_host: 'Doris 地址',
query_port: 'Query Port',
http_port: 'Http Port',
port: '端口',
datasource_url: '地址',
please_input_datasource_url: '请输入 Elasticsearch 地址,如: http://es_host:es_port',
......@@ -1333,6 +1336,10 @@ export default {
initial_pool_size: '初始连接数',
min_pool_size: '最小连接数',
max_pool_size: '最大连接数',
bucket_num: 'Bucket 数量',
replication_num: '副本数量',
please_input_bucket_num: '请输入 Bucket 数量',
please_input_replication_num: '请输入副本数量',
max_idle_time: '最大空闲(秒)',
acquire_increment: '增长数',
connect_timeout: '连接超时(秒)',
......@@ -1946,5 +1953,13 @@ export default {
email: '邮箱:',
tel: '电话:',
web: '网址:'
},
kettle: {
add: '添加 Kettle 服务',
status: '状态',
carte: 'Kettle 地址',
port: '端口',
user: '用户名',
passwd: '密码'
}
}
<template>
<div>
<el-form ref="form" v-loading="loading"
:model="form"
:rules="rules"
class="demo-form-inline"
:disabled="show"
label-width="180px"
label-position="top"
size="small"
>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.doris_host')" prop="configuration.host">
<el-input v-model="form.configuration.host"/>
</el-form-item>
</el-col>
</el-row>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.data_base')" prop="configuration.dataBase">
<el-input v-model="form.configuration.dataBase"/>
</el-form-item>
</el-col>
</el-row>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.user_name')">
<el-input v-model="form.configuration.username"/>
</el-form-item>
</el-col>
</el-row>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.password')">
<el-input v-model="form.configuration.password" show-password/>
</el-form-item>
</el-col>
</el-row>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.query_port')" prop="configuration.port">
<el-input v-model="form.configuration.port" autocomplete="off" type="number" min="0"/>
</el-form-item>
</el-col>
</el-row>
<el-row>
<el-col>
<el-form-item :label="$t('datasource.http_port')" prop="configuration.port">
<el-input v-model="form.configuration.httpPort" autocomplete="off" type="number" min="0"/>
</el-form-item>
</el-col>
</el-row>
<el-collapse>
<el-collapse-item :title="$t('datasource.priority')" name="1">
<el-form-item :label="$t('datasource.replication_num')" prop="configuration.replicationNum">
<el-input v-model="form.configuration.replicationNum" autocomplete="off" type="number" min="1"/>
</el-form-item>
<el-form-item :label="$t('datasource.bucket_num')" prop="configuration.bucketNum">
<el-input v-model="form.configuration.bucketNum" autocomplete="off" type="number" min="1"/>
</el-form-item>
<el-form-item :label="$t('datasource.initial_pool_size')" prop="configuration.initialPoolSize">
<el-input v-model="form.configuration.initialPoolSize" autocomplete="off" type="number" min="0" size="small"/>
</el-form-item>
<el-form-item :label="$t('datasource.min_pool_size')" prop="configuration.minPoolSize">
<el-input v-model="form.configuration.minPoolSize" autocomplete="off" type="number" min="0"/>
</el-form-item>
<el-form-item :label="$t('datasource.max_pool_size')" prop="configuration.maxPoolSize">
<el-input v-model="form.configuration.maxPoolSize" autocomplete="off" type="number" min="0"/>
</el-form-item>
</el-collapse-item>
</el-collapse>
</el-form>
<div>
<el-button type="primary" size="small" @click="validaDatasource">
{{ $t('commons.validate') }}
</el-button>
<el-button v-if="showEdit" size="small" @click="edit">
{{ $t('commons.edit') }}
</el-button>
<el-button v-if="showSave" type="success" size="small" @click="save">
{{ $t('commons.save') }}
</el-button>
<el-button v-if="showCancel" type="info" size="small" @click="cancel">
{{ $t('commons.cancel') }}
</el-button>
</div>
</div>
</template>
<script>
import {engineInfo, validate, save} from '@/api/system/engine'
import i18n from "@/lang";
export default {
name: 'ClusterMode',
data() {
return {
form:
{
type: 'engine_doris',
configuration: {
host: '',
dataBase: '',
username: '',
password: '',
port: '',
httpPort: 8030,
extraParams: 'characterEncoding=UTF-8&connectTimeout=5000&useSSL=false&allowPublicKeyRetrieval=true',
replicationNum: 1,
bucketNum: 10,
minPoolSize: 5,
maxPoolSize: 50,
initialPoolSize: 5
}
},
originConfiguration: {
host: '',
dataBase: '',
username: '',
password: '',
port: '',
httpPort: 8030,
extraParams: 'characterEncoding=UTF-8&connectTimeout=5000&useSSL=false&allowPublicKeyRetrieval=true',
replicationNum: 1,
bucketNum: 10,
minPoolSize: 5,
maxPoolSize: 50,
initialPoolSize: 5
},
input: '',
visible: true,
showEdit: true,
showSave: false,
showCancel: false,
show: true,
disabledConnection: false,
disabledSave: false,
loading: false,
rules: {
'configuration.host': [
{
required: true,
message: this.$t('datasource.please_input_host'),
trigger: ['change', 'blur']
}
],
'configuration.port': [
{
required: true,
message: this.$t('datasource.please_input_port'),
trigger: ['change', 'blur']
}
],
'configuration.dataBase': [
{
required: true,
message: this.$t('datasource.please_input_data_base'),
trigger: ['change', 'blur']
}
],
'configuration.replicationNum': [
{
required: true,
message: this.$t('datasource.please_input_replication_num'),
trigger: ['change', 'blur']
}
],
'configuration.bucketNum': [
{
required: true,
message: this.$t('datasource.please_input_bucket_num'),
trigger: ['change', 'blur']
}
]
},
allTypes: [
{
name: 'engine_mysql',
label: 'MySQL',
type: 'jdbc',
extraParams: 'characterEncoding=UTF-8&connectTimeout=5000&useSSL=false&allowPublicKeyRetrieval=true'
}
]
}
},
created() {
this.query()
},
methods: {
query() {
engineInfo().then(response => {
if (response.data.id) {
this.form = JSON.parse(JSON.stringify(response.data))
this.form.configuration = JSON.parse(this.form.configuration)
this.originConfiguration = JSON.parse(JSON.stringify(this.form.configuration))
}
this.$nextTick(() => {
this.$refs.form.clearValidate()
})
})
},
edit() {
this.showEdit = false
this.showSave = true
this.showCancel = true
this.show = false
},
save() {
if (this.form.configuration.dataSourceType === 'jdbc' && this.form.configuration.port <= 0) {
this.$message.error(i18n.t('datasource.port_no_less_then_0'))
return
}
if (this.form.configuration.initialPoolSize < 0 || this.form.configuration.minPoolSize < 0 || this.form.configuration.maxPoolSize < 0) {
this.$message.error(i18n.t('datasource.no_less_then_0'))
return
}
this.$refs.form.validate(valid => {
if (!valid) {
return false
}
const form = JSON.parse(JSON.stringify(this.form))
form.configuration = JSON.stringify(form.configuration)
save(form).then(res => {
this.showEdit = true
this.showCancel = false
this.showSave = false
this.show = true
this.originConfiguration = JSON.parse(JSON.stringify(this.form.configuration))
this.$success(i18n.t('commons.save_success'))
})
})
},
cancel() {
this.showEdit = true
this.showCancel = false
this.showSave = false
this.show = true
this.form.configuration = JSON.parse(JSON.stringify(this.originConfiguration))
},
changeType() {
for (let i = 0; i < this.allTypes.length; i++) {
if (this.allTypes[i].name === this.form.type) {
this.form.configuration.dataSourceType = this.allTypes[i].type
this.form.configuration.extraParams = this.allTypes[i].extraParams
}
}
},
validaDatasource() {
if (!this.form.configuration.schema && this.form.type === 'oracle') {
this.$message.error(i18n.t('datasource.please_choose_schema'))
return
}
if (this.form.configuration.dataSourceType === 'jdbc' && this.form.configuration.port <= 0) {
this.$message.error(i18n.t('datasource.port_no_less_then_0'))
return
}
this.$refs.form.validate(valid => {
if (valid) {
const data = JSON.parse(JSON.stringify(this.form))
data.configuration = JSON.stringify(data.configuration)
validate(data).then(res => {
if (res.success) {
this.$success(i18n.t('datasource.validate_success'))
} else {
if (res.message.length < 2500) {
this.$error(res.message)
} else {
this.$error(res.message.substring(0, 2500) + '......')
}
}
}).catch(res => {
this.$error(res.message)
})
} else {
return false
}
})
},
}
}
</script>
<style scoped>
</style>
<template>
<el-col>
<el-row style="margin-top: 10px;">
<el-button icon="el-icon-circle-plus-outline" @click="create(undefined)">{{ $t('kettle.add') }}</el-button>
<fu-table :data="data">
<el-table-column prop="configuration.carte" :label="$t('kettle.carte')"/>
<el-table-column prop="configuration.port" :label="$t('kettle.port')"/>
<el-table-column prop="status" :label="$t('kettle.status')">
<template slot-scope="scope">
<span v-if="scope.row.status === 'Error'" style="color: red">{{ $t('datasource.invalid') }}</span>
<span v-if="scope.row.status === 'Success'" style="color: green">{{ $t('datasource.valid') }}</span>
</template>
</el-table-column>
<fu-table-operations :buttons="buttons" :label="$t('commons.operating')" fix/>
</fu-table>
<div class="pagination">
<fu-table-pagination :current-page.sync="paginationConfig.currentPage"
:page-size.sync="paginationConfig.pageSize"
:total="paginationConfig.total"
@size-change="sizeChange"
@current-change="currentChange"/>
</div>
</el-row>
<el-dialog v-dialogDrag :title="edit_dialog_title" :visible="show_dialog" :before-close="closeDialog"
:show-close="true" width="50%" class="dialog-css" append-to-body>
<el-col>
<el-form ref="kettleform" :form="form" :model="form" label-width="120px" :rules="rule">
<el-form-item :label="$t('kettle.carte')" prop="configuration.carte">
<el-input v-model="form.configuration.carte"/>
</el-form-item>
<el-form-item :label="$t('kettle.port')" prop="configuration.port">
<el-input v-model="form.configuration.port" autocomplete="off" type="number" min="0"/>
</el-form-item>
<el-form-item :label="$t('kettle.user')" prop="configuration.user">
<el-input v-model="form.configuration.user"/>
</el-form-item>
<el-form-item :label="$t('kettle.passwd')" prop="configuration.passwd">
<el-input v-model="form.configuration.passwd" show-password/>
</el-form-item>
</el-form>
</el-col>
<div slot="footer" class="dialog-footer">
<el-button size="mini" @click="validate()">{{ $t('commons.validate') }}</el-button>
<el-button type="primary" size="mini" @click="save()">{{ $t('commons.save') }}</el-button>
</div>
</el-dialog>
</el-col>
</template>
<script>
import {deleteKettle, validate, save, pageList, validateById} from '@/api/system/kettle'
import i18n from "@/lang";
export default {
name: 'KettleSetting',
data() {
return {
columns: [],
buttons: [
{
label: this.$t('commons.edit'),
icon: 'el-icon-edit',
type: 'primary',
click: this.create
},
{
label: this.$t('commons.validate'),
icon: 'el-icon-success',
type: 'success',
click: this.validateById
},
{
label: this.$t('commons.delete'),
icon: 'el-icon-delete',
type: 'danger',
click: this.delete,
}
],
last_condition: null,
paginationConfig: {
currentPage: 1,
pageSize: 10,
total: 0
},
data: [],
show_dialog: false,
edit_dialog_title: '',
form: {
configuration: {
carte: '',
port: '',
user: '',
passwd: ''
}
},
rule: {
'configuration.carte': [{
required: true,
message: this.$t('commons.required'),
trigger: 'blur'
}],
'configuration.port': [{
required: true,
message: this.$t('commons.required'),
trigger: 'blur'
}],
'configuration.user': [{
required: true,
message: this.$t('commons.required'),
trigger: 'blur'
}],
'configuration.passwd': [{
required: true,
message: this.$t('dcommons.required'),
trigger: 'blur'
}]
}
}
},
created() {
this.search()
},
methods: {
currentChange() {
this.search()
},
sizeChange() {
this.currentPage = 1;
this.search()
},
search() {
const {currentPage, pageSize} = this.paginationConfig
pageList('/kettle/pageList/' + currentPage + '/' + pageSize, {}).then(response => {
this.data = response.data.listObject
this.data.forEach(item => {
item.configuration = JSON.parse(item.configuration)
})
this.paginationConfig.total = response.data.itemCount
})
},
delete(item) {
deleteKettle(item.id).then(response => {
this.search()
})
},
create(item) {
if (!item) {
this.targetObjs = []
this.form = {configuration: {carte: '', port: '', user: '', passwd: ''}}
this.edit_dialog_title = this.$t('kettle.add')
} else {
this.edit_dialog_title = this.$t('commons.edit')
this.form = JSON.parse(JSON.stringify(item))
}
this.show_dialog = true
},
save() {
this.$refs.kettleform.validate(valid => {
if (!valid) {
return false
}
const form = JSON.parse(JSON.stringify(this.form))
form.configuration = JSON.stringify(form.configuration)
save(form).then(res => {
this.show_dialog = false
this.$success(i18n.t('commons.save_success'))
this.search()
})
})
},
closeDialog() {
this.show_dialog = false
this.form = {configuration: {carte: '', port: '', user: '', passwd: ''}}
},
validate() {
this.$refs.kettleform.validate(valid => {
if (valid) {
validate(this.form.configuration).then(res => {
if (res.success) {
this.$success(i18n.t('datasource.validate_success'))
} else {
if (res.message.length < 2500) {
this.$error(res.message)
} else {
this.$error(res.message.substring(0, 2500) + '......')
}
}
}).catch(res => {
this.$error(res.message)
})
} else {
return
}
})
},
validateById(item) {
validateById(item.id).then(res => {
if (res.success) {
item.status = res.data.status
this.$success(i18n.t('datasource.validate_success'))
} else {
item.status = 'Error'
if (res.message.length < 2500) {
this.$error(res.message)
} else {
this.$error(res.message.substring(0, 2500) + '......')
}
}
}).catch(res => {
this.$error(res.message)
})
},
}
}
</script>
<style scoped>
</style>
<template>
<div>
<!--邮件表单-->
<el-form ref="form" v-loading="loading"
:model="form"
:rules="rules"
......
......@@ -30,6 +30,14 @@
<simple-mode />
</el-tab-pane>
<el-tab-pane v-if="engineMode==='cluster'" :lazy="true" :label="$t('system_parameter_setting.engine_mode_setting')" name="six">
<cluster-mode />
</el-tab-pane>
<el-tab-pane :lazy="true" :label="$t('system_parameter_setting.engine_mode_setting')" name="seven">
<kettle-setting />
</el-tab-pane>
</el-tabs>
</layout-content>
</template>
......@@ -37,13 +45,15 @@
import BasicSetting from './BasicSetting'
import EmailSetting from './EmailSetting'
import SimpleMode from './SimpleModeSetting'
import ClusterMode from './ClusterModeSetting'
import KettleSetting from './KettleSetting'
import LayoutContent from '@/components/business/LayoutContent'
import PluginCom from '@/views/system/plugin/PluginCom'
import { pluginLoaded } from '@/api/user'
import { engineMode } from '@/api/system/engine'
export default {
components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode},
components: { BasicSetting, EmailSetting, LayoutContent, PluginCom, SimpleMode, ClusterMode, KettleSetting},
data() {
return {
activeName: 'zero',
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论