zhu / dataease

Commit 35d50d9f, authored Apr 27, 2021 by taojinlong

feat: extract data to Doris

Parent: f91661e7

Showing 16 changed files with 218 additions and 374 deletions (+218 −374)
pom.xml                               backend/pom.xml                                                   +5    -75
DatasetTableField.java                .../main/java/io/dataease/base/domain/DatasetTableField.java     +6    -3
DatasetTableFieldExample.java         ...ava/io/dataease/base/domain/DatasetTableFieldExample.java     +120  -60
DatasetTableFieldMapper.xml           .../java/io/dataease/base/mapper/DatasetTableFieldMapper.xml     +41   -23
DorisTableUtils.java                  .../main/java/io/dataease/commons/utils/DorisTableUtils.java     +13   -0
CommonConfig.java                     backend/src/main/java/io/dataease/config/CommonConfig.java       +21   -34
DatasourceProvider.java               ...a/io/dataease/datasource/provider/DatasourceProvider.java     +5    -8
JdbcProvider.java                     ...in/java/io/dataease/datasource/provider/JdbcProvider.java     +0    -0
DatasourceService.java                ...ava/io/dataease/datasource/service/DatasourceService.java     +0    -29
AppStartInitDataSourceListener.java   .../io/dataease/listener/AppStartInitDataSourceListener.java     +0    -22
DataSetTableService.java              ...java/io/dataease/service/dataset/DataSetTableService.java     +5    -66
ExtractDataService.java               .../java/io/dataease/service/dataset/ExtractDataService.java     +0    -0
CacheUtil.java                        ...nd/src/main/java/io/dataease/service/spark/CacheUtil.java     +0    -53
SparkCalc.java                        ...nd/src/main/java/io/dataease/service/spark/SparkCalc.java     +0    -0
V14__dataset_table_field.sql          .../main/resources/db/migration/V14__dataset_table_field.sql     +1    -0
generatorConfig.xml                   backend/src/main/resources/generatorConfig.xml                   +1    -1
backend/pom.xml
@@ -17,7 +17,6 @@
        <java.version>1.8</java.version>
        <graalvm.version>20.1.0</graalvm.version>
        <jwt.version>3.12.1</jwt.version>
        <spark.version>3.1.1</spark.version>
    </properties>

    <dependencies>
@@ -315,70 +314,12 @@
            <artifactId>ehcache</artifactId>
            <version>2.9.1</version>
        </dependency>
        <!-- hbase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.8</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.objenesis</groupId>
                    <artifactId>objenesis</artifactId>
                </exclusion>
            </exclusions>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>janino</artifactId>
                    <groupId>org.codehaus.janino</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
@@ -400,27 +341,16 @@
            <artifactId>metastore</artifactId>
            <version>8.3.0.18-1084</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-kettle-plugins-hbase-meta</artifactId>
            <version>8.3.0.18-1084</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-kettle-plugins-hbase</artifactId>
            <version>8.3.0.18-1084</version>
        </dependency>
        <dependency>
            <groupId>pentaho</groupId>
            <artifactId>pentaho-big-data-impl-cluster</artifactId>
            <version>8.3.0.18-1084</version>
        </dependency>
        <dependency>
            <groupId>org.pentaho.di.plugins</groupId>
            <artifactId>pdi-engine-configuration-impl</artifactId>
            <version>8.3.0.7-683</version>
        </dependency>
        <dependency>
            <groupId>c3p0</groupId>
            <artifactId>c3p0</artifactId>
            <version>0.9.1.2</version>
        </dependency>
    </dependencies>

    <build>
backend/src/main/java/io/dataease/base/domain/DatasetTableField.java
@@ -18,13 +18,15 @@ public class DatasetTableField implements Serializable {
    private String type;

    private Integer size;

    private Integer deType;

    private Boolean checked;

    private Integer columnIndex;

    private Long lastSyncTime;

    private Integer deType;

    private static final long serialVersionUID = 1L;
}
\ No newline at end of file
backend/src/main/java/io/dataease/base/domain/DatasetTableFieldExample.java
@@ -454,6 +454,126 @@ public class DatasetTableFieldExample {
            return (Criteria) this;
        }

        public Criteria andSizeIsNull() { addCriterion("`size` is null"); return (Criteria) this; }
        public Criteria andSizeIsNotNull() { addCriterion("`size` is not null"); return (Criteria) this; }
        public Criteria andSizeEqualTo(Integer value) { addCriterion("`size` =", value, "size"); return (Criteria) this; }
        public Criteria andSizeNotEqualTo(Integer value) { addCriterion("`size` <>", value, "size"); return (Criteria) this; }
        public Criteria andSizeGreaterThan(Integer value) { addCriterion("`size` >", value, "size"); return (Criteria) this; }
        public Criteria andSizeGreaterThanOrEqualTo(Integer value) { addCriterion("`size` >=", value, "size"); return (Criteria) this; }
        public Criteria andSizeLessThan(Integer value) { addCriterion("`size` <", value, "size"); return (Criteria) this; }
        public Criteria andSizeLessThanOrEqualTo(Integer value) { addCriterion("`size` <=", value, "size"); return (Criteria) this; }
        public Criteria andSizeIn(List<Integer> values) { addCriterion("`size` in", values, "size"); return (Criteria) this; }
        public Criteria andSizeNotIn(List<Integer> values) { addCriterion("`size` not in", values, "size"); return (Criteria) this; }
        public Criteria andSizeBetween(Integer value1, Integer value2) { addCriterion("`size` between", value1, value2, "size"); return (Criteria) this; }
        public Criteria andSizeNotBetween(Integer value1, Integer value2) { addCriterion("`size` not between", value1, value2, "size"); return (Criteria) this; }

        public Criteria andDeTypeIsNull() { addCriterion("de_type is null"); return (Criteria) this; }
        public Criteria andDeTypeIsNotNull() { addCriterion("de_type is not null"); return (Criteria) this; }
        public Criteria andDeTypeEqualTo(Integer value) { addCriterion("de_type =", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotEqualTo(Integer value) { addCriterion("de_type <>", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeGreaterThan(Integer value) { addCriterion("de_type >", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) { addCriterion("de_type >=", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeLessThan(Integer value) { addCriterion("de_type <", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeLessThanOrEqualTo(Integer value) { addCriterion("de_type <=", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeIn(List<Integer> values) { addCriterion("de_type in", values, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotIn(List<Integer> values) { addCriterion("de_type not in", values, "deType"); return (Criteria) this; }
        public Criteria andDeTypeBetween(Integer value1, Integer value2) { addCriterion("de_type between", value1, value2, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotBetween(Integer value1, Integer value2) { addCriterion("de_type not between", value1, value2, "deType"); return (Criteria) this; }

        public Criteria andCheckedIsNull() {
            addCriterion("`checked` is null");
            return (Criteria) this;

@@ -633,66 +753,6 @@ public class DatasetTableFieldExample {
            addCriterion("last_sync_time not between", value1, value2, "lastSyncTime");
            return (Criteria) this;
        }

        public Criteria andDeTypeIsNull() { addCriterion("de_type is null"); return (Criteria) this; }
        public Criteria andDeTypeIsNotNull() { addCriterion("de_type is not null"); return (Criteria) this; }
        public Criteria andDeTypeEqualTo(Integer value) { addCriterion("de_type =", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotEqualTo(Integer value) { addCriterion("de_type <>", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeGreaterThan(Integer value) { addCriterion("de_type >", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeGreaterThanOrEqualTo(Integer value) { addCriterion("de_type >=", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeLessThan(Integer value) { addCriterion("de_type <", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeLessThanOrEqualTo(Integer value) { addCriterion("de_type <=", value, "deType"); return (Criteria) this; }
        public Criteria andDeTypeIn(List<Integer> values) { addCriterion("de_type in", values, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotIn(List<Integer> values) { addCriterion("de_type not in", values, "deType"); return (Criteria) this; }
        public Criteria andDeTypeBetween(Integer value1, Integer value2) { addCriterion("de_type between", value1, value2, "deType"); return (Criteria) this; }
        public Criteria andDeTypeNotBetween(Integer value1, Integer value2) { addCriterion("de_type not between", value1, value2, "deType"); return (Criteria) this; }
    }

    public static class Criteria extends GeneratedCriteria {
backend/src/main/java/io/dataease/base/mapper/DatasetTableFieldMapper.xml
@@ -7,10 +7,11 @@
    <result column="origin_name" jdbcType="VARCHAR" property="originName" />
    <result column="name" jdbcType="VARCHAR" property="name" />
    <result column="type" jdbcType="VARCHAR" property="type" />
    <result column="size" jdbcType="INTEGER" property="size" />
    <result column="de_type" jdbcType="INTEGER" property="deType" />
    <result column="checked" jdbcType="BIT" property="checked" />
    <result column="column_index" jdbcType="INTEGER" property="columnIndex" />
    <result column="last_sync_time" jdbcType="BIGINT" property="lastSyncTime" />
    <result column="de_type" jdbcType="INTEGER" property="deType" />
  </resultMap>
  <sql id="Example_Where_Clause">
    <where>
@@ -71,8 +72,8 @@
    </where>
  </sql>
  <sql id="Base_Column_List">
    id, table_id, origin_name, `name`, `type`, `checked`, column_index, last_sync_time, de_type
    id, table_id, origin_name, `name`, `type`, `size`, de_type, `checked`, column_index, last_sync_time
  </sql>
  <select id="selectByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultMap="BaseResultMap">
    select
@@ -106,11 +107,13 @@
  </delete>
  <insert id="insert" parameterType="io.dataease.base.domain.DatasetTableField">
    insert into dataset_table_field (id, table_id, origin_name,
      `name`, `type`, `checked`, column_index, last_sync_time, de_type)
      `name`, `type`, `size`, de_type, `checked`, column_index, last_sync_time)
    values (#{id,jdbcType=VARCHAR}, #{tableId,jdbcType=VARCHAR}, #{originName,jdbcType=VARCHAR},
      #{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{checked,jdbcType=BIT}, #{columnIndex,jdbcType=INTEGER},
      #{lastSyncTime,jdbcType=BIGINT}, #{deType,jdbcType=INTEGER})
      #{name,jdbcType=VARCHAR}, #{type,jdbcType=VARCHAR}, #{size,jdbcType=INTEGER}, #{deType,jdbcType=INTEGER},
      #{checked,jdbcType=BIT}, #{columnIndex,jdbcType=INTEGER}, #{lastSyncTime,jdbcType=BIGINT})
  </insert>
  <insert id="insertSelective" parameterType="io.dataease.base.domain.DatasetTableField">
    insert into dataset_table_field
@@ -130,6 +133,12 @@
      <if test="type != null">`type`,</if>
      <if test="size != null">`size`,</if>
      <if test="deType != null">de_type,</if>
      <if test="checked != null">`checked`,</if>
@@ -139,9 +148,6 @@
      <if test="lastSyncTime != null">last_sync_time,</if>
      <if test="deType != null">de_type,</if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="id != null">
@@ -159,6 +165,12 @@
      <if test="type != null">#{type,jdbcType=VARCHAR},</if>
      <if test="size != null">#{size,jdbcType=INTEGER},</if>
      <if test="deType != null">#{deType,jdbcType=INTEGER},</if>
      <if test="checked != null">#{checked,jdbcType=BIT},</if>
@@ -168,9 +180,6 @@
      <if test="lastSyncTime != null">#{lastSyncTime,jdbcType=BIGINT},</if>
      <if test="deType != null">#{deType,jdbcType=INTEGER},</if>
    </trim>
  </insert>
  <select id="countByExample" parameterType="io.dataease.base.domain.DatasetTableFieldExample" resultType="java.lang.Long">
@@ -197,6 +206,12 @@
      <if test="record.type != null">`type` = #{record.type,jdbcType=VARCHAR},</if>
      <if test="record.size != null">`size` = #{record.size,jdbcType=INTEGER},</if>
      <if test="record.deType != null">de_type = #{record.deType,jdbcType=INTEGER},</if>
      <if test="record.checked != null">`checked` = #{record.checked,jdbcType=BIT},</if>
@@ -206,9 +221,6 @@
      <if test="record.lastSyncTime != null">last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT},</if>
      <if test="record.deType != null">de_type = #{record.deType,jdbcType=INTEGER},</if>
    </set>
    <if test="_parameter != null">
      <include refid="Update_By_Example_Where_Clause" />
@@ -221,10 +233,11 @@
    origin_name = #{record.originName,jdbcType=VARCHAR},
    `name` = #{record.name,jdbcType=VARCHAR},
    `type` = #{record.type,jdbcType=VARCHAR},
    `size` = #{record.size,jdbcType=INTEGER},
    de_type = #{record.deType,jdbcType=INTEGER},
    `checked` = #{record.checked,jdbcType=BIT},
    column_index = #{record.columnIndex,jdbcType=INTEGER},
    last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT},
    de_type = #{record.deType,jdbcType=INTEGER}
    last_sync_time = #{record.lastSyncTime,jdbcType=BIGINT}
    <if test="_parameter != null">
      <include refid="Update_By_Example_Where_Clause" />
    </if>
@@ -244,6 +257,12 @@
      <if test="type != null">`type` = #{type,jdbcType=VARCHAR},</if>
      <if test="size != null">`size` = #{size,jdbcType=INTEGER},</if>
      <if test="deType != null">de_type = #{deType,jdbcType=INTEGER},</if>
      <if test="checked != null">`checked` = #{checked,jdbcType=BIT},</if>
@@ -253,9 +272,6 @@
      <if test="lastSyncTime != null">last_sync_time = #{lastSyncTime,jdbcType=BIGINT},</if>
      <if test="deType != null">de_type = #{deType,jdbcType=INTEGER},</if>
    </set>
    where id = #{id,jdbcType=VARCHAR}
  </update>
@@ -265,10 +281,11 @@
    origin_name = #{originName,jdbcType=VARCHAR},
    `name` = #{name,jdbcType=VARCHAR},
    `type` = #{type,jdbcType=VARCHAR},
    `size` = #{size,jdbcType=INTEGER},
    de_type = #{deType,jdbcType=INTEGER},
    `checked` = #{checked,jdbcType=BIT},
    column_index = #{columnIndex,jdbcType=INTEGER},
    last_sync_time = #{lastSyncTime,jdbcType=BIGINT},
    de_type = #{deType,jdbcType=INTEGER}
    last_sync_time = #{lastSyncTime,jdbcType=BIGINT}
    where id = #{id,jdbcType=VARCHAR}
  </update>
</mapper>
\ No newline at end of file
backend/src/main/java/io/dataease/commons/utils/DorisTableUtils.java
0 → 100644
package io.dataease.commons.utils;

public class DorisTableUtils {

    public static String dorisName(String datasetId) {
        return "ds_" + datasetId.replace("-", "_");
    }

    public static String doristmpName(String dorisName) {
        return "tmp" + dorisName;
    }
}
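A minimal usage sketch of the naming helper added above; the dataset id value is made up for illustration, and the expected results follow directly from the two methods in this commit.

// Example only: illustrates the Doris table naming scheme introduced by DorisTableUtils.
String datasetId = "a1b2c3d4-e5f6-7890-abcd-ef1234567890";        // made-up dataset id
String dorisTable = DorisTableUtils.dorisName(datasetId);          // "ds_a1b2c3d4_e5f6_7890_abcd_ef1234567890"
String dorisTmpTable = DorisTableUtils.doristmpName(dorisTable);   // "tmp" + dorisTable, used as the staging table name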
backend/src/main/java/io/dataease/config/CommonConfig.java
package io.dataease.config;

import com.alibaba.fastjson.JSONObject;
import com.fit2cloud.autoconfigure.QuartzAutoConfiguration;
import io.dataease.base.domain.Datasource;
import io.dataease.commons.utils.CommonThreadPool;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.repository.filerep.KettleFileRepository;
@@ -21,41 +23,26 @@ public class CommonConfig {
    private Environment env; // holds the contents of the configuration file

    private static String root_path = "/opt/dataease/data/kettle/";
// @Bean
// @ConditionalOnMissingBean
// public org.apache.hadoop.conf.Configuration configuration() {
// org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
// configuration.set("hbase.zookeeper.quorum", env.getProperty("hbase.zookeeper.quorum"));
// configuration.set("hbase.zookeeper.property.clientPort", env.getProperty("hbase.zookeeper.property.clientPort"));
// configuration.set("hbase.client.retries.number", env.getProperty("hbase.client.retries.number", "1"));
// return configuration;
// }
    @Bean(name = "DorisDatasource")
    @ConditionalOnMissingBean
    public Datasource configuration() {
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("dataSourceType", "jdbc");
        jsonObject.put("dataBase", env.getProperty("doris.db", "doris"));
        jsonObject.put("username", env.getProperty("doris.user", "root"));
        jsonObject.put("password", env.getProperty("doris.password", "dataease"));
        jsonObject.put("host", env.getProperty("doris.host", "doris"));
        jsonObject.put("port", env.getProperty("doris.port", "9030"));

        Datasource datasource = new Datasource();
        datasource.setId("doris");
        datasource.setName("doris");
        datasource.setDesc("doris");
        datasource.setType("mysql");
        datasource.setConfiguration(jsonObject.toJSONString());
        return datasource;
    }
// @Bean
// @ConditionalOnMissingBean
// public SparkSession javaSparkSession() {
// SparkSession spark = SparkSession.builder()
// .appName(env.getProperty("spark.appName", "DataeaseJob"))
// .master(env.getProperty("spark.master", "local[*]"))
// .config("spark.scheduler.mode", env.getProperty("spark.scheduler.mode", "FAIR"))
//// .config("spark.serializer", env.getProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer"))
//// .config("spark.executor.cores", env.getProperty("spark.executor.cores", "8"))
//// .config("spark.executor.memory", env.getProperty("spark.executor.memory", "6442450944b"))
//// .config("spark.locality.wait", env.getProperty("spark.locality.wait", "600000"))
//// .config("spark.maxRemoteBlockSizeFetchToMem", env.getProperty("spark.maxRemoteBlockSizeFetchToMem", "2000m"))
//// .config("spark.shuffle.detectCorrupt", env.getProperty("spark.shuffle.detectCorrupt", "false"))
//// .config("spark.shuffle.service.enabled", env.getProperty("spark.shuffle.service.enabled", "true"))
//// .config("spark.sql.adaptive.enabled", env.getProperty("spark.sql.adaptive.enabled", "true"))
//// .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", env.getProperty("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "200M"))
//// .config("spark.sql.broadcastTimeout", env.getProperty("spark.sql.broadcastTimeout", "12000"))
//// .config("spark.sql.retainGroupColumns", env.getProperty("spark.sql.retainGroupColumns", "false"))
//// .config("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold", "100000"))
//// .config("spark.sql.sortMergeJoinExec.buffer.spill.threshold", env.getProperty("spark.sql.sortMergeJoinExec.buffer.spill.threshold", "100000"))
//// .config("spark.sql.variable.substitute", env.getProperty("spark.sql.variable.substitute", "false"))
//// .config("spark.temp.expired.time", env.getProperty("spark.temp.expired.time", "3600"))
// .getOrCreate();
// return spark;
// }
@Bean
@ConditionalOnMissingBean
...
...
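For orientation, a hedged sketch of how the "DorisDatasource" bean registered in CommonConfig above could be consumed elsewhere in the backend. The consuming class, method, and field names here are hypothetical; only the bean name and the Datasource type come from this commit.

import io.dataease.base.domain.Datasource;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

// Hypothetical consumer; only the bean name "DorisDatasource" and the Datasource type are taken from the diff above.
@Service
public class DorisConsumerExample {

    @Resource(name = "DorisDatasource")
    private Datasource dorisDatasource;

    public String dorisConfigurationJson() {
        // The configuration string carries the host/port/dataBase values set in CommonConfig.
        return dorisDatasource.getConfiguration();
    }
}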
backend/src/main/java/io/dataease/datasource/provider/DatasourceProvider.java
@@ -8,6 +8,7 @@ import io.dataease.datasource.request.DatasourceRequest;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public abstract class DatasourceProvider {
@@ -15,8 +16,6 @@ public abstract class DatasourceProvider {
    abstract public List<String[]> getData(DatasourceRequest datasourceRequest) throws Exception;

    abstract public ResultSet getDataResultSet(DatasourceRequest datasourceRequest) throws Exception;

    abstract public List<String> getTables(DatasourceRequest datasourceRequest) throws Exception;

    public List<TableFiled> getTableFileds(DatasourceRequest datasourceRequest) throws Exception {
@@ -27,13 +26,11 @@ public abstract class DatasourceProvider {
        getData(datasourceRequest);
    }

    abstract public Long count(DatasourceRequest datasourceRequest) throws Exception;

    abstract public List<String[]> getPageData(DatasourceRequest datasourceRequest) throws Exception;

    abstract public List<String[]> fetchResult(DatasourceRequest datasourceRequest) throws Exception;

    abstract public List<String[]> fetchResult(ResultSet rs) throws Exception;

    abstract public List<TableFiled> fetchResultField(DatasourceRequest datasourceRequest) throws Exception;

    abstract public List<TableFiled> fetchResultField(ResultSet rs) throws Exception;

    abstract public Map<String, List> fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception;

    abstract public void initConnectionPool(DatasourceRequest datasourceRequest) throws Exception;

    abstract public void initDataSource(DatasourceRequest datasourceRequest) throws Exception;
}
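As a reading aid for the new fetchResultAndField contract declared above: the concrete implementation lives in JdbcProvider, whose diff is collapsed below, so the sketch here is only an assumption about its shape. The "dataList" and "fieldList" keys are taken from the DataSetTableService hunk later in this commit.

// Assumed implementation sketch only; the real code is in JdbcProvider (collapsed in this view).
public Map<String, List> fetchResultAndField(DatasourceRequest datasourceRequest) throws Exception {
    Map<String, List> result = new java.util.HashMap<>();
    ResultSet rs = getDataResultSet(datasourceRequest);
    result.put("fieldList", fetchResultField(rs));   // column metadata
    result.put("dataList", fetchResult(rs));         // row data
    return result;
}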
backend/src/main/java/io/dataease/datasource/provider/JdbcProvider.java
(diff collapsed, not shown)
backend/src/main/java/io/dataease/datasource/service/DatasourceService.java
@@ -41,7 +41,6 @@ public class DatasourceService {
        datasource.setUpdateTime(currentTimeMillis);
        datasource.setCreateTime(currentTimeMillis);
        datasourceMapper.insertSelective(datasource);
        initConnectionPool(datasource);
        return datasource;
    }
@@ -71,7 +70,6 @@
        datasource.setCreateTime(null);
        datasource.setUpdateTime(System.currentTimeMillis());
        datasourceMapper.updateByPrimaryKeySelective(datasource);
        initConnectionPool(datasource);
    }

    public void validate(Datasource datasource) throws Exception {
@@ -92,31 +90,4 @@
    public Datasource get(String id) {
        return datasourceMapper.selectByPrimaryKey(id);
    }

    private void initConnectionPool(Datasource datasource) {
        commonThreadPool.addTask(() -> {
            try {
                DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
                DatasourceRequest datasourceRequest = new DatasourceRequest();
                datasourceRequest.setDatasource(datasource);
                datasourceProvider.initConnectionPool(datasourceRequest);
            } catch (Exception e) {}
        });
    }

    public void initAllDataSourceConnectionPool() {
        List<Datasource> datasources = datasourceMapper.selectByExampleWithBLOBs(new DatasourceExample());
        datasources.forEach(datasource -> {
            commonThreadPool.addTask(() -> {
                try {
                    DatasourceProvider datasourceProvider = ProviderFactory.getProvider(datasource.getType());
                    DatasourceRequest datasourceRequest = new DatasourceRequest();
                    datasourceRequest.setDatasource(datasource);
                    datasourceProvider.initConnectionPool(datasourceRequest);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        });
    }
}
backend/src/main/java/io/dataease/listener/AppStartInitDataSourceListener.java
deleted 100644 → 0

package io.dataease.listener;

import io.dataease.datasource.service.DatasourceService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Order(value = 2)
public class AppStartInitDataSourceListener implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private DatasourceService datasourceService;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        System.out.println("================= Init datasource connection pool =================");
        // On application startup, find the tables scheduled for extraction in the datasets and load them from HBase into the cache
        datasourceService.initAllDataSourceConnectionPool();
    }
}
backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java
@@ -240,9 +240,9 @@ public class DataSetTableService {
            datasourceRequest.setDatasource(ds);
            String sql = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql();
            datasourceRequest.setQuery(sql);
            ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest);
            List<String[]> data = datasourceProvider.fetchResult(dataResultSet);
            List<TableFiled> fields = datasourceProvider.fetchResultField(dataResultSet);
            Map<String, List> result = datasourceProvider.fetchResultAndField(datasourceRequest);
            List<String[]> data = result.get("dataList");
            List<TableFiled> fields = result.get("fieldList");
            String[] fieldArray = fields.stream().map(TableFiled::getFieldName).toArray(String[]::new);
            List<Map<String, Object>> jsonArray = new ArrayList<>();
@@ -263,67 +263,6 @@ public class DataSetTableService {
        return map;
    }

    public List<String[]> getDataSetData(String datasourceId, String table, List<DatasetTableField> fields) {
        List<String[]> data = new ArrayList<>();
        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
        DatasourceRequest datasourceRequest = new DatasourceRequest();
        datasourceRequest.setDatasource(ds);
        String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
        datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray) + " LIMIT 0, 10");
        try { data.addAll(datasourceProvider.getData(datasourceRequest)); } catch (Exception e) { }
        return data;
    }

    public Long getDataSetTotalData(String datasourceId, String table) {
        List<String[]> data = new ArrayList<>();
        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
        DatasourceRequest datasourceRequest = new DatasourceRequest();
        datasourceRequest.setDatasource(ds);
        datasourceRequest.setQuery("select count(*) from " + table);
        try { return datasourceProvider.count(datasourceRequest); } catch (Exception e) { }
        return 0l;
    }

    public List<String[]> getDataSetPageData(String datasourceId, String table, List<DatasetTableField> fields, Long startPage, Long pageSize) {
        List<String[]> data = new ArrayList<>();
        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
        DatasourceRequest datasourceRequest = new DatasourceRequest();
        datasourceRequest.setDatasource(ds);
        String[] fieldArray = fields.stream().map(DatasetTableField::getOriginName).toArray(String[]::new);
        datasourceRequest.setPageSize(pageSize);
        datasourceRequest.setStartPage(startPage);
        datasourceRequest.setQuery(createQuerySQL(ds.getType(), table, fieldArray));
        try { return datasourceProvider.getData(datasourceRequest); } catch (Exception e) { }
        return data;
    }

    public List<String[]> getDataSetDataBySql(String datasourceId, String table, String sql) {
        List<String[]> data = new ArrayList<>();
        Datasource ds = datasourceMapper.selectByPrimaryKey(datasourceId);
        DatasourceProvider datasourceProvider = ProviderFactory.getProvider(ds.getType());
        DatasourceRequest datasourceRequest = new DatasourceRequest();
        datasourceRequest.setDatasource(ds);
        datasourceRequest.setQuery(sql);
        try { return datasourceProvider.getData(datasourceRequest); } catch (Exception e) { }
        return data;
    }

    public void saveTableField(DatasetTable datasetTable) throws Exception {
        Datasource ds = datasourceMapper.selectByPrimaryKey(datasetTable.getDataSourceId());
        DataSetTableRequest dataSetTableRequest = new DataSetTableRequest();
@@ -338,8 +277,7 @@ public class DataSetTableService {
            DatasourceRequest datasourceRequest = new DatasourceRequest();
            datasourceRequest.setDatasource(ds);
            datasourceRequest.setQuery(new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class).getSql());
            ResultSet dataResultSet = datasourceProvider.getDataResultSet(datasourceRequest);
            fields = datasourceProvider.fetchResultField(dataResultSet);
            fields = datasourceProvider.fetchResultField(datasourceRequest);
        } else if (StringUtils.equalsIgnoreCase(datasetTable.getType(), "excel")) {
            DataTableInfoDTO dataTableInfoDTO = new Gson().fromJson(dataSetTableRequest.getInfo(), DataTableInfoDTO.class);
            String path = dataTableInfoDTO.getData();
@@ -367,6 +305,7 @@ public class DataSetTableService {
            } else {
                datasetTableField.setDeType(transFieldType(ds.getType(), filed.getFieldType()));
            }
            datasetTableField.setSize(filed.getFieldSize());
            datasetTableField.setChecked(true);
            datasetTableField.setColumnIndex(i);
            datasetTableField.setLastSyncTime(syncTime);
backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
(diff collapsed, not shown)
backend/src/main/java/io/dataease/service/spark/CacheUtil.java
deleted 100644 → 0

package io.dataease.service.spark;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.HashMap;
import java.util.Map;

/**
 * @Author gin
 * @Date 2021/4/13 12:32 PM
 */
public class CacheUtil {
    private static CacheUtil cacheUtil;
    private static Map<String, Dataset<Row>> cacheMap;

    private CacheUtil() {
        cacheMap = new HashMap<String, Dataset<Row>>();
    }

    public static CacheUtil getInstance() {
        if (cacheUtil == null) {
            cacheUtil = new CacheUtil();
        }
        return cacheUtil;
    }

    /**
     * Add an entry to the cache
     * @param key
     * @param obj
     */
    public void addCacheData(String key, Dataset<Row> obj) {
        cacheMap.put(key, obj);
    }

    /**
     * Fetch an entry from the cache
     * @param key
     * @return
     */
    public Dataset<Row> getCacheData(String key) {
        return cacheMap.get(key);
    }

    /**
     * Remove an entry from the cache
     * @param key
     */
    public void removeCacheData(String key) {
        cacheMap.remove(key);
    }
}
backend/src/main/java/io/dataease/service/spark/SparkCalc.java
deleted 100644 → 0
(diff collapsed, not shown)
backend/src/main/resources/db/migration/V14__dataset_table_field.sql
0 → 100644

ALTER TABLE `dataset_table_field` ADD COLUMN `size` INT NULL AFTER `type`;
backend/src/main/resources/generatorConfig.xml
...
...
@@ -67,7 +67,7 @@
<!-- <table tableName="datasource"/>-->
<!-- <table tableName="sys_dict"/>-->
<!-- <table tableName="sys_dict_item"/>-->
            <table tableName="panel_template"/>
            <table tableName="dataset_table_field"/>
<!-- <table tableName="panel_design"/>-->
...
...