Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
D
dataease
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
zhu
dataease
Commits
624bffd8
Unverified
提交
624bffd8
authored
1月 13, 2022
作者:
fit2cloud-chenyw
提交者:
GitHub
1月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1638 from dataease/pr@dev@feat_priority_task_pool
feat: 定时报告使用优先级线程池
上级
723d8fad
9cd89155
隐藏空白字符变更
内嵌
并排
正在显示
10 个修改的文件
包含
269 行增加
和
46 行删除
+269
-46
ExtTaskMapper.java
.../main/java/io/dataease/base/mapper/ext/ExtTaskMapper.java
+9
-0
ExtTaskMapper.xml
...c/main/java/io/dataease/base/mapper/ext/ExtTaskMapper.xml
+13
-0
PriorityThreadPoolExecutor.java
.../io/dataease/commons/pool/PriorityThreadPoolExecutor.java
+113
-0
PriorityThreadPoolProperties.java
...o/dataease/commons/pool/PriorityThreadPoolProperties.java
+17
-0
AsyncConfig.java
backend/src/main/java/io/dataease/config/AsyncConfig.java
+24
-0
ScheduleManager.java
.../main/java/io/dataease/job/sechedule/ScheduleManager.java
+20
-16
TaskHandler.java
.../java/io/dataease/job/sechedule/strategy/TaskHandler.java
+20
-24
EmailTaskHandler.java
...ataease/job/sechedule/strategy/impl/EmailTaskHandler.java
+32
-5
GlobalTaskStartListener.java
...in/java/io/dataease/listener/GlobalTaskStartListener.java
+1
-0
XEmailTaskServer.java
...ain/java/io/dataease/plugins/server/XEmailTaskServer.java
+20
-1
没有找到文件。
backend/src/main/java/io/dataease/base/mapper/ext/ExtTaskMapper.java
0 → 100644
浏览文件 @
624bffd8
package
io
.
dataease
.
base
.
mapper
.
ext
;
public
interface
ExtTaskMapper
{
int
runningCount
(
Long
taskId
);
void
resetRunnings
(
Long
taskId
);
}
backend/src/main/java/io/dataease/base/mapper/ext/ExtTaskMapper.xml
0 → 100644
浏览文件 @
624bffd8
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper
namespace=
"io.dataease.base.mapper.ext.ExtTaskMapper"
>
<select
id=
"runningCount"
resultType=
"java.lang.Integer"
>
select count(*) as count from sys_task_instance where task_id = #{taskId} and status = 0
</select>
<update
id=
"resetRunnings"
>
update sys_task_instance set status = -1, info = 'System Interrupt Error' where task_id = #{taskId} and status = 0
</update>
</mapper>
backend/src/main/java/io/dataease/commons/pool/PriorityThreadPoolExecutor.java
0 → 100644
浏览文件 @
624bffd8
package
io
.
dataease
.
commons
.
pool
;
import
java.util.concurrent.*
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
java.util.concurrent.atomic.AtomicLong
;
import
io.dataease.commons.utils.LogUtil
;
public
class
PriorityThreadPoolExecutor
extends
ThreadPoolExecutor
{
public
static
AtomicInteger
globaInteger
=
new
AtomicInteger
(
1
);
private
ThreadLocal
<
Integer
>
local
=
new
ThreadLocal
<
Integer
>()
{
@Override
protected
Integer
initialValue
()
{
return
1
;
}
};
public
PriorityThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit
unit
)
{
super
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
getWorkQueue
());
}
public
PriorityThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit
unit
,
ThreadFactory
threadFactory
)
{
super
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
getWorkQueue
(),
threadFactory
);
}
public
PriorityThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit
unit
,
RejectedExecutionHandler
handler
)
{
super
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
getWorkQueue
(),
handler
);
}
public
PriorityThreadPoolExecutor
(
int
corePoolSize
,
int
maximumPoolSize
,
long
keepAliveTime
,
TimeUnit
unit
,
ThreadFactory
threadFactory
,
RejectedExecutionHandler
handler
)
{
super
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
unit
,
getWorkQueue
(),
threadFactory
,
handler
);
}
protected
static
PriorityBlockingQueue
getWorkQueue
()
{
return
new
PriorityBlockingQueue
();
}
@Override
public
void
execute
(
Runnable
command
)
{
int
andIncrement
=
globaInteger
.
getAndIncrement
();
Integer
theadInteger
=
local
.
get
();
try
{
if
(
theadInteger
==
0
)
{
this
.
execute
(
command
,
0
);
}
else
{
this
.
execute
(
command
,
andIncrement
);
}
}
finally
{
local
.
set
(
1
);
}
}
public
void
execute
(
Runnable
command
,
int
priority
)
{
super
.
execute
(
new
PriorityRunnable
(
command
,
priority
));
}
public
<
T
>
Future
<
T
>
submit
(
Callable
<
T
>
task
,
int
priority
)
{
local
.
set
(
priority
);
return
super
.
submit
(
task
);
}
protected
static
class
PriorityRunnable
<
E
extends
Comparable
<?
super
E
>>
implements
Runnable
,
Comparable
<
PriorityRunnable
<
E
>>
{
private
final
static
AtomicLong
seq
=
new
AtomicLong
();
private
final
long
seqNum
;
Runnable
run
;
private
int
priority
;
public
PriorityRunnable
(
Runnable
run
,
int
priority
)
{
seqNum
=
seq
.
getAndIncrement
();
this
.
run
=
run
;
this
.
priority
=
priority
;
}
public
int
getPriority
()
{
return
priority
;
}
public
void
setPriority
(
int
priority
)
{
this
.
priority
=
priority
;
}
public
Runnable
getRun
()
{
return
run
;
}
@Override
public
void
run
()
{
LogUtil
.
info
(
"number "
+
priority
+
" is starting..."
);
this
.
run
.
run
();
}
@Override
public
int
compareTo
(
PriorityRunnable
<
E
>
other
)
{
int
res
=
0
;
if
(
this
.
priority
==
other
.
priority
)
{
if
(
other
.
run
!=
this
.
run
)
{
// ASC
res
=
(
seqNum
<
other
.
seqNum
?
-
1
:
1
);
}
}
else
{
// DESC
res
=
this
.
priority
>
other
.
priority
?
1
:
-
1
;
}
return
res
;
}
}
}
backend/src/main/java/io/dataease/commons/pool/PriorityThreadPoolProperties.java
0 → 100644
浏览文件 @
624bffd8
package
io
.
dataease
.
commons
.
pool
;
import
org.springframework.boot.context.properties.ConfigurationProperties
;
import
org.springframework.stereotype.Component
;
import
lombok.Data
;
@ConfigurationProperties
(
prefix
=
"detask"
,
ignoreInvalidFields
=
true
)
@Data
@Component
public
class
PriorityThreadPoolProperties
{
private
int
corePoolSize
=
2
;
private
int
maximumPoolSize
=
100
;
private
int
keepAliveTime
=
60
;
}
backend/src/main/java/io/dataease/config/AsyncConfig.java
浏览文件 @
624bffd8
package
io
.
dataease
.
config
;
package
io
.
dataease
.
config
;
import
java.util.concurrent.TimeUnit
;
import
javax.annotation.Resource
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Bean
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.core.task.AsyncTaskExecutor
;
import
org.springframework.core.task.AsyncTaskExecutor
;
import
org.springframework.scheduling.annotation.EnableAsync
;
import
org.springframework.scheduling.annotation.EnableAsync
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
;
import
io.dataease.commons.pool.PriorityThreadPoolExecutor
;
import
io.dataease.commons.pool.PriorityThreadPoolProperties
;
@EnableAsync
(
proxyTargetClass
=
true
)
@EnableAsync
(
proxyTargetClass
=
true
)
@Configuration
@Configuration
public
class
AsyncConfig
{
public
class
AsyncConfig
{
@Resource
private
PriorityThreadPoolProperties
priorityThreadPoolProperties
;
@Bean
@Bean
public
AsyncTaskExecutor
taskExecutor
()
{
public
AsyncTaskExecutor
taskExecutor
()
{
ThreadPoolTaskExecutor
executor
=
new
ThreadPoolTaskExecutor
();
ThreadPoolTaskExecutor
executor
=
new
ThreadPoolTaskExecutor
();
...
@@ -18,4 +28,18 @@ public class AsyncConfig {
...
@@ -18,4 +28,18 @@ public class AsyncConfig {
executor
.
setMaxPoolSize
(
10
);
executor
.
setMaxPoolSize
(
10
);
return
executor
;
return
executor
;
}
}
@Bean
public
PriorityThreadPoolExecutor
priorityExecutor
()
{
int
corePoolSize
=
priorityThreadPoolProperties
.
getCorePoolSize
();
int
maximumPoolSize
=
priorityThreadPoolProperties
.
getMaximumPoolSize
();
int
keepAliveTime
=
priorityThreadPoolProperties
.
getKeepAliveTime
();
PriorityThreadPoolExecutor
executor
=
new
PriorityThreadPoolExecutor
(
corePoolSize
,
maximumPoolSize
,
keepAliveTime
,
TimeUnit
.
SECONDS
);
return
executor
;
}
}
}
backend/src/main/java/io/dataease/job/sechedule/ScheduleManager.java
浏览文件 @
624bffd8
...
@@ -28,7 +28,7 @@ public class ScheduleManager {
...
@@ -28,7 +28,7 @@ public class ScheduleManager {
* @throws SchedulerException
* @throws SchedulerException
*/
*/
public
void
addSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
<?
extends
Job
>
cls
,
int
repeatIntervalTime
,
public
void
addSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
<?
extends
Job
>
cls
,
int
repeatIntervalTime
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
JobDataMap
jobDataMap
)
throws
SchedulerException
{
JobBuilder
jobBuilder
=
JobBuilder
.
newJob
(
cls
).
withIdentity
(
jobKey
);
JobBuilder
jobBuilder
=
JobBuilder
.
newJob
(
cls
).
withIdentity
(
jobKey
);
...
@@ -46,7 +46,8 @@ public class ScheduleManager {
...
@@ -46,7 +46,8 @@ public class ScheduleManager {
scheduler
.
scheduleJob
(
jd
,
trigger
);
scheduler
.
scheduleJob
(
jd
,
trigger
);
}
}
public
void
addSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
<?
extends
Job
>
cls
,
int
repeatIntervalTime
)
throws
SchedulerException
{
public
void
addSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
<?
extends
Job
>
cls
,
int
repeatIntervalTime
)
throws
SchedulerException
{
addSimpleJob
(
jobKey
,
triggerKey
,
cls
,
repeatIntervalTime
);
addSimpleJob
(
jobKey
,
triggerKey
,
cls
,
repeatIntervalTime
);
}
}
...
@@ -59,7 +60,8 @@ public class ScheduleManager {
...
@@ -59,7 +60,8 @@ public class ScheduleManager {
* @param cron
* @param cron
* @param jobDataMap
* @param jobDataMap
*/
*/
public
void
addCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
,
JobDataMap
jobDataMap
)
{
public
void
addCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
,
JobDataMap
jobDataMap
)
{
try
{
try
{
LogUtil
.
info
(
"addCronJob: "
+
triggerKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
LogUtil
.
info
(
"addCronJob: "
+
triggerKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
...
@@ -99,7 +101,8 @@ public class ScheduleManager {
...
@@ -99,7 +101,8 @@ public class ScheduleManager {
}
}
}
}
public
void
addCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
)
{
public
void
addCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
)
{
addCronJob
(
jobKey
,
triggerKey
,
jobClass
,
cron
,
startTime
,
endTime
,
null
);
addCronJob
(
jobKey
,
triggerKey
,
jobClass
,
cron
,
startTime
,
endTime
,
null
);
}
}
...
@@ -140,7 +143,8 @@ public class ScheduleManager {
...
@@ -140,7 +143,8 @@ public class ScheduleManager {
* @param cron
* @param cron
* @throws SchedulerException
* @throws SchedulerException
*/
*/
public
void
modifyCronJobTime
(
TriggerKey
triggerKey
,
String
cron
,
Date
startTime
,
Date
endTime
)
throws
SchedulerException
{
public
void
modifyCronJobTime
(
TriggerKey
triggerKey
,
String
cron
,
Date
startTime
,
Date
endTime
)
throws
SchedulerException
{
LogUtil
.
info
(
"modifyCronJobTime: "
+
triggerKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
LogUtil
.
info
(
"modifyCronJobTime: "
+
triggerKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
...
@@ -151,7 +155,6 @@ public class ScheduleManager {
...
@@ -151,7 +155,6 @@ public class ScheduleManager {
return
;
return
;
}
}
/** 方式一 :调用 rescheduleJob 开始 */
/** 方式一 :调用 rescheduleJob 开始 */
TriggerBuilder
<
Trigger
>
triggerBuilder
=
TriggerBuilder
.
newTrigger
();
// 触发器
TriggerBuilder
<
Trigger
>
triggerBuilder
=
TriggerBuilder
.
newTrigger
();
// 触发器
...
@@ -279,7 +282,6 @@ public class ScheduleManager {
...
@@ -279,7 +282,6 @@ public class ScheduleManager {
}
}
}
}
public
static
void
startJobs
(
Scheduler
sched
)
{
public
static
void
startJobs
(
Scheduler
sched
)
{
try
{
try
{
sched
.
start
();
sched
.
start
();
...
@@ -289,7 +291,6 @@ public class ScheduleManager {
...
@@ -289,7 +291,6 @@ public class ScheduleManager {
}
}
}
}
public
void
shutdownJobs
(
Scheduler
sched
)
{
public
void
shutdownJobs
(
Scheduler
sched
)
{
try
{
try
{
if
(!
sched
.
isShutdown
())
{
if
(!
sched
.
isShutdown
())
{
...
@@ -312,7 +313,7 @@ public class ScheduleManager {
...
@@ -312,7 +313,7 @@ public class ScheduleManager {
* @throws SchedulerException
* @throws SchedulerException
*/
*/
public
void
addOrUpdateSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
public
void
addOrUpdateSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
int
intervalTime
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
int
intervalTime
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
if
(
scheduler
.
checkExists
(
triggerKey
))
{
if
(
scheduler
.
checkExists
(
triggerKey
))
{
modifySimpleJobTime
(
triggerKey
,
intervalTime
);
modifySimpleJobTime
(
triggerKey
,
intervalTime
);
...
@@ -323,7 +324,7 @@ public class ScheduleManager {
...
@@ -323,7 +324,7 @@ public class ScheduleManager {
}
}
public
void
addOrUpdateSingleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
public
void
addOrUpdateSingleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
Date
date
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
Date
date
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
if
(
scheduler
.
checkExists
(
triggerKey
))
{
if
(
scheduler
.
checkExists
(
triggerKey
))
{
modifySingleJobTime
(
triggerKey
,
date
);
modifySingleJobTime
(
triggerKey
,
date
);
}
else
{
}
else
{
...
@@ -333,15 +334,15 @@ public class ScheduleManager {
...
@@ -333,15 +334,15 @@ public class ScheduleManager {
}
}
public
void
addOrUpdateSingleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
public
void
addOrUpdateSingleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
Date
date
)
throws
SchedulerException
{
Date
date
)
throws
SchedulerException
{
addOrUpdateSingleJob
(
jobKey
,
triggerKey
,
clz
,
date
,
null
);
addOrUpdateSingleJob
(
jobKey
,
triggerKey
,
clz
,
date
,
null
);
}
}
public
void
addOrUpdateSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
int
intervalTime
)
throws
SchedulerException
{
public
void
addOrUpdateSimpleJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
clz
,
int
intervalTime
)
throws
SchedulerException
{
addOrUpdateSimpleJob
(
jobKey
,
triggerKey
,
clz
,
intervalTime
,
null
);
addOrUpdateSimpleJob
(
jobKey
,
triggerKey
,
clz
,
intervalTime
,
null
);
}
}
/**
/**
* 添加或修改 cronJob
* 添加或修改 cronJob
*
*
...
@@ -352,7 +353,8 @@ public class ScheduleManager {
...
@@ -352,7 +353,8 @@ public class ScheduleManager {
* @param jobDataMap
* @param jobDataMap
* @throws SchedulerException
* @throws SchedulerException
*/
*/
public
void
addOrUpdateCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
public
void
addOrUpdateCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
,
JobDataMap
jobDataMap
)
throws
SchedulerException
{
LogUtil
.
info
(
"AddOrUpdateCronJob: "
+
jobKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
LogUtil
.
info
(
"AddOrUpdateCronJob: "
+
jobKey
.
getName
()
+
","
+
triggerKey
.
getGroup
());
...
@@ -363,7 +365,8 @@ public class ScheduleManager {
...
@@ -363,7 +365,8 @@ public class ScheduleManager {
}
}
}
}
public
void
addOrUpdateCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
)
throws
SchedulerException
{
public
void
addOrUpdateCronJob
(
JobKey
jobKey
,
TriggerKey
triggerKey
,
Class
jobClass
,
String
cron
,
Date
startTime
,
Date
endTime
)
throws
SchedulerException
{
addOrUpdateCronJob
(
jobKey
,
triggerKey
,
jobClass
,
cron
,
startTime
,
endTime
,
null
);
addOrUpdateCronJob
(
jobKey
,
triggerKey
,
jobClass
,
cron
,
startTime
,
endTime
,
null
);
}
}
...
@@ -398,7 +401,8 @@ public class ScheduleManager {
...
@@ -398,7 +401,8 @@ public class ScheduleManager {
if
(!
CronExpression
.
isValidExpression
(
cron
))
{
if
(!
CronExpression
.
isValidExpression
(
cron
))
{
DataEaseException
.
throwException
(
"cron :"
+
cron
+
" error"
);
DataEaseException
.
throwException
(
"cron :"
+
cron
+
" error"
);
}
}
return
TriggerBuilder
.
newTrigger
().
withIdentity
(
"Calculate Date"
).
withSchedule
(
CronScheduleBuilder
.
cronSchedule
(
cron
)).
build
();
return
TriggerBuilder
.
newTrigger
().
withIdentity
(
"Calculate Date"
)
.
withSchedule
(
CronScheduleBuilder
.
cronSchedule
(
cron
)).
build
();
}
}
...
...
backend/src/main/java/io/dataease/job/sechedule/strategy/TaskHandler.java
浏览文件 @
624bffd8
...
@@ -13,9 +13,6 @@ import java.util.Date;
...
@@ -13,9 +13,6 @@ import java.util.Date;
public
abstract
class
TaskHandler
implements
InitializingBean
{
public
abstract
class
TaskHandler
implements
InitializingBean
{
private
static
final
String
[]
week
=
{
"SUNDAY"
,
"MONDAY"
,
"TUESDAY"
,
"WEDNESDAY"
,
"THURSDAY"
,
"FRIDAY"
,
"SATURDAY"
};
public
void
addTask
(
ScheduleManager
scheduleManager
,
GlobalTaskEntity
taskEntity
)
throws
Exception
{
public
void
addTask
(
ScheduleManager
scheduleManager
,
GlobalTaskEntity
taskEntity
)
throws
Exception
{
// 1。首先看看是否过期
// 1。首先看看是否过期
Long
endTime
=
taskEntity
.
getEndTime
();
Long
endTime
=
taskEntity
.
getEndTime
();
...
@@ -30,12 +27,11 @@ public abstract class TaskHandler implements InitializingBean {
...
@@ -30,12 +27,11 @@ public abstract class TaskHandler implements InitializingBean {
if
(
ObjectUtils
.
isNotEmpty
(
taskEntity
.
getEndTime
()))
{
if
(
ObjectUtils
.
isNotEmpty
(
taskEntity
.
getEndTime
()))
{
new
Date
(
taskEntity
.
getEndTime
());
new
Date
(
taskEntity
.
getEndTime
());
}
}
Class
executor
=
this
.
getClass
();
Class
<?
extends
TaskHandler
>
executor
=
this
.
getClass
();
String
cron
=
cron
(
taskEntity
);
String
cron
=
cron
(
taskEntity
);
scheduleManager
.
addOrUpdateCronJob
(
jobKey
,
triggerKey
,
executor
,
cron
,
start
,
end
,
jobDataMap
(
taskEntity
));
scheduleManager
.
addOrUpdateCronJob
(
jobKey
,
triggerKey
,
executor
,
cron
,
start
,
end
,
jobDataMap
(
taskEntity
));
}
}
protected
abstract
JobDataMap
jobDataMap
(
GlobalTaskEntity
taskEntity
);
protected
abstract
JobDataMap
jobDataMap
(
GlobalTaskEntity
taskEntity
);
private
String
cron
(
GlobalTaskEntity
taskEntity
)
{
private
String
cron
(
GlobalTaskEntity
taskEntity
)
{
...
@@ -54,36 +50,34 @@ public abstract class TaskHandler implements InitializingBean {
...
@@ -54,36 +50,34 @@ public abstract class TaskHandler implements InitializingBean {
instance
.
setTime
(
date
);
instance
.
setTime
(
date
);
if
(
taskEntity
.
getRateType
()
==
0
)
{
if
(
taskEntity
.
getRateType
()
==
0
)
{
return
return
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" * * ?"
;
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" * * ?"
;
}
}
if
(
taskEntity
.
getRateType
()
==
1
)
{
if
(
taskEntity
.
getRateType
()
==
1
)
{
return
return
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" ? * "
+
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" ? * "
+
getDayOfWeek
(
instance
);
getDayOfWeek
(
instance
);
}
}
if
(
taskEntity
.
getRateType
()
==
2
)
{
if
(
taskEntity
.
getRateType
()
==
2
)
{
return
return
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
SECOND
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
MINUTE
)
+
" "
+
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" "
+
instance
.
get
(
Calendar
.
HOUR_OF_DAY
)
+
" "
+
instance
.
get
(
Calendar
.
DATE
)
+
" * ?"
;
instance
.
get
(
Calendar
.
DATE
)
+
" * ?"
;
}
}
return
null
;
return
null
;
}
}
public
abstract
void
resetRunningInstance
(
Long
taskId
);
private
String
getDayOfWeek
(
Calendar
instance
)
{
private
String
getDayOfWeek
(
Calendar
instance
)
{
int
index
=
instance
.
get
(
Calendar
.
DAY_OF_WEEK
);
int
index
=
instance
.
get
(
Calendar
.
DAY_OF_WEEK
);
index
=
(
index
+
1
)
%
7
;
index
=
(
index
+
1
)
%
7
;
return
String
.
valueOf
(
index
);
return
String
.
valueOf
(
index
);
}
}
public
void
removeTask
(
ScheduleManager
scheduleManager
,
GlobalTaskEntity
taskEntity
)
{
public
void
removeTask
(
ScheduleManager
scheduleManager
,
GlobalTaskEntity
taskEntity
)
{
JobKey
jobKey
=
new
JobKey
(
taskEntity
.
getTaskId
().
toString
());
JobKey
jobKey
=
new
JobKey
(
taskEntity
.
getTaskId
().
toString
());
TriggerKey
triggerKey
=
new
TriggerKey
(
taskEntity
.
getTaskId
().
toString
());
TriggerKey
triggerKey
=
new
TriggerKey
(
taskEntity
.
getTaskId
().
toString
());
...
@@ -95,14 +89,16 @@ public abstract class TaskHandler implements InitializingBean {
...
@@ -95,14 +89,16 @@ public abstract class TaskHandler implements InitializingBean {
scheduleManager
.
fireNow
(
jobKey
);
scheduleManager
.
fireNow
(
jobKey
);
}
}
// 判断任务是否过期
//判断任务是否过期
public
Boolean
taskExpire
(
Long
endTime
)
{
public
Boolean
taskExpire
(
Long
endTime
)
{
if
(
ObjectUtils
.
isEmpty
(
endTime
))
return
false
;
if
(
ObjectUtils
.
isEmpty
(
endTime
))
return
false
;
Long
now
=
System
.
currentTimeMillis
();
Long
now
=
System
.
currentTimeMillis
();
return
now
>
endTime
;
return
now
>
endTime
;
}
}
protected
abstract
Boolean
taskIsRunning
(
Long
taskId
);
@Override
@Override
public
void
afterPropertiesSet
()
throws
Exception
{
public
void
afterPropertiesSet
()
throws
Exception
{
String
beanName
=
null
;
String
beanName
=
null
;
...
...
backend/src/main/java/io/dataease/job/sechedule/strategy/impl/EmailTaskHandler.java
浏览文件 @
624bffd8
...
@@ -5,6 +5,7 @@ import io.dataease.auth.entity.TokenInfo;
...
@@ -5,6 +5,7 @@ import io.dataease.auth.entity.TokenInfo;
import
io.dataease.auth.service.AuthUserService
;
import
io.dataease.auth.service.AuthUserService
;
import
io.dataease.auth.service.impl.AuthUserServiceImpl
;
import
io.dataease.auth.service.impl.AuthUserServiceImpl
;
import
io.dataease.auth.util.JWTUtils
;
import
io.dataease.auth.util.JWTUtils
;
import
io.dataease.base.mapper.ext.ExtTaskMapper
;
import
io.dataease.commons.utils.CommonBeanFactory
;
import
io.dataease.commons.utils.CommonBeanFactory
;
import
io.dataease.commons.utils.LogUtil
;
import
io.dataease.commons.utils.LogUtil
;
import
io.dataease.commons.utils.ServletUtils
;
import
io.dataease.commons.utils.ServletUtils
;
...
@@ -20,6 +21,7 @@ import io.dataease.service.system.EmailService;
...
@@ -20,6 +21,7 @@ import io.dataease.service.system.EmailService;
import
org.apache.commons.lang3.ObjectUtils
;
import
org.apache.commons.lang3.ObjectUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.quartz.*
;
import
org.quartz.*
;
import
org.springframework.scheduling.annotation.Async
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.Resource
;
import
javax.annotation.Resource
;
...
@@ -46,6 +48,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
...
@@ -46,6 +48,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
return
jobDataMap
;
return
jobDataMap
;
}
}
public
EmailTaskHandler
proxy
()
{
return
CommonBeanFactory
.
getBean
(
EmailTaskHandler
.
class
);
}
@Override
protected
Boolean
taskIsRunning
(
Long
taskId
)
{
ExtTaskMapper
extTaskMapper
=
CommonBeanFactory
.
getBean
(
ExtTaskMapper
.
class
);
return
extTaskMapper
.
runningCount
(
taskId
)
>
0
;
}
@Override
@Override
public
void
execute
(
JobExecutionContext
context
)
throws
JobExecutionException
{
public
void
execute
(
JobExecutionContext
context
)
throws
JobExecutionException
{
// 插件没有加载 空转
// 插件没有加载 空转
...
@@ -54,11 +66,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
...
@@ -54,11 +66,16 @@ public class EmailTaskHandler extends TaskHandler implements Job {
JobDataMap
jobDataMap
=
context
.
getJobDetail
().
getJobDataMap
();
JobDataMap
jobDataMap
=
context
.
getJobDetail
().
getJobDataMap
();
GlobalTaskEntity
taskEntity
=
(
GlobalTaskEntity
)
jobDataMap
.
get
(
"taskEntity"
);
GlobalTaskEntity
taskEntity
=
(
GlobalTaskEntity
)
jobDataMap
.
get
(
"taskEntity"
);
ScheduleManager
scheduleManager
=
SpringContextUtil
.
getBean
(
ScheduleManager
.
class
);
if
(
taskExpire
(
taskEntity
.
getEndTime
()))
{
if
(
taskExpire
(
taskEntity
.
getEndTime
()))
{
ScheduleManager
scheduleManager
=
SpringContextUtil
.
getBean
(
ScheduleManager
.
class
);
removeTask
(
scheduleManager
,
taskEntity
);
removeTask
(
scheduleManager
,
taskEntity
);
return
;
return
;
}
}
if
(
taskIsRunning
(
taskEntity
.
getTaskId
()))
{
LogUtil
.
info
(
"Skip synchronization task: {} ,due to task status is {}"
,
taskEntity
.
getTaskId
(),
"running"
);
return
;
}
GlobalTaskInstance
taskInstance
=
buildInstance
(
taskEntity
);
GlobalTaskInstance
taskInstance
=
buildInstance
(
taskEntity
);
Long
instanceId
=
saveInstance
(
taskInstance
);
Long
instanceId
=
saveInstance
(
taskInstance
);
...
@@ -67,10 +84,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
...
@@ -67,10 +84,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
XpackEmailTemplateDTO
emailTemplate
=
(
XpackEmailTemplateDTO
)
jobDataMap
.
get
(
"emailTemplate"
);
XpackEmailTemplateDTO
emailTemplate
=
(
XpackEmailTemplateDTO
)
jobDataMap
.
get
(
"emailTemplate"
);
SysUserEntity
creator
=
(
SysUserEntity
)
jobDataMap
.
get
(
"creator"
);
SysUserEntity
creator
=
(
SysUserEntity
)
jobDataMap
.
get
(
"creator"
);
LogUtil
.
info
(
"start execute send panel report task..."
);
LogUtil
.
info
(
"start execute send panel report task..."
);
sendReport
(
taskInstance
,
emailTemplate
,
creator
);
proxy
().
sendReport
(
taskInstance
,
emailTemplate
,
creator
);
}
}
@Override
public
void
resetRunningInstance
(
Long
taskId
)
{
ExtTaskMapper
extTaskMapper
=
CommonBeanFactory
.
getBean
(
ExtTaskMapper
.
class
);
extTaskMapper
.
resetRunnings
(
taskId
);
}
public
Long
saveInstance
(
GlobalTaskInstance
taskInstance
)
{
public
Long
saveInstance
(
GlobalTaskInstance
taskInstance
)
{
EmailXpackService
emailXpackService
=
SpringContextUtil
.
getBean
(
EmailXpackService
.
class
);
EmailXpackService
emailXpackService
=
SpringContextUtil
.
getBean
(
EmailXpackService
.
class
);
...
@@ -99,11 +121,12 @@ public class EmailTaskHandler extends TaskHandler implements Job {
...
@@ -99,11 +121,12 @@ public class EmailTaskHandler extends TaskHandler implements Job {
emailXpackService
.
saveInstance
(
taskInstance
);
emailXpackService
.
saveInstance
(
taskInstance
);
}
}
@Async
(
"priorityExecutor"
)
public
void
sendReport
(
GlobalTaskInstance
taskInstance
,
XpackEmailTemplateDTO
emailTemplateDTO
,
public
void
sendReport
(
GlobalTaskInstance
taskInstance
,
XpackEmailTemplateDTO
emailTemplateDTO
,
SysUserEntity
user
)
{
SysUserEntity
user
)
{
EmailXpackService
emailXpackService
=
SpringContextUtil
.
getBean
(
EmailXpackService
.
class
);
EmailXpackService
emailXpackService
=
SpringContextUtil
.
getBean
(
EmailXpackService
.
class
);
try
{
try
{
String
panelId
=
emailTemplateDTO
.
getPanelId
();
String
panelId
=
emailTemplateDTO
.
getPanelId
();
String
url
=
panelUrl
(
panelId
);
String
url
=
panelUrl
(
panelId
);
String
token
=
tokenByUser
(
user
);
String
token
=
tokenByUser
(
user
);
...
@@ -116,11 +139,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
...
@@ -116,11 +139,15 @@ public class EmailTaskHandler extends TaskHandler implements Job {
String
recipients
=
emailTemplateDTO
.
getRecipients
();
String
recipients
=
emailTemplateDTO
.
getRecipients
();
byte
[]
content
=
emailTemplateDTO
.
getContent
();
byte
[]
content
=
emailTemplateDTO
.
getContent
();
EmailService
emailService
=
SpringContextUtil
.
getBean
(
EmailService
.
class
);
EmailService
emailService
=
SpringContextUtil
.
getBean
(
EmailService
.
class
);
String
contentStr
=
""
;
String
contentStr
=
""
;
if
(
ObjectUtils
.
isNotEmpty
(
content
))
{
if
(
ObjectUtils
.
isNotEmpty
(
content
))
{
contentStr
=
new
String
(
content
,
"UTF-8"
);
contentStr
=
new
String
(
content
,
"UTF-8"
);
}
}
emailService
.
sendWithImage
(
recipients
,
emailTemplateDTO
.
getTitle
(),
contentStr
,
bytes
);
emailService
.
sendWithImage
(
recipients
,
emailTemplateDTO
.
getTitle
(),
contentStr
,
bytes
);
Thread
.
sleep
(
10000
);
success
(
taskInstance
);
success
(
taskInstance
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
error
(
taskInstance
,
e
);
error
(
taskInstance
,
e
);
...
...
backend/src/main/java/io/dataease/listener/GlobalTaskStartListener.java
浏览文件 @
624bffd8
...
@@ -32,6 +32,7 @@ public class GlobalTaskStartListener implements ApplicationListener<ApplicationR
...
@@ -32,6 +32,7 @@ public class GlobalTaskStartListener implements ApplicationListener<ApplicationR
tasks
.
stream
().
forEach
(
task
->
{
tasks
.
stream
().
forEach
(
task
->
{
TaskHandler
taskHandler
=
TaskStrategyFactory
.
getInvokeStrategy
(
task
.
getTaskType
());
TaskHandler
taskHandler
=
TaskStrategyFactory
.
getInvokeStrategy
(
task
.
getTaskType
());
try
{
try
{
taskHandler
.
resetRunningInstance
(
task
.
getTaskId
());
taskHandler
.
addTask
(
scheduleManager
,
task
);
taskHandler
.
addTask
(
scheduleManager
,
task
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
e
.
printStackTrace
();
...
...
backend/src/main/java/io/dataease/plugins/server/XEmailTaskServer.java
浏览文件 @
624bffd8
...
@@ -3,6 +3,7 @@ package io.dataease.plugins.server;
...
@@ -3,6 +3,7 @@ package io.dataease.plugins.server;
import
com.github.pagehelper.Page
;
import
com.github.pagehelper.Page
;
import
com.github.pagehelper.PageHelper
;
import
com.github.pagehelper.PageHelper
;
import
io.dataease.commons.exception.DEException
;
import
io.dataease.commons.exception.DEException
;
import
io.dataease.commons.pool.PriorityThreadPoolExecutor
;
import
io.dataease.commons.utils.*
;
import
io.dataease.commons.utils.*
;
import
io.dataease.plugins.common.entity.GlobalTaskEntity
;
import
io.dataease.plugins.common.entity.GlobalTaskEntity
;
import
io.dataease.plugins.common.entity.GlobalTaskInstance
;
import
io.dataease.plugins.common.entity.GlobalTaskInstance
;
...
@@ -23,6 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
...
@@ -23,6 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.List
;
import
java.util.List
;
import
java.util.concurrent.Future
;
import
javax.annotation.Resource
;
@Api
(
tags
=
"xpack:定时报告"
)
@Api
(
tags
=
"xpack:定时报告"
)
@RequestMapping
(
"/plugin/task"
)
@RequestMapping
(
"/plugin/task"
)
...
@@ -32,6 +36,9 @@ public class XEmailTaskServer {
...
@@ -32,6 +36,9 @@ public class XEmailTaskServer {
@Autowired
@Autowired
private
ScheduleService
scheduleService
;
private
ScheduleService
scheduleService
;
@Resource
private
PriorityThreadPoolExecutor
priorityExecutor
;
@PostMapping
(
"/queryTasks/{goPage}/{pageSize}"
)
@PostMapping
(
"/queryTasks/{goPage}/{pageSize}"
)
public
Pager
<
List
<
XpackTaskGridDTO
>>
queryTask
(
@PathVariable
int
goPage
,
@PathVariable
int
pageSize
,
public
Pager
<
List
<
XpackTaskGridDTO
>>
queryTask
(
@PathVariable
int
goPage
,
@PathVariable
int
pageSize
,
@RequestBody
XpackGridRequest
request
)
{
@RequestBody
XpackGridRequest
request
)
{
...
@@ -85,7 +92,19 @@ public class XEmailTaskServer {
...
@@ -85,7 +92,19 @@ public class XEmailTaskServer {
String
token
=
ServletUtils
.
getToken
();
String
token
=
ServletUtils
.
getToken
();
String
fileId
=
null
;
String
fileId
=
null
;
try
{
try
{
fileId
=
emailXpackService
.
print
(
url
,
token
,
buildPixel
(
request
.
getPixel
()));
Future
<?>
future
=
priorityExecutor
.
submit
(()
->
{
try
{
return
emailXpackService
.
print
(
url
,
token
,
buildPixel
(
request
.
getPixel
()));
}
catch
(
Exception
e
)
{
LogUtil
.
error
(
e
.
getMessage
(),
e
);
DEException
.
throwException
(
"预览失败,请联系管理员"
);
}
return
null
;
},
0
);
Object
object
=
future
.
get
();
if
(
ObjectUtils
.
isNotEmpty
(
object
))
{
fileId
=
object
.
toString
();
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LogUtil
.
error
(
e
.
getMessage
(),
e
);
LogUtil
.
error
(
e
.
getMessage
(),
e
);
DEException
.
throwException
(
"预览失败,请联系管理员"
);
DEException
.
throwException
(
"预览失败,请联系管理员"
);
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论