Commit 9302d0f0, authored by junjie

feat(backend): data extraction, switch to Doris

Parent 84f68f1e
......@@ -2,7 +2,6 @@ package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import io.dataease.commons.utils.CommonThreadPool;
import org.apache.spark.sql.SparkSession;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
......@@ -32,31 +31,31 @@ public class CommonConfig {
// return configuration;
// }
@Bean
@ConditionalOnMissingBean
public SparkSession javaSparkSession() {
SparkSession spark = SparkSession.builder()
.appName(env.getProperty("spark.appName", "DataeaseJob"))
.master(env.getProperty("spark.master", "local[*]"))
.config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
.config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
.config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
.config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
.config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
.config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
.config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
.config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
.config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
.config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
.config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
.config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
.config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
.config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
.config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
.config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
.getOrCreate();
return spark;
}
// @Bean
// @ConditionalOnMissingBean
// public SparkSession javaSparkSession() {
// SparkSession spark = SparkSession.builder()
// .appName(env.getProperty("spark.appName", "DataeaseJob"))
// .master(env.getProperty("spark.master", "local[*]"))
// .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
//// .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
//// .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
//// .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
//// .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
//// .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
//// .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
//// .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
//// .config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
//// .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
//// .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
//// .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
//// .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
//// .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
//// .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
//// .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
// .getOrCreate();
// return spark;
// }
@Bean
@ConditionalOnMissingBean
......
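For reference, the SparkSession bean deleted above could also be kept available behind an opt-in flag rather than as commented-out code. The sketch below only illustrates that Spring Boot pattern; the spark.enabled property name is hypothetical and is not defined anywhere in this commit.

package io.dataease.config;

import org.apache.spark.sql.SparkSession;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import javax.annotation.Resource;

@Configuration
public class OptionalSparkConfig {

    @Resource
    private Environment env;

    // Only instantiated when spark.enabled=true is set; by default no SparkSession
    // is created at startup, which matches the behaviour after this commit.
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(name = "spark.enabled", havingValue = "true")
    public SparkSession javaSparkSession() {
        return SparkSession.builder()
                .appName(env.getProperty("spark.appName", "DataeaseJob"))
                .master(env.getProperty("spark.master", "local[*]"))
                .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
                .getOrCreate();
    }
}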
package io.dataease.listener;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.datasource.service.DatasourceService;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.spark.SparkCalc;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
@Order(value = 2)
......
package io.dataease.listener;
import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.spark.SparkCalc;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
@Order(value = 2)
public class AppStartReadHBaseListener implements ApplicationListener<ApplicationReadyEvent> {
@Resource
private CommonThreadPool commonThreadPool;
@Resource
private SparkCalc sparkCalc;
@Resource
private Environment env; // holds the properties loaded from the configuration files
@Resource
private DatasetTableMapper datasetTableMapper;
@Resource
private DataSetTableFieldsService dataSetTableFieldsService;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
// System.out.println("================= Read HBase start =================");
// // On startup, find the datasets configured for scheduled extraction and load their data from HBase into the cache
// DatasetTableExample datasetTableExample = new DatasetTableExample();
// datasetTableExample.createCriteria().andModeEqualTo(1);
// List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
// for (DatasetTable table : datasetTables) {
//// commonThreadPool.addTask(() -> {
// try {
// List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
// sparkCalc.getHBaseDataAndCache(table.getId(), fields);
// } catch (Exception e) {
// e.printStackTrace();
// }
//// });
// }
}
}
//package io.dataease.listener;
//
//import io.dataease.base.mapper.DatasetTableMapper;
//import io.dataease.commons.utils.CommonThreadPool;
//import io.dataease.service.dataset.DataSetTableFieldsService;
//import org.springframework.boot.context.event.ApplicationReadyEvent;
//import org.springframework.context.ApplicationListener;
//import org.springframework.core.annotation.Order;
//import org.springframework.core.env.Environment;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.Resource;
//
//@Component
//@Order(value = 2)
//public class AppStartReadHBaseListener implements ApplicationListener<ApplicationReadyEvent> {
// @Resource
// private CommonThreadPool commonThreadPool;
//// @Resource
//// private SparkCalc sparkCalc;
// @Resource
// private Environment env; // holds the properties loaded from the configuration files
//
// @Resource
// private DatasetTableMapper datasetTableMapper;
// @Resource
// private DataSetTableFieldsService dataSetTableFieldsService;
//
// @Override
// public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
//// System.out.println("================= Read HBase start =================");
//// // On startup, find the datasets configured for scheduled extraction and load their data from HBase into the cache
//// DatasetTableExample datasetTableExample = new DatasetTableExample();
//// datasetTableExample.createCriteria().andModeEqualTo(1);
//// List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
//// for (DatasetTable table : datasetTables) {
////// commonThreadPool.addTask(() -> {
//// try {
//// List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
//// sparkCalc.getHBaseDataAndCache(table.getId(), fields);
//// } catch (Exception e) {
//// e.printStackTrace();
//// }
////// });
//// }
// }
//}
package io.dataease.service.chart;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.ChartViewMapper;
import io.dataease.commons.utils.AuthUtils;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.controller.request.chart.ChartExtFilterRequest;
import io.dataease.controller.request.chart.ChartExtRequest;
import io.dataease.controller.request.chart.ChartViewRequest;
......@@ -20,7 +22,6 @@ import io.dataease.dto.chart.Series;
import io.dataease.dto.dataset.DataTableInfoDTO;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -43,8 +44,8 @@ public class ChartViewService {
private DataSetTableService dataSetTableService;
@Resource
private DatasourceService datasourceService;
@Resource
private SparkCalc sparkCalc;
// @Resource
// private SparkCalc sparkCalc;
@Resource
private DataSetTableFieldsService dataSetTableFieldsService;
......@@ -146,8 +147,18 @@ public class ChartViewService {
data = datasourceProvider.getData(datasourceRequest);
} else if (table.getMode() == 1) {// extracted mode
// get the fields of the dataset
List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0], extFilterList);
// List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
// data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0], extFilterList);
// connect to Doris and build a query against the Doris datasource
Datasource ds = dorisDatasource();
DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
DatasourceRequest datasourceRequest = new DatasourceRequest();
datasourceRequest.setDatasource(ds);
String tableName = "ds_" + table.getId().replaceAll("-", "_");
datasourceRequest.setTable(tableName);
datasourceRequest.setQuery(getSQL(ds.getType(), tableName, xAxis, yAxis, extFilterList));
data = datasourceProvider.getData(datasourceRequest);
}
// chart components can be extended further
......@@ -214,6 +225,24 @@ public class ChartViewService {
return filter.toString();
}
public Datasource dorisDatasource() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("dataSourceType", "jdbc");
jsonObject.put("dataBase", "example_db");
jsonObject.put("username", "root");
jsonObject.put("password", "dataease");
jsonObject.put("host", "59.110.64.159");
jsonObject.put("port", "9030");
Datasource datasource = new Datasource();
datasource.setId("doris");
datasource.setName("doris");
datasource.setDesc("doris");
datasource.setType("mysql");
datasource.setConfiguration(jsonObject.toJSONString());
return datasource;
}
public String getSQL(String type, String table, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, List<ChartExtFilterRequest> extFilterRequestList) {
DatasourceTypes datasourceType = DatasourceTypes.valueOf(type);
switch (datasourceType) {
......@@ -321,7 +350,7 @@ public class ChartViewService {
return map;
}
public List<ChartView> viewsByIds(List<String> viewIds){
public List<ChartView> viewsByIds(List<String> viewIds) {
ChartViewExample example = new ChartViewExample();
example.createCriteria().andIdIn(viewIds);
return chartViewMapper.selectByExample(example);
......
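The new dorisDatasource() helper above hard-codes the Doris FE host, port and credentials and reuses the mysql provider type, since Doris is queried over the MySQL protocol (port 9030); an extracted dataset is looked up under the table name "ds_" plus its id with dashes replaced by underscores, e.g. a dataset id of 7a2b-9c becomes ds_7a2b_9c. A minimal sketch of the same builder driven by configuration properties instead of literals is shown below; the doris.* property names are assumptions for illustration only.

import com.alibaba.fastjson.JSONObject;
import io.dataease.base.domain.Datasource;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class DorisDatasourceFactory {

    @Resource
    private Environment env;

    // Builds the same Datasource object as ChartViewService.dorisDatasource(),
    // but reads the connection details from application properties.
    public Datasource build() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("dataSourceType", "jdbc");
        jsonObject.put("dataBase", env.getProperty("doris.db", "example_db"));
        jsonObject.put("username", env.getProperty("doris.user", "root"));
        jsonObject.put("password", env.getProperty("doris.password", ""));
        jsonObject.put("host", env.getProperty("doris.host", "127.0.0.1"));
        jsonObject.put("port", env.getProperty("doris.port", "9030"));

        Datasource datasource = new Datasource();
        datasource.setId("doris");
        datasource.setName("doris");
        datasource.setDesc("doris");
        datasource.setType("mysql"); // Doris speaks the MySQL wire protocol
        datasource.setConfiguration(jsonObject.toJSONString());
        return datasource;
    }
}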
......@@ -637,11 +637,12 @@ public class DataSetTableService {
private String saveFile(MultipartFile file) throws Exception {
String filename = file.getOriginalFilename();
File p = new File(path);
String dirPath = path + AuthUtils.getUser().getUsername() + "/";
File p = new File(dirPath);
if (!p.exists()) {
p.mkdirs();
}
String filePath = path + AuthUtils.getUser().getUsername() + "/" + filename;
String filePath = dirPath + filename;
File f = new File(filePath);
FileOutputStream fileOutputStream = new FileOutputStream(f);
fileOutputStream.write(file.getBytes());
......
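The saveFile() change above writes each upload into a per-user subdirectory, so files uploaded by different users no longer collide on the original file name. A standalone sketch of the resulting path construction, with the base path, user name and file name as purely illustrative values (in the service they come from the ${path} property, AuthUtils.getUser().getUsername() and file.getOriginalFilename()):

import java.io.File;

public class UploadPathDemo {
    public static void main(String[] args) {
        String path = "/opt/dataease/data/excel/"; // illustrative base path
        String username = "admin";                 // illustrative user
        String filename = "sales.xlsx";            // illustrative upload name

        String dirPath = path + username + "/";
        new File(dirPath).mkdirs();                // create the per-user directory if missing
        String filePath = dirPath + filename;      // /opt/dataease/data/excel/admin/sales.xlsx
        System.out.println(filePath);
    }
}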
package io.dataease.service.dataset;
import com.google.gson.Gson;
import com.sun.org.apache.bcel.internal.generic.SWITCH;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.DatasourceMapper;
import io.dataease.commons.constants.JobStatus;
......@@ -13,31 +12,15 @@ import io.dataease.datasource.constants.DatasourceTypes;
import io.dataease.datasource.dto.MysqlConfigrationDTO;
import io.dataease.dto.dataset.DataSetTaskLogDTO;
import io.dataease.dto.dataset.DataTableInfoDTO;
import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.pentaho.big.data.api.cluster.NamedCluster;
import org.pentaho.big.data.api.cluster.NamedClusterService;
import org.pentaho.big.data.api.cluster.service.locator.NamedClusterServiceLocator;
import org.pentaho.big.data.api.cluster.service.locator.impl.NamedClusterServiceLocatorImpl;
import org.pentaho.big.data.api.initializer.ClusterInitializer;
import org.pentaho.big.data.api.initializer.ClusterInitializerProvider;
import org.pentaho.big.data.api.initializer.impl.ClusterInitializerImpl;
import org.pentaho.big.data.impl.cluster.NamedClusterImpl;
import org.pentaho.big.data.impl.cluster.NamedClusterManager;
import org.pentaho.big.data.kettle.plugins.hbase.MappingDefinition;
import org.pentaho.big.data.kettle.plugins.hbase.output.HBaseOutputMeta;
import org.apache.hadoop.hbase.client.Connection;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.engine.configuration.impl.pentaho.DefaultRunConfiguration;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobExecutionConfiguration;
import org.pentaho.di.job.JobHopMeta;
......@@ -45,49 +28,25 @@ import org.pentaho.di.job.JobMeta;
import org.pentaho.di.job.entries.special.JobEntrySpecial;
import org.pentaho.di.job.entries.success.JobEntrySuccess;
import org.pentaho.di.job.entries.trans.JobEntryTrans;
import org.pentaho.di.job.entries.writetolog.JobEntryWriteToLog;
import org.pentaho.di.job.entry.JobEntryCopy;
import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.pentaho.di.trans.TransConfiguration;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
import org.pentaho.di.trans.steps.textfileoutput.TextFileOutput;
import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.InfoStepDefinition;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
import org.pentaho.di.www.SlaveServerJobStatus;
import org.pentaho.runtime.test.RuntimeTest;
import org.pentaho.runtime.test.RuntimeTester;
import org.pentaho.runtime.test.action.RuntimeTestActionHandler;
import org.pentaho.runtime.test.action.RuntimeTestActionService;
import org.pentaho.runtime.test.action.impl.RuntimeTestActionServiceImpl;
import org.pentaho.runtime.test.impl.RuntimeTesterImpl;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.pentaho.di.core.row.ValueMetaInterface;
import scala.annotation.meta.field;
import javax.annotation.Resource;
import javax.sound.sampled.Line;
import java.io.File;
import java.security.MessageDigest;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static org.mockito.Mockito.mock;
@Service
public class ExtractDataService {
......@@ -125,8 +84,8 @@ public class ExtractDataService {
@Value("${hbase.zookeeper.property.clientPort:2181}")
private String zkPort;
@Resource
private SparkCalc sparkCalc;
// @Resource
// private SparkCalc sparkCalc;
public void extractData(String datasetTableId, String taskId, String type) {
......
......@@ -600,8 +600,8 @@ export default {
avg: '平均',
max: '最大值',
min: '最小值',
std: '标准差',
var_samp: '方差',
stddev_pop: '标准差',
var_pop: '方差',
quick_calc: '快速计算',
show_name_set: '显示名设置',
color: '颜色',
......
......@@ -22,8 +22,8 @@
<el-dropdown-item :command="beforeSummary('avg')">{{ $t('chart.avg') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('max')">{{ $t('chart.max') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('min')">{{ $t('chart.min') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('std')">{{ $t('chart.std') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('var_samp')">{{ $t('chart.var_samp') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('stddev_pop')">{{ $t('chart.stddev_pop') }}</el-dropdown-item>
<el-dropdown-item :command="beforeSummary('var_pop')">{{ $t('chart.var_pop') }}</el-dropdown-item>
</el-dropdown-menu>
</el-dropdown>
</el-dropdown-item>
......
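The two frontend hunks above rename the quick-calc options from std / var_samp to stddev_pop / var_pop, presumably because those aggregate names are accepted by both MySQL and Doris. A minimal sketch of turning such a summary keyword into a SQL aggregate expression is shown below; the direct keyword-to-function mapping is an assumption for illustration, the actual SQL assembly lives in ChartViewService.getSQL().

// Illustrative only: maps a chart summary keyword to a SQL aggregate expression.
public class AggregateSqlDemo {

    public static String aggregate(String summary, String column) {
        switch (summary) {
            case "avg":
            case "max":
            case "min":
            case "stddev_pop": // population standard deviation, valid in MySQL and Doris
            case "var_pop":    // population variance, valid in MySQL and Doris
                return summary.toUpperCase() + "(" + column + ")";
            default:
                throw new IllegalArgumentException("unsupported summary: " + summary);
        }
    }

    public static void main(String[] args) {
        System.out.println(aggregate("stddev_pop", "price")); // prints STDDEV_POP(price)
    }
}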