Commit a02df9d8 authored by junjie

feat(backend): preliminary Spark cache implementation

Parent a11eb21c
package io.dataease.commons.utils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.concurrent.*;

/**
 * @Author gin
 * @Date 2021/4/13 4:08 PM
 */
public class CommonThreadPool {

    private int corePoolSize = 10;
    private int maxQueueSize = 10;
    private int keepAliveSeconds = 600;

    private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;

    @PostConstruct
    public void init() {
        scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize);
        scheduledThreadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void shutdown() {
        if (scheduledThreadPoolExecutor != null) {
            scheduledThreadPoolExecutor.shutdown();
        }
    }

    /**
     * Whether the pool can accept more work (the current queue size has not exceeded maxQueueSize).
     *
     * @return true if available, false otherwise
     */
    public boolean available() {
        return scheduledThreadPoolExecutor.getQueue().size() <= maxQueueSize;
    }

    /**
     * Submit a task without enforcing the queue-size limit.
     *
     * @param task task to run
     */
    public void addTask(Runnable task) {
        scheduledThreadPoolExecutor.execute(task);
    }

    /**
     * Submit a delayed task without enforcing the queue-size limit.
     *
     * @param task  task to run
     * @param delay delay before execution
     * @param unit  time unit of the delay
     */
    public void scheduleTask(Runnable task, long delay, TimeUnit unit) {
        scheduledThreadPoolExecutor.schedule(task, delay, unit);
    }

    /**
     * Submit a task with a timeout. A task that has not finished within the timeout is interrupted
     * and its worker released, so a long-running task cannot occupy the pool indefinitely.
     *
     * @param task     task to run
     * @param timeOut  timeout value
     * @param timeUnit time unit of the timeout
     */
    public void addTask(Runnable task, long timeOut, TimeUnit timeUnit) {
        scheduledThreadPoolExecutor.execute(() -> {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            try {
                Future<?> future = executorService.submit(task);
                future.get(timeOut, timeUnit); // blocks until the task finishes or the timeout elapses
            } catch (TimeoutException timeoutException) {
                LogUtil.getLogger().error("timeout to execute task", timeoutException);
            } catch (Exception exception) {
                LogUtil.getLogger().error("failed to execute task", exception);
            } finally {
                if (!executorService.isShutdown()) {
                    executorService.shutdownNow(); // interrupt the task if it is still running; shutdown() would let it keep running
                }
            }
        });
    }

    public void setCorePoolSize(int corePoolSize) {
        this.corePoolSize = corePoolSize;
    }

    public void setMaxQueueSize(int maxQueueSize) {
        this.maxQueueSize = maxQueueSize;
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        this.keepAliveSeconds = keepAliveSeconds;
    }
}
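For reference, a minimal usage sketch (not part of this commit) of the pool with the timeout overload; the task body and the 10-minute timeout are hypothetical, and the usual io.dataease.commons.utils and java.util.concurrent imports are assumed:

    // Hypothetical usage: respect the queue limit, then run a task that is interrupted after 10 minutes.
    CommonThreadPool pool = CommonBeanFactory.getBean(CommonThreadPool.class);
    if (pool.available()) {
        pool.addTask(() -> System.out.println("warm the Spark cache here"), 10, TimeUnit.MINUTES);
    }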
package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
+import io.dataease.commons.utils.CommonThreadPool;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -33,31 +34,20 @@ public class CommonConfig {
        return configuration;
    }
    @Bean
    @ConditionalOnMissingBean
-    public JavaSparkContext javaSparkContext() {
+    public SparkSession javaSparkSession() {
        SparkSession spark = SparkSession.builder()
                .appName(env.getProperty("spark.appName", "DataeaseJob"))
                .master(env.getProperty("spark.master", "local[*]"))
                .config("spark.scheduler.mode", "FAIR")
                .getOrCreate();
-        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
-        return sc;
+        return spark;
    }
    @Bean
    @ConditionalOnMissingBean
-    public SQLContext sqlContext(JavaSparkContext javaSparkContext) {
-        SQLContext sqlContext = new SQLContext(javaSparkContext);
-        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
-        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
-        return sqlContext;
-    }
-    @Bean
-    @ConditionalOnMissingBean
-    public KettleFileRepository kettleFileRepository()throws Exception{
+    public KettleFileRepository kettleFileRepository() throws Exception {
        KettleEnvironment.init();
        KettleFileRepository repository = new KettleFileRepository();
        KettleFileRepositoryMeta kettleDatabaseMeta = new KettleFileRepositoryMeta("KettleFileRepository", "repo",
@@ -65,4 +55,13 @@ public class CommonConfig {
        repository.init(kettleDatabaseMeta);
        return repository;
    }
+    @Bean(destroyMethod = "shutdown")
+    public CommonThreadPool resourcePoolThreadPool() {
+        CommonThreadPool commonThreadPool = new CommonThreadPool();
+        commonThreadPool.setCorePoolSize(20);
+        commonThreadPool.setMaxQueueSize(100);
+        commonThreadPool.setKeepAliveSeconds(3600);
+        return commonThreadPool;
+    }
}
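With this change only the SparkSession is container-managed; the old JavaSparkContext and SQLContext beans are gone, and callers that still need those APIs derive them from the shared session, as SparkCalc does further down. A minimal sketch, assuming the bean is looked up through CommonBeanFactory:

    // Sketch: derive the legacy contexts from the shared SparkSession bean.
    SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
    JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
    SQLContext sqlContext = new SQLContext(sparkContext);
    sqlContext.setConf("spark.sql.shuffle.partitions", "1"); // the real code reads this value from the environment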
@@ -6,12 +6,14 @@ import io.dataease.service.ScheduleService;
import io.dataease.service.dataset.DataSetTableTaskService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
+import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
@Component
+@Order(value = 1)
public class AppStartListener implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private ScheduleService scheduleService;
......
package io.dataease.listener;

import io.dataease.base.domain.DatasetTable;
import io.dataease.base.domain.DatasetTableExample;
import io.dataease.base.domain.DatasetTableField;
import io.dataease.base.domain.DatasetTableFieldExample;
import io.dataease.base.mapper.DatasetTableFieldMapper;
import io.dataease.base.mapper.DatasetTableMapper;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.commons.utils.CommonThreadPool;
import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.spark.SparkCalc;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

@Component
@Order(value = 2)
public class AppStartReadHBaseListener implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private CommonThreadPool commonThreadPool;
    @Resource
    private SparkCalc sparkCalc;
    @Resource
    private Environment env; // holds the application configuration properties
    @Resource
    private DatasetTableMapper datasetTableMapper;
    @Resource
    private DataSetTableFieldsService dataSetTableFieldsService;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        System.out.println("================= Read HBase start =================");
        // On application startup, find the datasets that are extracted on a schedule (mode == 1)
        // and load their data from HBase into the Spark cache.
        DatasetTableExample datasetTableExample = new DatasetTableExample();
        datasetTableExample.createCriteria().andModeEqualTo(1);
        List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
        for (DatasetTable table : datasetTables) {
            commonThreadPool.addTask(() -> {
                try {
                    List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
                    sparkCalc.getHBaseDataAndCache(table.getId(), fields);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
@@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.dataease.base.domain.*;
import io.dataease.base.mapper.ChartViewMapper;
+import io.dataease.base.mapper.DatasetTableFieldMapper;
import io.dataease.commons.utils.AuthUtils;
import io.dataease.commons.utils.BeanUtils;
import io.dataease.controller.request.chart.ChartViewRequest;
@@ -16,6 +17,7 @@ import io.dataease.dto.chart.ChartViewDTO;
import io.dataease.dto.chart.ChartViewFieldDTO;
import io.dataease.dto.chart.Series;
import io.dataease.dto.dataset.DataTableInfoDTO;
+import io.dataease.service.dataset.DataSetTableFieldsService;
import io.dataease.service.dataset.DataSetTableService;
import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
@@ -41,6 +43,8 @@ public class ChartViewService {
    private DatasourceService datasourceService;
    @Resource
    private SparkCalc sparkCalc;
+    @Resource
+    private DataSetTableFieldsService dataSetTableFieldsService;
    public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) {
        checkName(chartView);
@@ -121,9 +125,9 @@
            }
            data = datasourceProvider.getData(datasourceRequest);
        } else if (table.getMode() == 1) { // extraction mode
-            // DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
-            // String tableName = dataTableInfoDTO.getTable() + "-" + table.getDataSourceId();// todo hBase table name maybe change
-            data = sparkCalc.getData(table.getId(), xAxis, yAxis, "tmp_" + view.getId().split("-")[0]);
+            // fetch the dataset fields
+            List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
+            data = sparkCalc.getData(table.getId(), fields, xAxis, yAxis, "tmp_" + view.getId().split("-")[0]);
        }
        // chart components can be extended further
......
@@ -60,4 +60,10 @@ public class DataSetTableFieldsService {
        datasetTableFieldExample.createCriteria().andIdIn(ids);
        return datasetTableFieldMapper.selectByExample(datasetTableFieldExample);
    }
+    public List<DatasetTableField> getFieldsByTableId(String id) {
+        DatasetTableFieldExample datasetTableFieldExample = new DatasetTableFieldExample();
+        datasetTableFieldExample.createCriteria().andTableIdEqualTo(id);
+        return datasetTableFieldMapper.selectByExample(datasetTableFieldExample);
+    }
}
@@ -13,6 +13,7 @@ import io.dataease.datasource.constants.DatasourceTypes;
import io.dataease.datasource.dto.MysqlConfigrationDTO;
import io.dataease.dto.dataset.DataSetTaskLogDTO;
import io.dataease.dto.dataset.DataTableInfoDTO;
+import io.dataease.service.spark.SparkCalc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
@@ -97,7 +98,7 @@ public class ExtractDataService {
    private DataSetTableTaskService dataSetTableTaskService;
    @Resource
    private DatasourceMapper datasourceMapper;
    private static ExecutorService pool = Executors.newScheduledThreadPool(50); // connection pool
    private Connection connection;
    private static String lastUpdateTime = "${__last_update_time__}";
@@ -120,6 +121,9 @@
    @Value("${hbase.zookeeper.property.clientPort:2181}")
    private String zkPort;
+    @Resource
+    private SparkCalc sparkCalc;
    public void extractData(String datasetTableId, String taskId, String type) {
        DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
@@ -131,60 +135,62 @@
            List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
            String table = new Gson().fromJson(datasetTable.getInfo(), DataTableInfoDTO.class).getTable();
            TableName hbaseTable = TableName.valueOf(datasetTableId);
            switch (updateType) {
                // full update
                case all_scope:
                    writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
                    // check pentaho_mappings table
                    TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
                    if (!admin.tableExists(pentaho_mappings)) {
                        creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
                    }
                    // check pentaho files
                    if (!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")) {
                        generateTransFile("all_scope", datasetTable, datasource, table, datasetTableFields, null);
                        generateJobFile("all_scope", datasetTable);
                    }
                    if (!admin.tableExists(hbaseTable)) {
                        creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
                    }
                    admin.disableTable(hbaseTable);
                    admin.truncateTable(hbaseTable, true);
                    extractData(datasetTable, "all_scope");
+                    // after the sync completes, read the data from HBase into the cache
+                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
                    datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                    datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                    dataSetTableTaskLogService.save(datasetTableTaskLog);
                    break;
                case add_scope:
                    // incremental update
                    if (!admin.tableExists(hbaseTable)) {
                        LogUtil.error("TableName error, dataaset: " + datasetTableId);
                        return;
                    }
                    DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
                    if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
                        return;
                    }
                    DatasetTableTaskLog request = new DatasetTableTaskLog();
                    request.setTableId(datasetTableId);
                    request.setStatus(JobStatus.Completed.name());
                    List<DataSetTaskLogDTO> dataSetTaskLogDTOS = dataSetTableTaskLogService.list(request);
                    if (CollectionUtils.isEmpty(dataSetTaskLogDTOS)) {
                        return;
                    }
                    writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
                    // incremental add
                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalAdd().replace(" ", ""))) {
                        System.out.println("datasetTableIncrementalConfig.getIncrementalAdd(): " + datasetTableIncrementalConfig.getIncrementalAdd());
                        String sql = datasetTableIncrementalConfig.getIncrementalAdd().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
                                .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
                        if (!isExitFile("job_add_" + datasetTableId + ".kjb") || !isExitFile("trans_add_" + datasetTableId + ".ktr")) {
                            generateTransFile("incremental_add", datasetTable, datasource, table, datasetTableFields, sql);
                            generateJobFile("incremental_add", datasetTable);
                        }
@@ -193,39 +199,39 @@
                    }
                    // incremental delete
                    if (StringUtils.isNotEmpty(datasetTableIncrementalConfig.getIncrementalDelete())) {
                        String sql = datasetTableIncrementalConfig.getIncrementalDelete().replace(lastUpdateTime, dataSetTaskLogDTOS.get(0).getStartTime().toString()
                                .replace(currentUpdateTime, Long.valueOf(System.currentTimeMillis()).toString()));
                        if (!isExitFile("job_delete_" + datasetTableId + ".kjb") || !isExitFile("trans_delete_" + datasetTableId + ".ktr")) {
                            generateTransFile("incremental_delete", datasetTable, datasource, table, datasetTableFields, sql);
                            generateJobFile("incremental_delete", datasetTable);
                        }
                        extractData(datasetTable, "incremental_delete");
                    }
+                    // after the sync completes, read the data from HBase into the cache
+                    sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
                    datasetTableTaskLog.setStatus(JobStatus.Completed.name());
                    datasetTableTaskLog.setEndTime(System.currentTimeMillis());
                    dataSetTableTaskLogService.save(datasetTableTaskLog);
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
            LogUtil.error("ExtractData error, dataaset: " + datasetTableId);
            LogUtil.error(e.getMessage(), e);
            datasetTableTaskLog.setStatus(JobStatus.Error.name());
            datasetTableTaskLog.setEndTime(System.currentTimeMillis());
            dataSetTableTaskLogService.save(datasetTableTaskLog);
        } finally {
            DatasetTableTask datasetTableTask = dataSetTableTaskService.get(taskId);
            if (datasetTableTask != null && datasetTableTask.getRate().equalsIgnoreCase(ScheduleType.SIMPLE.toString())) {
                datasetTableTask.setRate(ScheduleType.SIMPLE_COMPLETE.toString());
                dataSetTableTaskService.update(datasetTableTask);
            }
        }
    }
    private void writeDatasetTableTaskLog(DatasetTableTaskLog datasetTableTaskLog, String datasetTableId, String taskId) {
        datasetTableTaskLog.setTableId(datasetTableId);
        datasetTableTaskLog.setTaskId(taskId);
        datasetTableTaskLog.setStatus(JobStatus.Underway.name());
@@ -233,7 +239,7 @@
        dataSetTableTaskLogService.save(datasetTableTaskLog);
    }
    private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
        TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
        Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
        for (String s : columnFamily) {
@@ -245,11 +251,11 @@
        admin.createTable(desc);
    }
    private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
        KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
        RepositoryDirectoryInterface repositoryDirectoryInterface = repository.loadRepositoryDirectoryTree();
        JobMeta jobMeta = null;
        switch (extractType) {
            case "all_scope":
                jobMeta = repository.loadJob("job_" + datasetTable.getId(), repositoryDirectoryInterface, null, null);
                break;
@@ -272,27 +278,27 @@
        do {
            jobStatus = remoteSlaveServer.getJobStatus(jobMeta.getName(), lastCarteObjectId, 0);
        } while (jobStatus != null && jobStatus.isRunning());
        if (jobStatus.getStatusDescription().equals("Finished")) {
            return;
        } else {
            throw new Exception(jobStatus.getLoggingString());
        }
    }
    private synchronized Connection getConnection() throws Exception {
        if (connection == null || connection.isClosed()) {
            Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
            connection = ConnectionFactory.createConnection(cfg, pool);
        }
        return connection;
    }
    private boolean isExitFile(String fileName) {
        File file = new File(root_path + fileName);
        return file.exists();
    }
    private SlaveServer getSlaveServer() {
        SlaveServer remoteSlaveServer = new SlaveServer();
        remoteSlaveServer.setHostname(carte); // remote IP
        remoteSlaveServer.setPort(port); // port
@@ -301,14 +307,14 @@
        return remoteSlaveServer;
    }
    private void generateJobFile(String extractType, DatasetTable datasetTable) throws Exception {
        String jobName = null;
        switch (extractType) {
            case "all_scope":
                jobName = "job_" + datasetTable.getId();
                break;
            case "incremental_add":
                jobName = "job_add_" + datasetTable.getId();
                break;
            case "incremental_delete":
                jobName = "job_delete_" + datasetTable.getId();
@@ -323,7 +329,7 @@
                transName = "trans_" + datasetTable.getId();
                break;
            case "incremental_add":
                transName = "trans_add_" + datasetTable.getId();
                break;
            case "incremental_delete":
                transName = "trans_delete_" + datasetTable.getId();
@@ -364,11 +370,11 @@
        jobMeta.addJobHop(greenHop);
        String jobXml = jobMeta.getXML();
        File file = new File(root_path + jobName + ".kjb");
        FileUtils.writeStringToFile(file, jobXml, "UTF-8");
    }
    private void generateTransFile(String extractType, DatasetTable datasetTable, Datasource datasource, String table, List<DatasetTableField> datasetTableFields, String selectSQL) throws Exception {
        TransMeta transMeta = new TransMeta();
        String transName = null;
        switch (extractType) {
@@ -377,7 +383,7 @@
                selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
                break;
            case "incremental_add":
                transName = "trans_add_" + datasetTable.getId();
                break;
            case "incremental_delete":
                transName = "trans_delete_" + datasetTable.getId();
@@ -450,11 +456,11 @@
        RuntimeTestActionHandler defaultHandler = null;
        RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
        RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
        Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
        for (DatasetTableField datasetTableField : datasetTableFields) {
            put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
        }
        put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
        TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
@@ -466,7 +472,7 @@
        hBaseOutputMeta.setTargetMappingName("target_mapping");
        hBaseOutputMeta.setNamedCluster(clusterTemplate);
        hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
        if (extractType.equalsIgnoreCase("incremental_delete")) {
            hBaseOutputMeta.setDeleteRowKey(true);
        }
        StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
......
package io.dataease.service.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author gin
 * @Date 2021/4/13 12:32 PM
 */
public class CacheUtil {
    private static CacheUtil cacheUtil;
    private static Map<String, Dataset<Row>> cacheMap;

    private CacheUtil() {
        // ConcurrentHashMap: the cache is written from the warm-up thread pool and read from request threads
        cacheMap = new ConcurrentHashMap<String, Dataset<Row>>();
    }

    public static synchronized CacheUtil getInstance() {
        if (cacheUtil == null) {
            cacheUtil = new CacheUtil();
        }
        return cacheUtil;
    }

    /**
     * Add a dataset to the cache
     * @param key cache key (the dataset table id)
     * @param obj dataset to cache
     */
    public void addCacheData(String key, Dataset<Row> obj) {
        cacheMap.put(key, obj);
    }

    /**
     * Get a dataset from the cache
     * @param key cache key
     * @return the cached dataset, or null if absent
     */
    public Dataset<Row> getCacheData(String key) {
        return cacheMap.get(key);
    }

    /**
     * Remove a dataset from the cache
     * @param key cache key
     */
    public void removeCacheData(String key) {
        cacheMap.remove(key);
    }
}
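The map only holds references; the data itself sits in Spark's storage once getHBaseDataAndCache below persists the Dataset and forces materialization with count(). A minimal eviction sketch (hypothetical, not part of this commit) for some dataset table id datasetTableId whose cached copy should be dropped after re-extraction:

    // Hypothetical eviction: release the persisted blocks, then drop the map entry.
    Dataset<Row> cached = CacheUtil.getInstance().getCacheData(datasetTableId);
    if (cached != null) {
        cached.unpersist();
        CacheUtil.getInstance().removeCacheData(datasetTableId);
    }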
package io.dataease.service.spark;
+import io.dataease.base.domain.DatasetTableField;
import io.dataease.commons.utils.CommonBeanFactory;
import io.dataease.dto.chart.ChartViewFieldDTO;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Result;
@@ -42,21 +44,56 @@ public class SparkCalc {
    @Resource
    private Environment env; // holds the application configuration properties
-    public List<String[]> getData(String hTable, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
+    public List<String[]> getData(String hTable, List<DatasetTableField> fields, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
+        // Spark Context
+        SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+        // Spark SQL Context
+        // SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
+        Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
+        if (ObjectUtils.isEmpty(dataFrame)) {
+            dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
+        }
+        dataFrame.createOrReplaceTempView(tmpTable);
+        Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
+        // transform
+        List<String[]> data = new ArrayList<>();
+        List<Row> list = sql.collectAsList();
+        for (Row row : list) {
+            String[] r = new String[row.length()];
+            for (int i = 0; i < row.length(); i++) {
+                r[i] = row.get(i) == null ? "null" : row.get(i).toString();
+            }
+            data.add(r);
+        }
+        return data;
+    }
+    public Dataset<Row> getHBaseDataAndCache(String hTable, List<DatasetTableField> fields) throws Exception {
+        // Spark Context
+        SparkSession spark = CommonBeanFactory.getBean(SparkSession.class);
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+        // Spark SQL Context
+        // SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
+        return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
+    }
+    public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
        Scan scan = new Scan();
        scan.addFamily(column_family.getBytes());
        ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
        String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));
-        // Spark Context
-        // JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
-        SparkSession spark = SparkSession.builder()
-                .appName(env.getProperty("spark.appName", "DataeaseJob"))
-                .master(env.getProperty("spark.master", "local[*]"))
-                .config("spark.scheduler.mode", "FAIR")
-                .getOrCreate();
-        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
        // HBase config
        // Configuration conf = CommonBeanFactory.getBean(Configuration.class);
        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
@@ -73,7 +110,7 @@ public class SparkCalc {
            while (tuple2Iterator.hasNext()) {
                Result result = tuple2Iterator.next()._2;
                List<Object> list = new ArrayList<>();
-                xAxis.forEach(x -> {
+                fields.forEach(x -> {
                    String l = Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()));
                    if (x.getDeType() == 0 || x.getDeType() == 1) {
                        list.add(l);
@@ -89,22 +126,6 @@ public class SparkCalc {
                        list.add(Double.valueOf(l));
                    }
                });
-                yAxis.forEach(y -> {
-                    String l = Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()));
-                    if (y.getDeType() == 0 || y.getDeType() == 1) {
-                        list.add(l);
-                    } else if (y.getDeType() == 2) {
-                        if (StringUtils.isEmpty(l)) {
-                            l = "0";
-                        }
-                        list.add(Long.valueOf(l));
-                    } else if (y.getDeType() == 3) {
-                        if (StringUtils.isEmpty(l)) {
-                            l = "0.0";
-                        }
-                        list.add(Double.valueOf(l));
-                    }
-                });
                iterator.add(RowFactory.create(list.toArray()));
            }
            return iterator.iterator();
@@ -112,7 +133,7 @@ public class SparkCalc {
        List<StructField> structFields = new ArrayList<>();
        // the struct order must match the rdd column order
-        xAxis.forEach(x -> {
+        fields.forEach(x -> {
            if (x.getDeType() == 0 || x.getDeType() == 1) {
                structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
            } else if (x.getDeType() == 2) {
@@ -121,40 +142,15 @@ public class SparkCalc {
                structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
            }
        });
-        yAxis.forEach(y -> {
-            if (y.getDeType() == 0 || y.getDeType() == 1) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.StringType, true));
-            } else if (y.getDeType() == 2) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.LongType, true));
-            } else if (y.getDeType() == 3) {
-                structFields.add(DataTypes.createStructField(y.getOriginName(), DataTypes.DoubleType, true));
-            }
-        });
        StructType structType = DataTypes.createStructType(structFields);
-        // Spark SQL Context
-        // SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
-        SQLContext sqlContext = new SQLContext(sparkContext);
-        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
-        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
-        Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
-        dataFrame.createOrReplaceTempView(tmpTable);
-        Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));
-        // transform
-        List<String[]> data = new ArrayList<>();
-        List<Row> list = sql.collectAsList();
-        for (Row row : list) {
-            String[] r = new String[row.length()];
-            for (int i = 0; i < row.length(); i++) {
-                r[i] = row.get(i) == null ? "null" : row.get(i).toString();
-            }
-            data.add(r);
-        }
-        return data;
+        Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist();
+        CacheUtil.getInstance().addCacheData(hTable, dataFrame);
+        dataFrame.count();
+        return dataFrame;
    }
-    private String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
+    public String getSQL(List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String table) {
        // field aggregation, sorting, etc.
        String[] field = yAxis.stream().map(y -> "CAST(" + y.getSummary() + "(" + y.getOriginName() + ") AS DECIMAL(20,2)) AS _" + y.getSummary() + "_" + y.getOriginName()).toArray(String[]::new);
        String[] group = xAxis.stream().map(ChartViewFieldDTO::getOriginName).toArray(String[]::new);
......
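For orientation, getSQL builds one aggregated SELECT over the cached temp view: each yAxis measure becomes CAST(<summary>(<column>) AS DECIMAL(20,2)) AS _<summary>_<column>, and the xAxis columns become the grouping keys. With a hypothetical dimension city and a measure price summarized with sum, over a view tmp_abc, the generated statement would look roughly like the sketch below (the ordering logic lives in the truncated remainder of the method):

    // Roughly the statement SparkCalc would run over the temp view (hypothetical column and view names):
    String sql = "SELECT city, CAST(sum(price) AS DECIMAL(20,2)) AS _sum_price FROM tmp_abc GROUP BY city";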