Commit cd65f762 authored by junjie

feat(backend): spark

Parent ab2a840b
package io.dataease.config;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
@@ -35,8 +34,10 @@ public class CommonConfig {
     @Bean
     @ConditionalOnMissingBean
     public JavaSparkContext javaSparkContext() {
-        SparkConf conf = new SparkConf().setAppName(env.getProperty("spark.appName", "DataeaseJob")).setMaster(env.getProperty("spark.master", "local[*]"));
-        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
+        SparkSession spark = SparkSession.builder()
+                .appName(env.getProperty("spark.appName", "DataeaseJob"))
+                .master(env.getProperty("spark.master", "local[*]"))
+                .getOrCreate();
         JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
         return sc;
     }
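
The change drops the explicit SparkConf and moves both settings onto SparkSession's builder, which accepts them directly. A minimal, self-contained sketch of the same pattern, with hard-coded strings standing in for the env.getProperty lookups above:

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class SparkSessionExample {
    public static void main(String[] args) {
        // appName/master on the builder replace SparkConf.setAppName/setMaster;
        // getOrCreate() reuses an already-running session if one exists.
        SparkSession spark = SparkSession.builder()
                .appName("DataeaseJob")
                .master("local[*]")
                .getOrCreate();
        // The JavaSparkContext simply wraps the session's SparkContext,
        // so no standalone SparkConf object is needed.
        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
        System.out.println("Spark version: " + spark.version());
        sc.close();
    }
}
```
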
...
@@ -59,14 +59,22 @@ public class SparkCalc {
             if (x.getDeType() == 0 || x.getDeType() == 1) {
                 list.add(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes())));
             } else if (x.getDeType() == 2) {
-                list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()))));
+                String l = Bytes.toString(result.getValue(column_family.getBytes(), x.getOriginName().getBytes()));
+                if (StringUtils.isEmpty(l)) {
+                    l = "0";
+                }
+                list.add(Long.valueOf(l));
             }
         });
         yAxis.forEach(y -> {
             if (y.getDeType() == 0 || y.getDeType() == 1) {
                 list.add(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes())));
             } else if (y.getDeType() == 2) {
-                list.add(Long.valueOf(Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()))));
+                String l = Bytes.toString(result.getValue(column_family.getBytes(), y.getOriginName().getBytes()));
+                if (StringUtils.isEmpty(l)) {
+                    l = "0";
+                }
+                list.add(Long.valueOf(l));
             }
         });
         return RowFactory.create(list.toArray());
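
Both new branches guard against empty HBase cells: Bytes.toString returns null for a missing cell, and Long.valueOf throws NumberFormatException on null or an empty string, so the value now falls back to "0". A sketch of a helper that would factor out the duplicated check (toLongOrZero is hypothetical, not part of this commit; it assumes the StringUtils in scope is Apache commons-lang, as the isEmpty call suggests):

```java
import org.apache.commons.lang3.StringUtils;

public class SafeParse {
    // Hypothetical helper mirroring the diff's fallback logic: null or empty
    // input (e.g. a missing HBase cell) parses as 0 instead of throwing.
    static Long toLongOrZero(String raw) {
        return StringUtils.isEmpty(raw) ? Long.valueOf(0L) : Long.valueOf(raw);
    }

    public static void main(String[] args) {
        System.out.println(toLongOrZero(null)); // 0
        System.out.println(toLongOrZero(""));   // 0
        System.out.println(toLongOrZero("42")); // 42
    }
}
```
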
@@ -99,7 +107,8 @@ public class SparkCalc {
         List<String[]> data = new ArrayList<>();
         // transform
-        List<Row> list = sql.javaRDD().collect();
+//        List<Row> list = sql.javaRDD().collect();
+        List<Row> list = sql.collectAsList();
         for (Row row : list) {
             String[] r = new String[row.length()];
             for (int i = 0; i < row.length(); i++) {
@@ -108,6 +117,16 @@
             data.add(r);
         }
+//        Iterator<Row> rowIterator = sql.toLocalIterator();
+//        while (rowIterator.hasNext()){
+//            Row row = rowIterator.next();
+//            String[] r = new String[row.length()];
+//            for (int i = 0; i < row.length(); i++) {
+//                r[i] = row.get(i).toString();
+//            }
+//            data.add(r);
+//        }
         return data;
     }
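
collectAsList() avoids the detour through javaRDD() but, like collect(), still materializes the full result set on the driver. The commented-out toLocalIterator() variant instead streams rows partition by partition, so driver memory stays bounded at roughly one partition. A sketch of that alternative, assuming sql is the Dataset<Row> built earlier in SparkCalc:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CollectExamples {
    // Same row-to-String[] transform as SparkCalc, but pulling rows through
    // a local iterator instead of collecting everything at once.
    static List<String[]> toStringArrays(Dataset<Row> sql) {
        List<String[]> data = new ArrayList<>();
        Iterator<Row> rowIterator = sql.toLocalIterator();
        while (rowIterator.hasNext()) {
            Row row = rowIterator.next();
            String[] r = new String[row.length()];
            for (int i = 0; i < row.length(); i++) {
                Object v = row.get(i); // may be null for nullable columns
                r[i] = v == null ? null : v.toString();
            }
            data.add(r);
        }
        return data;
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("demo").master("local[*]").getOrCreate();
        Dataset<Row> sql = spark.sql("SELECT 1 AS a, 'x' AS b");
        toStringArrays(sql).forEach(r -> System.out.println(String.join(",", r)));
        spark.stop();
    }
}
```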