Commit 67a1179b, authored by wangjiahao

Merge remote-tracking branch 'origin/main' into main

ChartViewService.java

@@ -2,10 +2,7 @@ package io.dataease.service.chart;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import io.dataease.base.domain.ChartViewExample;
-import io.dataease.base.domain.ChartViewWithBLOBs;
-import io.dataease.base.domain.DatasetTable;
-import io.dataease.base.domain.Datasource;
+import io.dataease.base.domain.*;
 import io.dataease.base.mapper.ChartViewMapper;
 import io.dataease.commons.utils.BeanUtils;
 import io.dataease.controller.request.chart.ChartViewRequest;
@@ -45,6 +42,7 @@ public class ChartViewService {
     private SparkCalc sparkCalc;

     public ChartViewWithBLOBs save(ChartViewWithBLOBs chartView) {
+        checkName(chartView);
         long timestamp = System.currentTimeMillis();
         chartView.setUpdateTime(timestamp);
         int i = chartViewMapper.updateByPrimaryKeySelective(chartView);
@@ -121,7 +119,8 @@ public class ChartViewService {
             data = datasourceProvider.getData(datasourceRequest);
         } else if (table.getMode() == 1) {// extracted mode
             DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(table.getInfo(), DataTableInfoDTO.class);
-            data = sparkCalc.getData(dataTableInfoDTO.getTable() + "-" + table.getDataSourceId(), xAxis, yAxis, "tmp");// todo: hBase table name may change
+            String tableName = dataTableInfoDTO.getTable() + "-" + table.getDataSourceId();// todo: hBase table name may change
+            data = sparkCalc.getData(tableName, xAxis, yAxis, view.getId().split("-")[0]);
         }

         // chart components can be extended further
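
Note on this hunk: the last argument to sparkCalc.getData() changes from the shared literal "tmp" to the first dash-separated segment of the view id, so each chart view now registers its own Spark temp view instead of every query contending for one name. A minimal sketch of what that expression yields (the id value below is made up):

    // Assuming view ids are UUID-style strings; this value is hypothetical.
    String viewId = "67a1179b-4fd9-4c3e-9f1a-2b8c0d6e5a41";
    String tmpViewName = viewId.split("-")[0]; // -> "67a1179b"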
@@ -224,4 +223,25 @@ public class ChartViewService {
             return "";
         }
     }
+
+    private void checkName(ChartViewWithBLOBs chartView) {
+        if (StringUtils.isEmpty(chartView.getId())) {
+            return;
+        }
+        ChartViewExample chartViewExample = new ChartViewExample();
+        ChartViewExample.Criteria criteria = chartViewExample.createCriteria();
+        if (StringUtils.isNotEmpty(chartView.getId())) {
+            criteria.andIdNotEqualTo(chartView.getId());
+        }
+        if (StringUtils.isNotEmpty(chartView.getSceneId())) {
+            criteria.andSceneIdEqualTo(chartView.getSceneId());
+        }
+        if (StringUtils.isNotEmpty(chartView.getName())) {
+            criteria.andNameEqualTo(chartView.getName());
+        }
+        List<ChartViewWithBLOBs> list = chartViewMapper.selectByExampleWithBLOBs(chartViewExample);
+        if (list.size() > 0) {
+            throw new RuntimeException("Name can't repeat in same group.");
+        }
+    }
 }
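
The added checkName amounts to SELECT ... WHERE id <> ? AND scene_id = ? AND name = ? through the MyBatis-generated Example/Criteria classes, and rejects the save when any row matches. A minimal caller-side sketch, assuming a wired-up service bean (all values here are hypothetical):

    // A second view reusing a name within the same scene should fail fast.
    ChartViewWithBLOBs view = new ChartViewWithBLOBs();
    view.setId("existing-view-id");   // hypothetical id
    view.setSceneId("scene-1");       // hypothetical scene
    view.setName("Sales by region");  // name already taken in scene-1
    try {
        chartViewService.save(view);
    } catch (RuntimeException e) {
        // e.getMessage() -> "Name can't repeat in same group."
    }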
DataSetTableService.java

@@ -48,6 +48,7 @@ public class DataSetTableService {
     }

     public DatasetTable save(DatasetTable datasetTable) throws Exception {
+        checkName(datasetTable);
         if (StringUtils.isEmpty(datasetTable.getId())) {
             datasetTable.setId(UUID.randomUUID().toString());
             datasetTable.setCreateTime(System.currentTimeMillis());
@@ -364,34 +365,56 @@ public class DataSetTableService {
         }
     }

-    public DatasetTableIncrementalConfig incrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){
-        if(StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())){return new DatasetTableIncrementalConfig();}
+    public DatasetTableIncrementalConfig incrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig) {
+        if (StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
+            return new DatasetTableIncrementalConfig();
+        }
         DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample();
         example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId());
         List<DatasetTableIncrementalConfig> configs = datasetTableIncrementalConfigMapper.selectByExample(example);
-        if(CollectionUtils.isNotEmpty(configs)){
+        if (CollectionUtils.isNotEmpty(configs)) {
             return configs.get(0);
-        }else {
+        } else {
             return new DatasetTableIncrementalConfig();
         }
     }

-    public DatasetTableIncrementalConfig incrementalConfig(String datasetTableId){
+    public DatasetTableIncrementalConfig incrementalConfig(String datasetTableId) {
         DatasetTableIncrementalConfig datasetTableIncrementalConfig = new DatasetTableIncrementalConfig();
         datasetTableIncrementalConfig.setTableId(datasetTableId);
         return incrementalConfig(datasetTableIncrementalConfig);
     }

-    public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig){
-        if(StringUtils.isEmpty(datasetTableIncrementalConfig.getId())){
+    public void saveIncrementalConfig(DatasetTableIncrementalConfig datasetTableIncrementalConfig) {
+        if (StringUtils.isEmpty(datasetTableIncrementalConfig.getId())) {
             datasetTableIncrementalConfig.setId(UUID.randomUUID().toString());
             datasetTableIncrementalConfigMapper.insertSelective(datasetTableIncrementalConfig);
-        }else{
+        } else {
             DatasetTableIncrementalConfigExample example = new DatasetTableIncrementalConfigExample();
             example.createCriteria().andTableIdEqualTo(datasetTableIncrementalConfig.getTableId());
             datasetTableIncrementalConfigMapper.updateByExample(datasetTableIncrementalConfig, example);
         }
     }
+
+    private void checkName(DatasetTable datasetTable) {
+        if (StringUtils.isEmpty(datasetTable.getId()) && StringUtils.equalsIgnoreCase("db", datasetTable.getType())) {
+            return;
+        }
+        DatasetTableExample datasetTableExample = new DatasetTableExample();
+        DatasetTableExample.Criteria criteria = datasetTableExample.createCriteria();
+        if (StringUtils.isNotEmpty(datasetTable.getId())) {
+            criteria.andIdNotEqualTo(datasetTable.getId());
+        }
+        if (StringUtils.isNotEmpty(datasetTable.getSceneId())) {
+            criteria.andSceneIdEqualTo(datasetTable.getSceneId());
+        }
+        if (StringUtils.isNotEmpty(datasetTable.getName())) {
+            criteria.andNameEqualTo(datasetTable.getName());
+        }
+        List<DatasetTable> list = datasetTableMapper.selectByExample(datasetTableExample);
+        if (list.size() > 0) {
+            throw new RuntimeException("Name can't repeat in same group.");
+        }
+    }
 }
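
incrementalConfig follows a get-or-default pattern: when the table id is empty or no row exists for it, callers get an empty DatasetTableIncrementalConfig rather than null. A minimal usage sketch (the service wiring and the table id are hypothetical):

    // No null check needed on the returned config.
    DatasetTableIncrementalConfig cfg =
            dataSetTableService.incrementalConfig("dataset-table-id"); // hypothetical id
    if (StringUtils.isEmpty(cfg.getId())) {
        // no incremental config has been saved for this table yet
    }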
SparkCalc.java

@@ -16,16 +16,15 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.RowFactory;
-import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.*;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
+import org.springframework.core.env.Environment;
 import org.springframework.stereotype.Service;
 import scala.Tuple2;
+import javax.annotation.Resource;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Base64;
@@ -39,6 +38,8 @@ import java.util.List;
 @Service
 public class SparkCalc {
     private static String column_family = "dataease";
+    @Resource
+    private Environment env; // holds the values loaded from the config file

     public List<String[]> getData(String hTable, List<ChartViewFieldDTO> xAxis, List<ChartViewFieldDTO> yAxis, String tmpTable) throws Exception {
         Scan scan = new Scan();
@@ -46,8 +47,21 @@ public class SparkCalc {
         ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
         String scanToString = new String(Base64.getEncoder().encode(proto.toByteArray()));

-        JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
-        Configuration conf = CommonBeanFactory.getBean(Configuration.class);
+        // Spark Context
+        // JavaSparkContext sparkContext = CommonBeanFactory.getBean(JavaSparkContext.class);
+        SparkSession spark = SparkSession.builder()
+                .appName(env.getProperty("spark.appName", "DataeaseJob"))
+                .master(env.getProperty("spark.master", "local[*]"))
+                .config("spark.scheduler.mode", "FAIR")
+                .getOrCreate();
+        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
+
+        // HBase config
+        // Configuration conf = CommonBeanFactory.getBean(Configuration.class);
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        conf.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
+        conf.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
+        conf.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
+
         conf.set(TableInputFormat.INPUT_TABLE, hTable);
         conf.set(TableInputFormat.SCAN, scanToString);
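
The lines that consume this Configuration sit outside the hunk, so the following is a sketch of the usual TableInputFormat pattern under that assumption, not the file's confirmed code: Spark loads the serialized scan into a pair RDD keyed by the HBase row key.

    // Sketch of the standard HBase-on-Spark read; this call is not shown
    // in the diff itself.
    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
            sparkContext.newAPIHadoopRDD(conf,
                    TableInputFormat.class,       // reads the table set in INPUT_TABLE
                    ImmutableBytesWritable.class, // key: HBase row key
                    Result.class);                // value: all cells of the row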
@@ -103,12 +117,15 @@ public class SparkCalc {
         });
         StructType structType = DataTypes.createStructType(structFields);

-        SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        // Spark SQL Context
+        // SQLContext sqlContext = CommonBeanFactory.getBean(SQLContext.class);
+        SQLContext sqlContext = new SQLContext(sparkContext);
+        sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
+        sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
         Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
         dataFrame.createOrReplaceTempView(tmpTable);
         Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable));

         // transform
         List<String[]> data = new ArrayList<>();
         List<Row> list = sql.collectAsList();
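
With every tunable routed through env.getProperty, the job can now be adjusted from the Spring configuration file instead of from rebuilt beans. A hypothetical snippet of the relevant properties follows; the keys are exactly the ones read in this diff, every value is a made-up example, and note that hbase.zookeeper.quorum and hbase.zookeeper.property.clientPort have no in-code default, so they must be present:

    spark.appName=DataeaseJob
    spark.master=local[*]
    spark.sql.shuffle.partitions=1
    spark.default.parallelism=1
    hbase.zookeeper.quorum=zk-host1,zk-host2,zk-host3
    hbase.zookeeper.property.clientPort=2181
    hbase.client.retries.number=1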
@@ -119,7 +136,6 @@ public class SparkCalc {
                 }
             }
             data.add(r);
         }
-
         return data;
     }
...
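
The temp view exists only so getSQL can aggregate over it. getSQL itself is outside this diff, so the following is only a guess at the shape of the statement it produces for a one-dimension, one-measure chart; the field names and the aggregate are hypothetical stand-ins:

    // Guessed shape of getSQL(xAxis, yAxis, tmpTable)'s output; "province"
    // and SUM(sales) are made up, not fields from this codebase.
    String sql = "SELECT province, SUM(sales) AS sum_sales FROM " + tmpTable
            + " GROUP BY province";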