Commit f3fad450 authored by taojinlong

feat: 测试性能 (test performance)

Parent 6b285d73
@@ -22,15 +22,15 @@ public class CommonConfig {
private Environment env; // holds the loaded configuration properties
private static String root_path = "/opt/dataease/data/kettle/";
@Bean
@ConditionalOnMissingBean
public org.apache.hadoop.conf.Configuration configuration() {
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
return configuration;
}
// @Bean
// @ConditionalOnMissingBean
// public org.apache.hadoop.conf.Configuration configuration() {
// org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
// configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
// configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
// configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
// return configuration;
// }
@Bean
@ConditionalOnMissingBean
@@ -7,6 +7,7 @@ import io.dataease.datasource.dto.MysqlConfigrationDTO;
import io.dataease.datasource.dto.SqlServerConfigration;
import io.dataease.datasource.dto.TableFiled;
import io.dataease.datasource.request.DatasourceRequest;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
@@ -39,6 +40,23 @@ public class JdbcProvider extends DatasourceProvider {
return list;
}
@VisibleForTesting
public void exec(DatasourceRequest datasourceRequest) throws Exception {
Connection connection = null;
try {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
stat.execute(datasourceRequest.getQuery());
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
throw new Exception("ERROR:" + e.getMessage(), e);
}finally {
returnSource(connection, datasourceRequest.getDatasource().getId());
}
}
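The new exec method runs a statement on a pooled connection and always hands the connection back in the finally block. A minimal usage sketch, assuming the usual DatasourceRequest setters (not shown in this diff):
    DatasourceRequest request = new DatasourceRequest();
    request.setDatasource(datasource);                       // assumed setter: the saved Datasource entity
    request.setQuery("CREATE TABLE tmp_table AS SELECT 1");  // assumed setter: the SQL to execute
    jdbcProvider.exec(request);                              // the connection goes back to the pool in finally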
@Override
public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception {
ResultSet rs;
@@ -47,7 +65,6 @@ public class JdbcProvider extends DatasourceProvider {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
rs = stat.executeQuery(datasourceRequest.getQuery());
returnSource(connection, datasourceRequest.getDatasource().getId());
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
} catch (Exception e) {
@@ -66,7 +83,6 @@ public class JdbcProvider extends DatasourceProvider {
connection = getConnectionFromPool(datasourceRequest);
Statement stat = connection.createStatement();
ResultSet rs = stat.executeQuery(datasourceRequest.getQuery() + MessageFormat.format(" LIMIT {0}, {1}", (datasourceRequest.getStartPage() - 1) * datasourceRequest.getPageSize(), datasourceRequest.getPageSize()));
returnSource(connection, datasourceRequest.getDatasource().getId());
list = fetchResult(rs);
} catch (SQLException e) {
throw new Exception("ERROR:" + e.getMessage(), e);
@@ -174,8 +190,6 @@ public class JdbcProvider extends DatasourceProvider {
return list;
}
;
@Override
public void test(DatasourceRequest datasourceRequest) throws Exception {
String queryStr = getTablesSql(datasourceRequest);
@@ -33,20 +33,20 @@ public class AppStartReadHBaseListener implements ApplicationListener<Applicatio
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
System.out.println("================= Read HBase start =================");
// On startup, find the datasets configured for scheduled extraction and load their data from HBase into the cache
DatasetTableExample datasetTableExample = new DatasetTableExample();
datasetTableExample.createCriteria().andModeEqualTo(1);
List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
for (DatasetTable table : datasetTables) {
// commonThreadPool.addTask(() -> {
try {
List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
sparkCalc.getHBaseDataAndCache(table.getId(), fields);
} catch (Exception e) {
e.printStackTrace();
}
// });
}
// System.out.println("================= Read HBase start =================");
// // On startup, find the datasets configured for scheduled extraction and load their data from HBase into the cache
// DatasetTableExample datasetTableExample = new DatasetTableExample();
// datasetTableExample.createCriteria().andModeEqualTo(1);
// List<DatasetTable> datasetTables = datasetTableMapper.selectByExampleWithBLOBs(datasetTableExample);
// for (DatasetTable table : datasetTables) {
//// commonThreadPool.addTask(() -> {
// try {
// List<DatasetTableField> fields = dataSetTableFieldsService.getFieldsByTableId(table.getId());
// sparkCalc.getHBaseDataAndCache(table.getId(), fields);
// } catch (Exception e) {
// e.printStackTrace();
// }
//// });
// }
}
}
@@ -56,6 +56,9 @@ import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.textfileoutput.TextFileField;
import org.pentaho.di.trans.steps.textfileoutput.TextFileOutput;
import org.pentaho.di.trans.steps.textfileoutput.TextFileOutputMeta;
import org.pentaho.di.trans.steps.userdefinedjavaclass.InfoStepDefinition;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassDef;
import org.pentaho.di.trans.steps.userdefinedjavaclass.UserDefinedJavaClassMeta;
@@ -105,6 +108,7 @@ public class ExtractDataService {
private static String currentUpdateTime = "${__current_update_time__}";
private static String dataease_column_family = "dataease";
private static String root_path = "/opt/dataease/data/kettle/";
private static String data_path = "/opt/dataease/data/db/";
private static String hbase_conf_file = "/opt/dataease/conf/hbase-site.xml";
private static String pentaho_mappings = "pentaho_mappings";
@@ -129,7 +133,7 @@ public class ExtractDataService {
DatasetTableTaskLog datasetTableTaskLog = new DatasetTableTaskLog();
UpdateType updateType = UpdateType.valueOf(type);
try {
Admin admin = getConnection().getAdmin();
// Admin admin = getConnection().getAdmin();
DatasetTable datasetTable = dataSetTableService.get(datasetTableId);
Datasource datasource = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
List<DatasetTableField> datasetTableFields = dataSetTableFieldsService.list(DatasetTableField.builder().tableId(datasetTable.getId()).build());
@@ -141,10 +145,10 @@ public class ExtractDataService {
writeDatasetTableTaskLog(datasetTableTaskLog, datasetTableId, taskId);
//check pentaho_mappings table
TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
if (!admin.tableExists(pentaho_mappings)) {
creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
}
// TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
// if (!admin.tableExists(pentaho_mappings)) {
// creatHaseTable(pentaho_mappings, admin, Arrays.asList("columns", "key"));
// }
//check pentaho files
if (!isExitFile("job_" + datasetTableId + ".kjb") || !isExitFile("trans_" + datasetTableId + ".ktr")) {
@@ -152,25 +156,25 @@ public class ExtractDataService {
generateJobFile("all_scope", datasetTable);
}
if (!admin.tableExists(hbaseTable)) {
creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
}
admin.disableTable(hbaseTable);
admin.truncateTable(hbaseTable, true);
// if (!admin.tableExists(hbaseTable)) {
// creatHaseTable(hbaseTable, admin, Arrays.asList(dataease_column_family));
// }
// admin.disableTable(hbaseTable);
// admin.truncateTable(hbaseTable, true);
extractData(datasetTable, "all_scope");
// after the sync completes, read data from HBase into the cache
sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
// sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
break;
case add_scope:
// incremental update
if (!admin.tableExists(hbaseTable)) {
LogUtil.error("TableName error, dataaset: " + datasetTableId);
return;
}
// if (!admin.tableExists(hbaseTable)) {
// LogUtil.error("TableName error, dataaset: " + datasetTableId);
// return;
// }
DatasetTableIncrementalConfig datasetTableIncrementalConfig = dataSetTableService.incrementalConfig(datasetTableId);
if (datasetTableIncrementalConfig == null || StringUtils.isEmpty(datasetTableIncrementalConfig.getTableId())) {
return;
@@ -209,7 +213,7 @@ public class ExtractDataService {
extractData(datasetTable, "incremental_delete");
}
// after the sync completes, read data from HBase into the cache
sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
// sparkCalc.getHBaseDataAndCache(datasetTableId, dataSetTableFieldsService.getFieldsByTableId(datasetTableId));
datasetTableTaskLog.setStatus(JobStatus.Completed.name());
datasetTableTaskLog.setEndTime(System.currentTimeMillis());
dataSetTableTaskLogService.save(datasetTableTaskLog);
@@ -239,17 +243,17 @@ public class ExtractDataService {
dataSetTableTaskLogService.save(datasetTableTaskLog);
}
private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
for (String s : columnFamily) {
ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s);
families.add(hcd);
}
descBuilder.setColumnFamilies(families);
TableDescriptor desc = descBuilder.build();
admin.createTable(desc);
}
// private void creatHaseTable(TableName tableName, Admin admin, List<String> columnFamily) throws Exception {
// TableDescriptorBuilder descBuilder = TableDescriptorBuilder.newBuilder(tableName);
// Collection<ColumnFamilyDescriptor> families = new ArrayList<>();
// for (String s : columnFamily) {
// ColumnFamilyDescriptor hcd = ColumnFamilyDescriptorBuilder.of(s);
// families.add(hcd);
// }
// descBuilder.setColumnFamilies(families);
// TableDescriptor desc = descBuilder.build();
// admin.createTable(desc);
// }
private void extractData(DatasetTable datasetTable, String extractType) throws Exception {
KettleFileRepository repository = CommonBeanFactory.getBean(KettleFileRepository.class);
@@ -285,13 +289,13 @@ public class ExtractDataService {
}
}
private synchronized Connection getConnection() throws Exception {
if (connection == null || connection.isClosed()) {
Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
connection = ConnectionFactory.createConnection(cfg, pool);
}
return connection;
}
// private synchronized Connection getConnection() throws Exception {
// if (connection == null || connection.isClosed()) {
// Configuration cfg = CommonBeanFactory.getBean(Configuration.class);
// connection = ConnectionFactory.createConnection(cfg, pool);
// }
// return connection;
// }
private boolean isExitFile(String fileName) {
File file = new File(root_path + fileName);
@@ -380,6 +384,15 @@ public class ExtractDataService {
switch (extractType) {
case "all_scope":
transName = "trans_" + datasetTable.getId();
datasetTableFields.sort((o1, o2) -> {
if (o1.getOriginName() == null) {
return -1;
}
if (o2.getOriginName() == null) {
return 1;
}
return o1.getOriginName().compareTo(o2.getOriginName());
});
selectSQL = dataSetTableService.createQuerySQL(datasource.getType(), table, datasetTableFields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new));
break;
case "incremental_add":
@@ -422,70 +435,90 @@ public class ExtractDataService {
fromStep.setLocation(100, 100);
transMeta.addStep(fromStep);
// Step 2 (User defined Java class)
UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
fields.add(fieldInfo);
userDefinedJavaClassMeta.setFieldInfo(fields);
List<UserDefinedJavaClassDef> definitions = new ArrayList<UserDefinedJavaClassDef>();
UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code);
userDefinedJavaClassDef.setActive(true);
definitions.add(userDefinedJavaClassDef);
userDefinedJavaClassMeta.replaceDefinitions(definitions);
StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
userDefinedJavaClassStep.setLocation(300, 100);
userDefinedJavaClassStep.setDraw(true);
transMeta.addStep(userDefinedJavaClassStep);
// Step 3 (HBaseOutputMeta)
NamedClusterService namedClusterService = new NamedClusterManager();
NamedCluster clusterTemplate = new NamedClusterImpl();
clusterTemplate.setName("hadoop");
clusterTemplate.setZooKeeperHost(zkHost);
clusterTemplate.setZooKeeperPort(zkPort);
clusterTemplate.setStorageScheme("HDFS");
namedClusterService.setClusterTemplate(clusterTemplate);
List<ClusterInitializerProvider> providers = new ArrayList<>();
ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers);
NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer);
List<RuntimeTestActionHandler> runtimeTestActionHandlers = new ArrayList<>();
RuntimeTestActionHandler defaultHandler = null;
RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
for (DatasetTableField datasetTableField : datasetTableFields) {
put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
}
put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
Table tab = getConnection().getTable(pentaho_mappings);
tab.put(put);
HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester);
hBaseOutputMeta.setTargetTableName(datasetTable.getId());
hBaseOutputMeta.setTargetMappingName("target_mapping");
hBaseOutputMeta.setNamedCluster(clusterTemplate);
hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
hBaseOutputMeta.setDisableWriteToWAL(true);
hBaseOutputMeta.setWriteBufferSize("31457280"); //30M
if (extractType.equalsIgnoreCase("incremental_delete")) {
hBaseOutputMeta.setDeleteRowKey(true);
}
StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
// Step 2 (TextFileOutput)
TextFileOutputMeta textFileOutputMeta = new TextFileOutputMeta();
textFileOutputMeta.setFilename(data_path + datasetTable.getId());
textFileOutputMeta.setExtension("txt");
textFileOutputMeta.setSeparator(";");
textFileOutputMeta.setFileCompression("None");
textFileOutputMeta.setEnclosure("\"");
textFileOutputMeta.setEncoding("UTF-8");
TextFileField[] outputFields = new TextFileField[1];
outputFields[0] = new TextFileField();
textFileOutputMeta.setOutputFields(outputFields);
StepMeta tostep = new StepMeta("TextFileOutput", "TextFileOutput", textFileOutputMeta);
tostep.setLocation(600, 100);
tostep.setDraw(true);
transMeta.addStep(tostep);
TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
TransHopMeta hi1 = new TransHopMeta(fromStep, tostep);
transMeta.addTransHop(hi1);
transMeta.addTransHop(hi2);
// // Step 2 (User defined Java class)
// UserDefinedJavaClassMeta userDefinedJavaClassMeta = new UserDefinedJavaClassMeta();
// List<UserDefinedJavaClassMeta.FieldInfo> fields = new ArrayList<>();
// UserDefinedJavaClassMeta.FieldInfo fieldInfo = new UserDefinedJavaClassMeta.FieldInfo("uuid", ValueMetaInterface.TYPE_STRING, -1, -1);
// fields.add(fieldInfo);
// userDefinedJavaClassMeta.setFieldInfo(fields);
// List<UserDefinedJavaClassDef> definitions = new ArrayList<UserDefinedJavaClassDef>();
// UserDefinedJavaClassDef userDefinedJavaClassDef = new UserDefinedJavaClassDef(UserDefinedJavaClassDef.ClassType.TRANSFORM_CLASS, "Processor", code);
// userDefinedJavaClassDef.setActive(true);
// definitions.add(userDefinedJavaClassDef);
// userDefinedJavaClassMeta.replaceDefinitions(definitions);
//
// StepMeta userDefinedJavaClassStep = new StepMeta("UserDefinedJavaClass", "UserDefinedJavaClass", userDefinedJavaClassMeta);
// userDefinedJavaClassStep.setLocation(300, 100);
// userDefinedJavaClassStep.setDraw(true);
// transMeta.addStep(userDefinedJavaClassStep);
//
// // Step 3 (HBaseOutputMeta)
// NamedClusterService namedClusterService = new NamedClusterManager();
// NamedCluster clusterTemplate = new NamedClusterImpl();
// clusterTemplate.setName("hadoop");
// clusterTemplate.setZooKeeperHost(zkHost);
// clusterTemplate.setZooKeeperPort(zkPort);
// clusterTemplate.setStorageScheme("HDFS");
// namedClusterService.setClusterTemplate(clusterTemplate);
//
// List<ClusterInitializerProvider> providers = new ArrayList<>();
// ClusterInitializer clusterInitializer = new ClusterInitializerImpl(providers);
// NamedClusterServiceLocator namedClusterServiceLocator = new NamedClusterServiceLocatorImpl(clusterInitializer);
//
// List<RuntimeTestActionHandler> runtimeTestActionHandlers = new ArrayList<>();
// RuntimeTestActionHandler defaultHandler = null;
//
// RuntimeTestActionService runtimeTestActionService = new RuntimeTestActionServiceImpl(runtimeTestActionHandlers, defaultHandler);
// RuntimeTester runtimeTester = new RuntimeTesterImpl(new ArrayList<>(Arrays.asList(mock(RuntimeTest.class))), mock(ExecutorService.class), "modules");
//
// Put put = new Put((datasetTable.getId() + "," + "target_mapping").getBytes());
// for (DatasetTableField datasetTableField : datasetTableFields) {
// put.addColumn("columns".getBytes(), (dataease_column_family + "," + datasetTableField.getOriginName() + "," + datasetTableField.getOriginName()).getBytes(), transToColumnType(datasetTableField.getDeType()).getBytes());
// }
// put.addColumn("key".getBytes(), "uuid".getBytes(), "String".getBytes());
// TableName pentaho_mappings = TableName.valueOf(this.pentaho_mappings);
// Table tab = getConnection().getTable(pentaho_mappings);
// tab.put(put);
//
// HBaseOutputMeta hBaseOutputMeta = new HBaseOutputMeta(namedClusterService, namedClusterServiceLocator, runtimeTestActionService, runtimeTester);
// hBaseOutputMeta.setTargetTableName(datasetTable.getId());
// hBaseOutputMeta.setTargetMappingName("target_mapping");
// hBaseOutputMeta.setNamedCluster(clusterTemplate);
// hBaseOutputMeta.setCoreConfigURL(hbase_conf_file);
// hBaseOutputMeta.setDisableWriteToWAL(true);
// hBaseOutputMeta.setWriteBufferSize("31457280"); //30M
// if (extractType.equalsIgnoreCase("incremental_delete")) {
// hBaseOutputMeta.setDeleteRowKey(true);
// }
// StepMeta tostep = new StepMeta("HBaseOutput", "HBaseOutput", hBaseOutputMeta);
// tostep.setLocation(600, 100);
//
// tostep.setDraw(true);
// transMeta.addStep(tostep);
// TransHopMeta hi1 = new TransHopMeta(fromStep, userDefinedJavaClassStep);
// TransHopMeta hi2 = new TransHopMeta(userDefinedJavaClassStep, tostep);
// transMeta.addTransHop(hi1);
// transMeta.addTransHop(hi2);
String transXml = transMeta.getXML();
File file = new File(root_path + transName + ".ktr");
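With the HBase output step commented out, the generated transformation now ends in a TextFileOutput step, and SparkCalc picks the result up from disk. A sketch of how the two sides of this commit line up, using the data_path constant defined above:
    // ExtractDataService: TextFileOutputMeta writes data_path + datasetTable.getId() + ".txt",
    // ";"-separated and UTF-8 encoded
    String outputFile = "/opt/dataease/data/db/" + datasetTable.getId() + ".txt";
    // SparkCalc.getData reads the same file back and splits each line on ";";
    // both sides sort the fields by originName so the column order matches
    JavaRDD<String> lines = sparkContext.textFile(outputFile);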
@@ -41,6 +41,7 @@ import java.util.List;
@Service
public class SparkCalc {
private static String column_family = "dataease";
private static String data_path = "/opt/dataease/data/db/";
@Resource
private Environment env; // holds the loaded configuration properties
@@ -54,12 +55,13 @@ public class SparkCalc {
sqlContext.setConf("spark.sql.shuffle.partitions", env.getProperty("spark.sql.shuffle.partitions", "1"));
sqlContext.setConf("spark.default.parallelism", env.getProperty("spark.default.parallelism", "1"));
Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
if (ObjectUtils.isEmpty(dataFrame)) {
dataFrame = getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
}
Dataset<Row> dataFrame = getData(sparkContext, sqlContext, hTable, fields);
// Dataset<Row> dataFrame = CacheUtil.getInstance().getCacheData(hTable);
// if (ObjectUtils.isEmpty(dataFrame)) {
// dataFrame = getData(sparkContext, sqlContext, hTable, fields);
// }
dataFrame.createOrReplaceTempView(tmpTable);
dataFrame.createOrReplaceTempView( tmpTable);
Dataset<Row> sql = sqlContext.sql(getSQL(xAxis, yAxis, tmpTable, requestList));
// transform
List<String[]> data = new ArrayList<>();
@@ -86,6 +88,69 @@ public class SparkCalc {
return getHBaseDataAndCache(sparkContext, sqlContext, hTable, fields);
}
public Dataset<Row> getData(JavaSparkContext sparkContext, SQLContext sqlContext, String tableId, List<DatasetTableField> fields) throws Exception {
fields.sort((o1, o2) -> {
if (o1.getOriginName() == null) {
return -1;
}
if (o2.getOriginName() == null) {
return 1;
}
return o1.getOriginName().compareTo(o2.getOriginName());
});
JavaRDD<String> pairRDD = sparkContext.textFile(data_path + tableId + ".txt");
JavaRDD<Row> rdd = pairRDD.mapPartitions( (FlatMapFunction<java.util.Iterator<String>, Row>) tuple2Iterator -> {
List<Row> iterator = new ArrayList<>();
while (tuple2Iterator.hasNext()) {
String[] items = tuple2Iterator.next().split(";");
List<Object> list = new ArrayList<>();
for(int i=0; i<items.length; i++){
String l = items[i];
DatasetTableField x = fields.get(i);
if (x.getDeType() == 0 || x.getDeType() == 1) {
list.add(l);
} else if (x.getDeType() == 2) {
if (StringUtils.isEmpty(l)) {
l = "0";
}
if (StringUtils.equalsIgnoreCase(l,"Y")) {
l = "1";
}
if (StringUtils.equalsIgnoreCase(l,"N")) {
l = "0";
}
list.add(Long.valueOf(l));
} else if (x.getDeType() == 3) {
if (StringUtils.isEmpty(l)) {
l = "0.0";
}
list.add(Double.valueOf(l));
}
}
iterator.add(RowFactory.create(list.toArray()));
}
return iterator.iterator();
});
List<StructField> structFields = new ArrayList<>();
// the struct field order must match the RDD column order
fields.forEach(x -> {
if (x.getDeType() == 0 || x.getDeType() == 1) {
structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.StringType, true));
} else if (x.getDeType() == 2) {
structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.LongType, true));
} else if (x.getDeType() == 3) {
structFields.add(DataTypes.createStructField(x.getOriginName(), DataTypes.DoubleType, true));
}
});
StructType structType = DataTypes.createStructType(structFields);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType);
return dataFrame;
}
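getData sorts the fields by originName (nulls first), exactly like the all_scope branch in ExtractDataService, so the columns in the text file and the StructType built above stay aligned. An equivalent ordering written with java.util.Comparator (a sketch, not part of this commit):
    fields.sort(Comparator.comparing(DatasetTableField::getOriginName,
            Comparator.nullsFirst(Comparator.naturalOrder())));   // requires import java.util.Comparator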
public Dataset<Row> getHBaseDataAndCache(JavaSparkContext sparkContext, SQLContext sqlContext, String hTable, List<DatasetTableField> fields) throws Exception {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(column_family));
@@ -145,7 +210,7 @@ public class SparkCalc {
StructType structType = DataTypes.createStructType(structFields);
Dataset<Row> dataFrame = sqlContext.createDataFrame(rdd, structType).persist(StorageLevel.MEMORY_AND_DISK_SER());
CacheUtil.getInstance().addCacheData(hTable, dataFrame);
// CacheUtil.getInstance().addCacheData(hTable, dataFrame);
dataFrame.count();
return dataFrame;
}