Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
D
dataease
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
zhu
dataease
Commits
afd2a7ae
提交
afd2a7ae
authored
5月 01, 2022
作者:
taojinlong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: 定时任务创建全量任务,执行完成后修改为增量,发现增量不生效
上级
ec9139bd
隐藏空白字符变更
内嵌
并排
正在显示
5 个修改的文件
包含
100 行增加
和
73 行删除
+100
-73
ChartGroupService.java
...ain/java/io/dataease/service/chart/ChartGroupService.java
+0
-4
DataSetTableService.java
...java/io/dataease/service/dataset/DataSetTableService.java
+5
-2
DataSetTableTaskLogService.java
.../dataease/service/dataset/DataSetTableTaskLogService.java
+6
-0
DataSetTableTaskService.java
.../io/dataease/service/dataset/DataSetTableTaskService.java
+45
-27
ExtractDataService.java
.../java/io/dataease/service/dataset/ExtractDataService.java
+44
-40
没有找到文件。
backend/src/main/java/io/dataease/service/chart/ChartGroupService.java
浏览文件 @
afd2a7ae
...
@@ -31,10 +31,6 @@ public class ChartGroupService {
...
@@ -31,10 +31,6 @@ public class ChartGroupService {
@Resource
@Resource
private
ExtChartGroupMapper
extChartGroupMapper
;
private
ExtChartGroupMapper
extChartGroupMapper
;
@Resource
@Resource
private
ExtDataSetGroupMapper
extDataSetGroupMapper
;
@Resource
private
SysAuthService
sysAuthService
;
@Resource
private
ExtChartViewMapper
extChartViewMapper
;
private
ExtChartViewMapper
extChartViewMapper
;
public
ChartGroupDTO
save
(
ChartGroup
chartGroup
)
{
public
ChartGroupDTO
save
(
ChartGroup
chartGroup
)
{
...
...
backend/src/main/java/io/dataease/service/dataset/DataSetTableService.java
浏览文件 @
afd2a7ae
...
@@ -2245,8 +2245,7 @@ public class DataSetTableService {
...
@@ -2245,8 +2245,7 @@ public class DataSetTableService {
List
<
DatasetTable
>
jobStoppeddDatasetTables
=
new
ArrayList
<>();
List
<
DatasetTable
>
jobStoppeddDatasetTables
=
new
ArrayList
<>();
datasetTableMapper
.
selectByExample
(
example
).
forEach
(
datasetTable
->
{
datasetTableMapper
.
selectByExample
(
example
).
forEach
(
datasetTable
->
{
if
(
StringUtils
.
isEmpty
(
datasetTable
.
getQrtzInstance
())
||
!
activeQrtzInstances
.
contains
(
if
(
StringUtils
.
isNotEmpty
(
datasetTable
.
getQrtzInstance
())
&&
!
activeQrtzInstances
.
contains
(
datasetTable
.
getQrtzInstance
().
substring
(
0
,
datasetTable
.
getQrtzInstance
().
length
()
-
13
)))
{
datasetTable
.
getQrtzInstance
().
substring
(
0
,
datasetTable
.
getQrtzInstance
().
length
()
-
13
)))
{
jobStoppeddDatasetTables
.
add
(
datasetTable
);
jobStoppeddDatasetTables
.
add
(
datasetTable
);
}
}
});
});
...
@@ -2314,4 +2313,8 @@ public class DataSetTableService {
...
@@ -2314,4 +2313,8 @@ public class DataSetTableService {
saveTableField
(
datasetTable
);
saveTableField
(
datasetTable
);
return
datasetTable
;
return
datasetTable
;
}
}
public
int
updateByExampleSelective
(
DatasetTable
record
,
DatasetTableExample
example
){
return
datasetTableMapper
.
updateByExampleSelective
(
record
,
example
);
}
}
}
backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskLogService.java
浏览文件 @
afd2a7ae
...
@@ -10,6 +10,7 @@ import io.dataease.dto.dataset.DataSetTaskLogDTO;
...
@@ -10,6 +10,7 @@ import io.dataease.dto.dataset.DataSetTaskLogDTO;
import
io.dataease.plugins.common.base.domain.DatasetTableTaskLog
;
import
io.dataease.plugins.common.base.domain.DatasetTableTaskLog
;
import
io.dataease.plugins.common.base.domain.DatasetTableTaskLogExample
;
import
io.dataease.plugins.common.base.domain.DatasetTableTaskLogExample
;
import
io.dataease.plugins.common.base.mapper.DatasetTableTaskLogMapper
;
import
io.dataease.plugins.common.base.mapper.DatasetTableTaskLogMapper
;
import
io.dataease.plugins.common.base.mapper.DatasetTableTaskMapper
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
...
@@ -29,8 +30,13 @@ public class DataSetTableTaskLogService {
...
@@ -29,8 +30,13 @@ public class DataSetTableTaskLogService {
private
DatasetTableTaskLogMapper
datasetTableTaskLogMapper
;
private
DatasetTableTaskLogMapper
datasetTableTaskLogMapper
;
@Resource
@Resource
private
ExtDataSetTaskMapper
extDataSetTaskMapper
;
private
ExtDataSetTaskMapper
extDataSetTaskMapper
;
@Resource
private
DatasetTableTaskMapper
datasetTableTaskMapper
;
public
DatasetTableTaskLog
save
(
DatasetTableTaskLog
datasetTableTaskLog
)
{
public
DatasetTableTaskLog
save
(
DatasetTableTaskLog
datasetTableTaskLog
)
{
if
(
datasetTableTaskMapper
.
selectByPrimaryKey
(
datasetTableTaskLog
.
getTaskId
())
==
null
){
return
datasetTableTaskLog
;
}
if
(
StringUtils
.
isEmpty
(
datasetTableTaskLog
.
getId
()))
{
if
(
StringUtils
.
isEmpty
(
datasetTableTaskLog
.
getId
()))
{
datasetTableTaskLog
.
setId
(
UUID
.
randomUUID
().
toString
());
datasetTableTaskLog
.
setId
(
UUID
.
randomUUID
().
toString
());
datasetTableTaskLog
.
setCreateTime
(
System
.
currentTimeMillis
());
datasetTableTaskLog
.
setCreateTime
(
System
.
currentTimeMillis
());
...
...
backend/src/main/java/io/dataease/service/dataset/DataSetTableTaskService.java
浏览文件 @
afd2a7ae
...
@@ -108,26 +108,32 @@ public class DataSetTableTaskService {
...
@@ -108,26 +108,32 @@ public class DataSetTableTaskService {
DataEaseException
.
throwException
(
Translator
.
get
(
"i18n_not_exec_add_sync"
));
DataEaseException
.
throwException
(
Translator
.
get
(
"i18n_not_exec_add_sync"
));
}
}
}
}
if
(
existSyncTask
(
data
SetTableService
.
get
(
datasetTableTask
.
getTableId
()),
datasetTableTask
))
{
if
(
existSyncTask
(
data
setTableTask
.
getTableId
(),
datasetTableTask
.
getId
()
))
{
DataEaseException
.
throwException
(
Translator
.
get
(
"i18n_sync_job_exists"
));
DataEaseException
.
throwException
(
Translator
.
get
(
"i18n_sync_job_exists"
));
}
}
}
}
private
synchronized
boolean
existSyncTask
(
DatasetTable
datasetTable
,
DatasetTableTask
datasetTableTask
)
{
private
synchronized
boolean
existSyncTask
(
String
datasetTableId
,
String
datasetTableTaskId
)
{
datasetTable
.
setSyncStatus
(
JobStatus
.
Underway
.
name
());
DatasetTable
record
=
new
DatasetTable
();
record
.
setSyncStatus
(
JobStatus
.
Underway
.
name
());
DatasetTableExample
example
=
new
DatasetTableExample
();
DatasetTableExample
example
=
new
DatasetTableExample
();
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
.
getId
()
).
andSyncStatusNotEqualTo
(
JobStatus
.
Underway
.
name
());
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
Id
).
andSyncStatusNotEqualTo
(
JobStatus
.
Underway
.
name
());
example
.
or
(
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
.
getId
()
).
andSyncStatusIsNull
());
example
.
or
(
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
Id
).
andSyncStatusIsNull
());
Boolean
existSyncTask
=
datasetTableMapper
.
updateByExampleSelective
(
datasetTable
,
example
)
==
0
;
Boolean
existSyncTask
=
datasetTableMapper
.
updateByExampleSelective
(
record
,
example
)
==
0
;
if
(!
existSyncTask
)
{
if
(!
existSyncTask
)
{
Long
startTime
=
System
.
currentTimeMillis
();
Long
startTime
=
System
.
currentTimeMillis
();
datasetTableTask
.
setLastExecTime
(
startTime
);
datasetTableTask
.
setLastExecStatus
(
JobStatus
.
Underway
.
name
());
DatasetTableTask
datasetTableTaskRecord
=
new
DatasetTableTask
();
datasetTableTask
.
setStatus
(
TaskStatus
.
Exec
.
name
());
datasetTableTaskRecord
.
setLastExecTime
(
startTime
);
update
(
datasetTableTask
);
datasetTableTaskRecord
.
setLastExecStatus
(
JobStatus
.
Underway
.
name
());
datasetTableTaskRecord
.
setStatus
(
TaskStatus
.
Exec
.
name
());
DatasetTableTaskExample
datasetTableTaskExample
=
new
DatasetTableTaskExample
();
datasetTableTaskExample
.
createCriteria
().
andIdEqualTo
(
datasetTableTaskId
);
updateByExampleSelective
(
datasetTableTaskRecord
,
datasetTableTaskExample
);
DatasetTableTaskLog
datasetTableTaskLog
=
new
DatasetTableTaskLog
();
DatasetTableTaskLog
datasetTableTaskLog
=
new
DatasetTableTaskLog
();
datasetTableTaskLog
.
setTableId
(
datasetTable
Task
.
getTableId
()
);
datasetTableTaskLog
.
setTableId
(
datasetTable
Id
);
datasetTableTaskLog
.
setTaskId
(
datasetTableTask
.
getId
()
);
datasetTableTaskLog
.
setTaskId
(
datasetTableTask
Id
);
datasetTableTaskLog
.
setStatus
(
JobStatus
.
Underway
.
name
());
datasetTableTaskLog
.
setStatus
(
JobStatus
.
Underway
.
name
());
datasetTableTaskLog
.
setStartTime
(
startTime
);
datasetTableTaskLog
.
setStartTime
(
startTime
);
datasetTableTaskLog
.
setTriggerType
(
TriggerType
.
Custom
.
name
());
datasetTableTaskLog
.
setTriggerType
(
TriggerType
.
Custom
.
name
());
...
@@ -173,7 +179,7 @@ public class DataSetTableTaskService {
...
@@ -173,7 +179,7 @@ public class DataSetTableTaskService {
}
}
}
}
public
void
checkTaskIsStopped
(
DatasetTableTask
datasetTableTask
)
{
public
void
checkTaskIsStopped
(
final
DatasetTableTask
datasetTableTask
)
{
if
(
StringUtils
.
isNotEmpty
(
datasetTableTask
.
getEnd
())
&&
datasetTableTask
.
getEnd
().
equalsIgnoreCase
(
"1"
))
{
if
(
StringUtils
.
isNotEmpty
(
datasetTableTask
.
getEnd
())
&&
datasetTableTask
.
getEnd
().
equalsIgnoreCase
(
"1"
))
{
BaseGridRequest
request
=
new
BaseGridRequest
();
BaseGridRequest
request
=
new
BaseGridRequest
();
ConditionEntity
conditionEntity
=
new
ConditionEntity
();
ConditionEntity
conditionEntity
=
new
ConditionEntity
();
...
@@ -186,24 +192,30 @@ public class DataSetTableTaskService {
...
@@ -186,24 +192,30 @@ public class DataSetTableTaskService {
return
;
return
;
}
}
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
==
null
||
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
<=
0
)
{
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
==
null
||
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
<=
0
)
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
DatasetTableTask
record
=
new
DatasetTableTask
();
update
(
datasetTableTask
);
record
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
DatasetTableTaskExample
datasetTableTaskExample
=
new
DatasetTableTaskExample
();
datasetTableTaskExample
.
createCriteria
().
andIdEqualTo
(
datasetTableTask
.
getId
());
updateByExampleSelective
(
record
,
datasetTableTaskExample
);
return
;
return
;
}
}
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
>
datasetTableTask
.
getEndTime
())
{
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
>
datasetTableTask
.
getEndTime
())
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
DatasetTableTask
record
=
new
DatasetTableTask
();
update
(
datasetTableTask
);
record
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
DatasetTableTaskExample
datasetTableTaskExample
=
new
DatasetTableTaskExample
();
datasetTableTaskExample
.
createCriteria
().
andIdEqualTo
(
datasetTableTask
.
getId
());
updateByExampleSelective
(
record
,
datasetTableTaskExample
);
}
}
}
}
}
}
public
void
updateTaskStatus
(
DatasetTableTask
datasetTableTask
,
JobStatus
lastExecStatus
)
{
public
void
updateTaskStatus
(
DatasetTableTask
datasetTableTask
,
JobStatus
lastExecStatus
)
{
datasetTableTask
.
setLastExecStatus
(
lastExecStatus
.
name
());
DatasetTableTask
recore
=
new
DatasetTableTask
();
recore
.
setLastExecStatus
(
lastExecStatus
.
name
());
if
(
datasetTableTask
.
getRate
().
equalsIgnoreCase
(
ScheduleType
.
SIMPLE
.
name
()))
{
if
(
datasetTableTask
.
getRate
().
equalsIgnoreCase
(
ScheduleType
.
SIMPLE
.
name
()))
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
recore
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
}
else
{
}
else
{
// datasetTableTask = datasetTableTaskMapper.selectByPrimaryKey(datasetTableTask.getId());
recore
.
setLastExecStatus
(
lastExecStatus
.
name
());
datasetTableTask
.
setLastExecStatus
(
lastExecStatus
.
name
());
if
(
StringUtils
.
isNotEmpty
(
datasetTableTask
.
getEnd
())
&&
datasetTableTask
.
getEnd
().
equalsIgnoreCase
(
"1"
))
{
if
(
StringUtils
.
isNotEmpty
(
datasetTableTask
.
getEnd
())
&&
datasetTableTask
.
getEnd
().
equalsIgnoreCase
(
"1"
))
{
BaseGridRequest
request
=
new
BaseGridRequest
();
BaseGridRequest
request
=
new
BaseGridRequest
();
ConditionEntity
conditionEntity
=
new
ConditionEntity
();
ConditionEntity
conditionEntity
=
new
ConditionEntity
();
...
@@ -216,19 +228,25 @@ public class DataSetTableTaskService {
...
@@ -216,19 +228,25 @@ public class DataSetTableTaskService {
return
;
return
;
}
}
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
==
null
||
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
<=
0
)
{
if
(
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
==
null
||
dataSetTaskDTOS
.
get
(
0
).
getNextExecTime
()
<=
0
)
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
recore
.
setStatus
(
TaskStatus
.
Stopped
.
name
());
}
else
{
}
else
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Underway
.
name
());
recore
.
setStatus
(
TaskStatus
.
Underway
.
name
());
}
}
}
else
{
}
else
{
datasetTableTask
.
setStatus
(
TaskStatus
.
Underway
.
name
());
recore
.
setStatus
(
TaskStatus
.
Underway
.
name
());
}
}
}
}
update
(
datasetTableTask
);
DatasetTableTaskExample
datasetTableTaskExample
=
new
DatasetTableTaskExample
();
datasetTableTaskExample
.
createCriteria
().
andIdEqualTo
(
datasetTableTask
.
getId
());
updateByExampleSelective
(
recore
,
datasetTableTaskExample
);
}
public
DatasetTableTask
selectByPrimaryKey
(
String
id
)
{
return
datasetTableTaskMapper
.
selectByPrimaryKey
(
id
);
}
}
public
void
update
(
DatasetTableTask
datasetTableTask
)
{
public
void
update
ByExampleSelective
(
DatasetTableTask
datasetTableTask
,
DatasetTableTaskExample
datasetTableTaskExample
)
{
datasetTableTaskMapper
.
updateBy
PrimaryKeySelective
(
datasetTableTask
);
datasetTableTaskMapper
.
updateBy
ExampleSelective
(
datasetTableTask
,
datasetTableTaskExample
);
}
}
public
List
<
DatasetTableTask
>
list
(
DatasetTableTask
datasetTableTask
)
{
public
List
<
DatasetTableTask
>
list
(
DatasetTableTask
datasetTableTask
)
{
...
...
backend/src/main/java/io/dataease/service/dataset/ExtractDataService.java
浏览文件 @
afd2a7ae
...
@@ -65,6 +65,7 @@ import org.quartz.JobExecutionContext;
...
@@ -65,6 +65,7 @@ import org.quartz.JobExecutionContext;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.context.annotation.Lazy
;
import
org.springframework.context.annotation.Lazy
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
import
java.io.*
;
import
java.io.*
;
import
java.util.*
;
import
java.util.*
;
...
@@ -85,12 +86,6 @@ public class ExtractDataService {
...
@@ -85,12 +86,6 @@ public class ExtractDataService {
@Lazy
@Lazy
private
DataSetTableTaskService
dataSetTableTaskService
;
private
DataSetTableTaskService
dataSetTableTaskService
;
@Resource
@Resource
private
DatasourceMapper
datasourceMapper
;
@Resource
private
DatasetTableMapper
datasetTableMapper
;
@Resource
private
DatasetTableTaskMapper
datasetTableTaskMapper
;
@Resource
private
DatasourceService
datasourceService
;
private
DatasourceService
datasourceService
;
@Resource
@Resource
private
ExtChartViewMapper
extChartViewMapper
;
private
ExtChartViewMapper
extChartViewMapper
;
...
@@ -133,24 +128,28 @@ public class ExtractDataService {
...
@@ -133,24 +128,28 @@ public class ExtractDataService {
" exit 1\n"
+
" exit 1\n"
+
"fi\n"
;
"fi\n"
;
public
synchronized
boolean
existSyncTask
(
DatasetTable
datasetTable
,
DatasetTableTask
datasetTableTask
,
Long
startTime
)
{
public
synchronized
boolean
existSyncTask
(
String
datasetTableId
,
String
datasetTableTaskId
,
Long
startTime
)
{
datasetTable
.
setSyncStatus
(
JobStatus
.
Underway
.
name
());
DatasetTable
datasetTableRecord
=
new
DatasetTable
();
datasetTableRecord
.
setSyncStatus
(
JobStatus
.
Underway
.
name
());
DatasetTableExample
example
=
new
DatasetTableExample
();
DatasetTableExample
example
=
new
DatasetTableExample
();
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
.
getId
()
).
andSyncStatusNotEqualTo
(
JobStatus
.
Underway
.
name
());
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
Id
).
andSyncStatusNotEqualTo
(
JobStatus
.
Underway
.
name
());
example
.
or
(
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
.
getId
()
).
andSyncStatusIsNull
());
example
.
or
(
example
.
createCriteria
().
andIdEqualTo
(
datasetTable
Id
).
andSyncStatusIsNull
());
boolean
existSyncTask
=
data
setTableMapper
.
updateByExampleSelective
(
datasetTable
,
example
)
==
0
;
boolean
existSyncTask
=
data
SetTableService
.
updateByExampleSelective
(
datasetTableRecord
,
example
)
==
0
;
if
(
existSyncTask
)
{
if
(
existSyncTask
)
{
DatasetTableTaskLog
datasetTableTaskLog
=
new
DatasetTableTaskLog
();
DatasetTableTaskLog
datasetTableTaskLog
=
new
DatasetTableTaskLog
();
datasetTableTaskLog
.
setTaskId
(
datasetTableTask
.
getId
()
);
datasetTableTaskLog
.
setTaskId
(
datasetTableTask
Id
);
datasetTableTaskLog
.
setTableId
(
datasetTable
.
getId
()
);
datasetTableTaskLog
.
setTableId
(
datasetTable
Id
);
datasetTableTaskLog
.
setStatus
(
JobStatus
.
Underway
.
name
());
datasetTableTaskLog
.
setStatus
(
JobStatus
.
Underway
.
name
());
List
<
DatasetTableTaskLog
>
datasetTableTaskLogs
=
dataSetTableTaskLogService
.
select
(
datasetTableTaskLog
);
List
<
DatasetTableTaskLog
>
datasetTableTaskLogs
=
dataSetTableTaskLogService
.
select
(
datasetTableTaskLog
);
return
CollectionUtils
.
isEmpty
(
datasetTableTaskLogs
)
||
!
datasetTableTaskLogs
.
get
(
0
).
getTriggerType
().
equalsIgnoreCase
(
TriggerType
.
Custom
.
name
());
return
CollectionUtils
.
isEmpty
(
datasetTableTaskLogs
)
||
!
datasetTableTaskLogs
.
get
(
0
).
getTriggerType
().
equalsIgnoreCase
(
TriggerType
.
Custom
.
name
());
}
else
{
}
else
{
datasetTableTask
.
setLastExecTime
(
startTime
);
DatasetTableTask
record
=
new
DatasetTableTask
();
datasetTableTask
.
setLastExecStatus
(
JobStatus
.
Underway
.
name
());
record
.
setLastExecTime
(
startTime
);
datasetTableTask
.
setStatus
(
TaskStatus
.
Exec
.
name
());
record
.
setLastExecStatus
(
JobStatus
.
Underway
.
name
());
dataSetTableTaskService
.
update
(
datasetTableTask
);
record
.
setStatus
(
TaskStatus
.
Exec
.
name
());
DatasetTableTaskExample
datasetTableTaskExample
=
new
DatasetTableTaskExample
();
datasetTableTaskExample
.
createCriteria
().
andIdEqualTo
(
datasetTableTaskId
);
dataSetTableTaskService
.
updateByExampleSelective
(
record
,
datasetTableTaskExample
);
return
false
;
return
false
;
}
}
}
}
...
@@ -179,7 +178,7 @@ public class ExtractDataService {
...
@@ -179,7 +178,7 @@ public class ExtractDataService {
return
o1
.
getColumnIndex
().
compareTo
(
o2
.
getColumnIndex
());
return
o1
.
getColumnIndex
().
compareTo
(
o2
.
getColumnIndex
());
});
});
DatasetTableTaskLog
datasetTableTaskLog
=
writeDatasetTableTaskLog
(
datasetTableId
,
ops
);
DatasetTableTaskLog
datasetTableTaskLog
=
writeDatasetTableTaskLog
(
datasetTableId
,
ops
);
switch
(
updateType
)
{
switch
(
updateType
)
{
case
all_scope:
// 全量更新
case
all_scope:
// 全量更新
try
{
try
{
...
@@ -195,7 +194,7 @@ public class ExtractDataService {
...
@@ -195,7 +194,7 @@ public class ExtractDataService {
}
}
replaceTable
(
TableUtils
.
tableName
(
datasetTableId
));
replaceTable
(
TableUtils
.
tableName
(
datasetTableId
));
saveSuccessLog
(
datasetTableTaskLog
);
saveSuccessLog
(
datasetTableTaskLog
);
updateTableStatus
(
datasetTableId
,
datasetTable
,
JobStatus
.
Completed
,
execTime
);
updateTableStatus
(
datasetTableId
,
JobStatus
.
Completed
,
execTime
);
if
(
ops
.
equalsIgnoreCase
(
"替换"
))
{
if
(
ops
.
equalsIgnoreCase
(
"替换"
))
{
List
<
DatasetTableField
>
oldFileds
=
getDatasetTableFields
(
datasetTable
.
getId
());
List
<
DatasetTableField
>
oldFileds
=
getDatasetTableFields
(
datasetTable
.
getId
());
List
<
DatasetTableField
>
toAdd
=
new
ArrayList
<>();
List
<
DatasetTableField
>
toAdd
=
new
ArrayList
<>();
...
@@ -228,7 +227,7 @@ public class ExtractDataService {
...
@@ -228,7 +227,7 @@ public class ExtractDataService {
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
saveErrorLog
(
datasetTableTaskLog
,
e
);
saveErrorLog
(
datasetTableTaskLog
,
e
);
updateTableStatus
(
datasetTableId
,
datasetTable
,
JobStatus
.
Error
,
null
);
updateTableStatus
(
datasetTableId
,
JobStatus
.
Error
,
null
);
dropDorisTable
(
TableUtils
.
tmpName
(
TableUtils
.
tableName
(
datasetTableId
)));
dropDorisTable
(
TableUtils
.
tmpName
(
TableUtils
.
tableName
(
datasetTableId
)));
}
finally
{
}
finally
{
deleteFile
(
"all_scope"
,
datasetTableId
);
deleteFile
(
"all_scope"
,
datasetTableId
);
...
@@ -247,10 +246,10 @@ public class ExtractDataService {
...
@@ -247,10 +246,10 @@ public class ExtractDataService {
extractExcelDataForSimpleMode
(
datasetTable
,
"incremental_add"
);
extractExcelDataForSimpleMode
(
datasetTable
,
"incremental_add"
);
}
}
saveSuccessLog
(
datasetTableTaskLog
);
saveSuccessLog
(
datasetTableTaskLog
);
updateTableStatus
(
datasetTableId
,
datasetTable
,
JobStatus
.
Completed
,
execTime
);
updateTableStatus
(
datasetTableId
,
JobStatus
.
Completed
,
execTime
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
saveErrorLog
(
datasetTableTaskLog
,
e
);
saveErrorLog
(
datasetTableTaskLog
,
e
);
updateTableStatus
(
datasetTableId
,
datasetTable
,
JobStatus
.
Error
,
null
);
updateTableStatus
(
datasetTableId
,
JobStatus
.
Error
,
null
);
}
finally
{
}
finally
{
deleteFile
(
"incremental_add"
,
datasetTableId
);
deleteFile
(
"incremental_add"
,
datasetTableId
);
deleteFile
(
"incremental_delete"
,
datasetTableId
);
deleteFile
(
"incremental_delete"
,
datasetTableId
);
...
@@ -273,7 +272,7 @@ public class ExtractDataService {
...
@@ -273,7 +272,7 @@ public class ExtractDataService {
LogUtil
.
error
(
"Can not find DatasetTable: "
+
datasetTableId
);
LogUtil
.
error
(
"Can not find DatasetTable: "
+
datasetTableId
);
return
;
return
;
}
}
DatasetTableTask
datasetTableTask
=
data
setTableTaskMapper
.
selectByPrimaryKey
(
taskId
);
DatasetTableTask
datasetTableTask
=
data
SetTableTaskService
.
selectByPrimaryKey
(
taskId
);
if
(
datasetTableTask
==
null
)
{
if
(
datasetTableTask
==
null
)
{
return
;
return
;
}
}
...
@@ -284,19 +283,22 @@ public class ExtractDataService {
...
@@ -284,19 +283,22 @@ public class ExtractDataService {
}
}
Long
startTime
=
System
.
currentTimeMillis
();
Long
startTime
=
System
.
currentTimeMillis
();
if
(
existSyncTask
(
datasetTable
,
datasetTableTask
,
startTime
))
{
if
(
existSyncTask
(
datasetTable
.
getId
(),
datasetTableTask
.
getId
()
,
startTime
))
{
LogUtil
.
info
(
"Skip synchronization task for dataset due to exist others, dataset ID : "
+
datasetTableId
);
LogUtil
.
info
(
"Skip synchronization task for dataset due to exist others, dataset ID : "
+
datasetTableId
);
return
;
return
;
}
}
DatasetTableTaskLog
datasetTableTaskLog
=
getDatasetTableTaskLog
(
datasetTableId
,
taskId
,
startTime
);
DatasetTableTaskLog
datasetTableTaskLog
=
getDatasetTableTaskLog
(
datasetTableId
,
taskId
,
startTime
);
UpdateType
updateType
=
UpdateType
.
valueOf
(
type
);
UpdateType
updateType
=
UpdateType
.
valueOf
(
type
);
if
(
context
!=
null
)
{
if
(
context
!=
null
)
{
datasetTable
.
setQrtzInstance
(
context
.
getFireInstanceId
());
DatasetTable
datasetTableRecord
=
new
DatasetTable
();
datasetTableMapper
.
updateByPrimaryKeySelective
(
datasetTable
);
datasetTableRecord
.
setQrtzInstance
(
context
.
getFireInstanceId
());
DatasetTableExample
example
=
new
DatasetTableExample
();
example
.
createCriteria
().
andIdEqualTo
(
datasetTableId
);
dataSetTableService
.
updateByExampleSelective
(
datasetTableRecord
,
example
);
}
}
Datasource
datasource
=
new
Datasource
();
Datasource
datasource
=
new
Datasource
();
if
(
StringUtils
.
isNotEmpty
(
datasetTable
.
getDataSourceId
()))
{
if
(
StringUtils
.
isNotEmpty
(
datasetTable
.
getDataSourceId
()))
{
datasource
=
datasource
Mapper
.
selectByPrimaryKey
(
datasetTable
.
getDataSourceId
());
datasource
=
datasource
Service
.
get
(
datasetTable
.
getDataSourceId
());
}
else
{
}
else
{
datasource
.
setType
(
datasetTable
.
getType
());
datasource
.
setType
(
datasetTable
.
getType
());
}
}
...
@@ -337,7 +339,7 @@ public class ExtractDataService {
...
@@ -337,7 +339,7 @@ public class ExtractDataService {
System
.
out
.
println
(
ignore
.
getMessage
());
System
.
out
.
println
(
ignore
.
getMessage
());
}
}
try
{
try
{
updateTableStatus
(
datasetTableId
,
datasetTable
,
lastExecStatus
,
execTime
);
updateTableStatus
(
datasetTableId
,
lastExecStatus
,
execTime
);
}
catch
(
Exception
ignore
)
{
}
catch
(
Exception
ignore
)
{
System
.
out
.
println
(
ignore
.
getMessage
());
System
.
out
.
println
(
ignore
.
getMessage
());
}
}
...
@@ -395,7 +397,7 @@ public class ExtractDataService {
...
@@ -395,7 +397,7 @@ public class ExtractDataService {
}
catch
(
Exception
ignore
)
{
}
catch
(
Exception
ignore
)
{
}
}
try
{
try
{
updateTableStatus
(
datasetTableId
,
datasetTable
,
lastExecStatus
,
execTime
);
updateTableStatus
(
datasetTableId
,
lastExecStatus
,
execTime
);
}
catch
(
Exception
ignore
)
{
}
catch
(
Exception
ignore
)
{
}
}
}
}
...
@@ -459,9 +461,9 @@ public class ExtractDataService {
...
@@ -459,9 +461,9 @@ public class ExtractDataService {
String
dataFile
=
null
;
String
dataFile
=
null
;
String
script
=
null
;
String
script
=
null
;
String
streamLoadScript
=
""
;
String
streamLoadScript
=
""
;
if
(
kettleFilesKeep
)
{
if
(
kettleFilesKeep
)
{
streamLoadScript
=
shellScript
;
streamLoadScript
=
shellScript
;
}
else
{
}
else
{
streamLoadScript
=
shellScriptForDeleteFile
;
streamLoadScript
=
shellScriptForDeleteFile
;
}
}
switch
(
extractType
)
{
switch
(
extractType
)
{
...
@@ -586,14 +588,16 @@ public class ExtractDataService {
...
@@ -586,14 +588,16 @@ public class ExtractDataService {
});
});
}
}
private
void
updateTableStatus
(
String
datasetTableId
,
DatasetTable
datasetTable
,
JobStatus
completed
,
Long
execTime
)
{
private
void
updateTableStatus
(
String
datasetTableId
,
JobStatus
jobStatus
,
Long
execTime
)
{
datasetTable
.
setSyncStatus
(
completed
.
name
());
DatasetTable
datasetTableRecord
=
new
DatasetTable
();
datasetTableRecord
.
setId
(
datasetTableId
);
datasetTableRecord
.
setSyncStatus
(
jobStatus
.
name
());
if
(
execTime
!=
null
)
{
if
(
execTime
!=
null
)
{
datasetTable
.
setLastUpdateTime
(
execTime
);
datasetTable
Record
.
setLastUpdateTime
(
execTime
);
}
}
DatasetTableExample
example
=
new
DatasetTableExample
();
DatasetTableExample
example
=
new
DatasetTableExample
();
example
.
createCriteria
().
andIdEqualTo
(
datasetTableId
);
example
.
createCriteria
().
andIdEqualTo
(
datasetTableId
);
data
setTableMapper
.
updateByExampleSelective
(
datasetTable
,
example
);
data
SetTableService
.
updateByExampleSelective
(
datasetTableRecord
,
example
);
}
}
private
void
saveSuccessLog
(
DatasetTableTaskLog
datasetTableTaskLog
)
{
private
void
saveSuccessLog
(
DatasetTableTaskLog
datasetTableTaskLog
)
{
...
@@ -706,7 +710,7 @@ public class ExtractDataService {
...
@@ -706,7 +710,7 @@ public class ExtractDataService {
for
(
ExcelSheetData
sheet
:
excelXlsxReader
.
totalSheets
)
{
for
(
ExcelSheetData
sheet
:
excelXlsxReader
.
totalSheets
)
{
if
(
sheet
.
getExcelLable
().
equalsIgnoreCase
(
excelSheetData
.
getExcelLable
()))
{
if
(
sheet
.
getExcelLable
().
equalsIgnoreCase
(
excelSheetData
.
getExcelLable
()))
{
for
(
List
<
String
>
dataItem
:
sheet
.
getData
())
{
for
(
List
<
String
>
dataItem
:
sheet
.
getData
())
{
if
(
dataItem
.
size
()>
0
)
{
if
(
dataItem
.
size
()
>
0
)
{
data
.
add
(
dataItem
.
toArray
(
new
String
[
dataItem
.
size
()]));
data
.
add
(
dataItem
.
toArray
(
new
String
[
dataItem
.
size
()]));
}
}
}
}
...
@@ -792,9 +796,9 @@ public class ExtractDataService {
...
@@ -792,9 +796,9 @@ public class ExtractDataService {
DorisConfiguration
dorisConfiguration
=
new
Gson
().
fromJson
(
dorisDatasource
.
getConfiguration
(),
DorisConfiguration
.
class
);
DorisConfiguration
dorisConfiguration
=
new
Gson
().
fromJson
(
dorisDatasource
.
getConfiguration
(),
DorisConfiguration
.
class
);
String
columns
=
columnFields
+
",dataease_uuid"
;
String
columns
=
columnFields
+
",dataease_uuid"
;
String
streamLoadScript
=
""
;
String
streamLoadScript
=
""
;
if
(
kettleFilesKeep
)
{
if
(
kettleFilesKeep
)
{
streamLoadScript
=
shellScript
;
streamLoadScript
=
shellScript
;
}
else
{
}
else
{
streamLoadScript
=
shellScriptForDeleteFile
;
streamLoadScript
=
shellScriptForDeleteFile
;
}
}
switch
(
extractType
)
{
switch
(
extractType
)
{
...
@@ -897,9 +901,9 @@ public class ExtractDataService {
...
@@ -897,9 +901,9 @@ public class ExtractDataService {
case
StarRocks:
case
StarRocks:
MysqlConfiguration
mysqlConfiguration
=
new
Gson
().
fromJson
(
datasource
.
getConfiguration
(),
MysqlConfiguration
.
class
);
MysqlConfiguration
mysqlConfiguration
=
new
Gson
().
fromJson
(
datasource
.
getConfiguration
(),
MysqlConfiguration
.
class
);
dataMeta
=
new
DatabaseMeta
(
"db"
,
"MYSQL"
,
"Native"
,
mysqlConfiguration
.
getHost
().
trim
(),
mysqlConfiguration
.
getDataBase
().
trim
(),
mysqlConfiguration
.
getPort
().
toString
(),
mysqlConfiguration
.
getUsername
(),
mysqlConfiguration
.
getPassword
());
dataMeta
=
new
DatabaseMeta
(
"db"
,
"MYSQL"
,
"Native"
,
mysqlConfiguration
.
getHost
().
trim
(),
mysqlConfiguration
.
getDataBase
().
trim
(),
mysqlConfiguration
.
getPort
().
toString
(),
mysqlConfiguration
.
getUsername
(),
mysqlConfiguration
.
getPassword
());
if
(
StringUtils
.
isNotEmpty
(
mysqlConfiguration
.
getExtraParams
())
&&
mysqlConfiguration
.
getExtraParams
().
split
(
"&"
).
length
>
0
)
{
if
(
StringUtils
.
isNotEmpty
(
mysqlConfiguration
.
getExtraParams
())
&&
mysqlConfiguration
.
getExtraParams
().
split
(
"&"
).
length
>
0
)
{
String
[]
params
=
mysqlConfiguration
.
getExtraParams
().
split
(
"&"
);
String
[]
params
=
mysqlConfiguration
.
getExtraParams
().
split
(
"&"
);
for
(
int
i
=
0
;
i
<
params
.
length
;
i
++)
{
for
(
int
i
=
0
;
i
<
params
.
length
;
i
++)
{
dataMeta
.
addExtraOption
(
"MYSQL"
,
params
[
i
].
split
(
"="
)[
0
],
params
[
i
].
split
(
"="
)[
1
]);
dataMeta
.
addExtraOption
(
"MYSQL"
,
params
[
i
].
split
(
"="
)[
0
],
params
[
i
].
split
(
"="
)[
1
]);
}
}
}
}
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论