SpringBoot定时任务@Scheduled源码解析
# SpringBoot定时任务使用
# 入口类声明启用定时任务
使用@EnableScheduling
/**
 * Application entry point.
 *
 * <p>{@code @EnableScheduling} on this class is what switches on processing of
 * {@code @Scheduled} methods; the other annotations configure mapper and servlet-component
 * scanning for the listed packages.
 */
@SpringBootApplication
@MapperScan("com.winrh.mapper")
@ServletComponentScan("com.winrh.filter")
@EnableScheduling
public class DemoApplication {

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
# 创建定时任务类Task
@Slf4j
@Component
public class Task {
@Scheduled(cron = "*/10 * * * * ?")
public void cron(){
log.info("每隔10秒执行一次");
}
@Scheduled(fixedDelay = 4000)
public void fixedDelay(){
log.info("循环调用fixedDelay,延迟为4s");
}
# 源码分析
# @EnableScheduling
/**
 * Marker annotation that switches scheduling support on.
 *
 * <p>It declares no attributes of its own; its whole effect comes from the {@code @Import}
 * below, which pulls in {@code SchedulingConfiguration}.
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})
@Documented
public @interface EnableScheduling {
}
通过@Import注解导入SchedulingConfiguration.class这个配置类。
/**
 * Configuration class whose only job is to register a
 * {@code ScheduledAnnotationBeanPostProcessor} bean.
 */
@Configuration
// Decompiled constant: 2 is BeanDefinition.ROLE_INFRASTRUCTURE.
@Role(2)
public class SchedulingConfiguration {
public SchedulingConfiguration() {
}
// Registers the post-processor under a fixed, internal bean name.
@Bean(
name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"}
)
@Role(2)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
这里定义了ScheduledAnnotationBeanPostProcessor这么一个后置处理器(它实现了DestructionAwareBeanPostProcessor,而DestructionAwareBeanPostProcessor又继承了BeanPostProcessor),因此,每个bean创建时,它都会扫描这个bean有没有带@Scheduled注解的方法。
后置处理器在什么时机起作用?先回顾一下Bean的生命周期:
① 通过构造器或工厂方法创建 Bean实例 ② 为 Bean 的属性设置值和对其他 Bean 的引用 ③ 将 Bean 实例传递给 Bean 后置处理器BeanPostProcessor的 postProcessBeforeInitialization 方法 ④ 调用 Bean 的初始化方法 ⑤ 将 Bean 实例传递给 Bean 后置处理器BeanPostProcessor的 postProcessAfterInitialization方法 ⑥ Bean可以使用了 ⑦ 当容器关闭时, 调用 Bean 的销毁方法
# 看下ScheduledAnnotationBeanPostProcessor有哪些属性
// Resolver for embedded string values such as ${...} placeholders in annotation attributes
// (presumably injected through the matching Aware callback; the setter is not shown in this excerpt).
@Nullable
private StringValueResolver embeddedValueResolver;
@Nullable
private String beanName;
@Nullable
private BeanFactory beanFactory; // presumably injected through an Aware callback (setter not shown)
@Nullable
private ApplicationContext applicationContext; // presumably injected through an Aware callback (setter not shown)
@Nullable
// Optional externally supplied scheduler; when non-null, finishRegistration() passes it to the
// ScheduledTaskRegistrar via setScheduler().
private Object scheduler;
// Registrar that collects the scheduled tasks and later hands them over to the task scheduler.
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
// Cache of classes that were already inspected and found to have no @Scheduled methods.
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap(64));
// Maps each bean instance (compared by identity) to the ScheduledTasks created for its @Scheduled methods.
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap(16);
# 解析@Scheduled注解
在每个Bean初始化完成之后,容器会调用postProcessAfterInitialization方法,在其中找出带有@Scheduled注解的方法,并对方法上的每个@Scheduled注解调用processScheduled。
/**
 * Looks for {@code @Scheduled}-annotated methods on the bean's target class and passes every
 * annotation found to {@code processScheduled}.
 *
 * @param bean the bean instance being post-processed
 * @param beanName the bean's name (used only in the debug log message)
 * @return the same bean instance, unmodified
 */
public Object postProcessAfterInitialization(Object bean, String beanName) {
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
// nonAnnotatedClasses caches classes already found to contain no @Scheduled methods; skip those.
if (!this.nonAnnotatedClasses.contains(targetClass)) {
// Collect every method of targetClass that carries @Scheduled.
// A single method may carry several @Scheduled annotations,
// so each entry of annotatedMethods maps one method to its set of @Scheduled annotations.
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> {
Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
return !scheduledMethods.isEmpty() ? scheduledMethods : null;
});
if (annotatedMethods.isEmpty()) {
// targetClass has no @Scheduled methods: remember it so it is not inspected again.
this.nonAnnotatedClasses.add(targetClass);
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
}
} else {
// targetClass has @Scheduled methods; they are recorded in annotatedMethods.
annotatedMethods.forEach((method, scheduledMethods) -> {
scheduledMethods.forEach((scheduled) -> {
// One processScheduled call per (method, annotation) pair.
this.processScheduled(scheduled, method, bean);
});
});
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
}
}
}
return bean;
}
下面的processScheduled方法,会处理方法上的每个@Scheduled注解,生成一个ScheduledTask并登记到this.scheduledTasks。
/**
 * Turns one {@code @Scheduled} annotation on one method into registered task(s) and records
 * them in {@code this.scheduledTasks} under the owning bean.
 *
 * <p>Only the cron branch is shown in this excerpt; the remaining branches are elided.
 *
 * @param scheduled the annotation instance being processed
 * @param method the annotated method; must take no arguments
 * @param bean the bean instance that owns the method
 * @throws IllegalStateException if the annotation attributes are invalid (wraps the message of
 *         the underlying IllegalArgumentException)
 */
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
// Tracks whether this @Scheduled annotation produced a task; starts out false.
boolean processedSchedule = false;
String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
// Holds the ScheduledTasks created for this @Scheduled annotation.
Set<ScheduledTask> tasks = new LinkedHashSet(4);
long initialDelay = scheduled.initialDelay();
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
// A negative initialDelay is treated as "not set"; both forms must not be given together.
Assert.isTrue(initialDelay < 0L, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = parseDelayAsLong(initialDelayString);
} catch (RuntimeException var25) {
throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check whether this annotation specifies a cron expression.
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1L, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
} else {
timeZone = TimeZone.getDefault();
}
// Key step: wrap the runnable in a CronTask and register it with the registrar.
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
// Part of the parsing is elided here by the article; it is described as following the same pattern as the cron branch.
...
...
// Fails if none of the branches produced a task.
Assert.isTrue(processedSchedule, errorMessage);
synchronized(this.scheduledTasks) {
Set<ScheduledTask> registeredTasks = (Set)this.scheduledTasks.get(bean);
if (registeredTasks == null) {
registeredTasks = new LinkedHashSet(4);
this.scheduledTasks.put(bean, registeredTasks);
}
((Set)registeredTasks).addAll(tasks);
}
} catch (IllegalArgumentException var26) {
// NOTE(review): only the message is propagated; the original exception is not attached as the cause.
throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + var26.getMessage());
}
}
跟踪到this.registrar.scheduleCronTask(),这里跳转到ScheduledTaskRegistrar类的scheduleCronTask()。
/**
 * Schedules the given cron task if a task scheduler is available, otherwise parks it for later.
 *
 * @param task the cron task to schedule
 * @return the newly created ScheduledTask, or null when the task was already tracked in
 *         unresolvedTasks (i.e. no new ScheduledTask was created by this call)
 */
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
// First time this task is seen: create its ScheduledTask handle.
scheduledTask = new ScheduledTask(task);
newTask = true;
}
// Per the article's walkthrough, nothing has assigned taskScheduler on the registrar at this point,
// so this.taskScheduler is still null during bean post-processing.
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
} else {
// This branch is taken on the first pass: remember the task and mark it as unresolved.
this.addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return newTask ? scheduledTask : null;
}
在所有单例的Bean实例化完成后,会调用afterSingletonsInstantiated();随后Spring容器初始化完成,触发ContextRefreshedEvent事件,调用onApplicationEvent方法,最终执行finishRegistration()。
/**
 * Completes registrar setup: applies an explicitly set scheduler, lets SchedulingConfigurer
 * beans customize the registrar, looks up a scheduler bean if none is set yet, and finally
 * triggers the registrar's afterPropertiesSet().
 */
private void finishRegistration() {
// An explicitly configured scheduler on this post-processor takes effect first.
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
// Let every SchedulingConfigurer bean in the container configure the registrar (in sorted order);
// each one receives the registrar and may call its setters.
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory)this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
Iterator var3 = configurers.iterator();
while(var3.hasNext()) {
SchedulingConfigurer configurer = (SchedulingConfigurer)var3.next();
configurer.configureTasks(this.registrar);
}
}
// If the registrar has tasks but still no scheduler, resolve one from the container:
// first a TaskScheduler (by type, then by name), then a ScheduledExecutorService (by type, then by name).
// Hence a TaskScheduler bean, a ScheduledExecutorService bean, or a SchedulingConfigurer
// implementation can each be used to supply the scheduling thread pool.
if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
try {
this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
} catch (NoUniqueBeanDefinitionException var9) {
// Several TaskScheduler beans exist: retry the lookup by name.
this.logger.debug("Could not find unique TaskScheduler bean", var9);
try {
this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
} catch (NoSuchBeanDefinitionException var8) {
if (this.logger.isInfoEnabled()) {
this.logger.info("More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var9.getBeanNamesFound());
}
}
} catch (NoSuchBeanDefinitionException var10) {
// No TaskScheduler bean at all: fall back to a ScheduledExecutorService bean.
this.logger.debug("Could not find default TaskScheduler bean", var10);
try {
this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
} catch (NoUniqueBeanDefinitionException var6) {
// Several ScheduledExecutorService beans exist: retry the lookup by name.
this.logger.debug("Could not find unique ScheduledExecutorService bean", var6);
try {
this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
} catch (NoSuchBeanDefinitionException var5) {
if (this.logger.isInfoEnabled()) {
this.logger.info("More than one ScheduledExecutorService bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var6.getBeanNamesFound());
}
}
} catch (NoSuchBeanDefinitionException var7) {
// Neither kind of bean exists; the registrar is left without a scheduler here.
this.logger.debug("Could not find default ScheduledExecutorService bean", var7);
this.logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
}
}
}
// Hand over to the registrar, which schedules the collected tasks.
this.registrar.afterPropertiesSet();
}
# afterPropertiesSet
/** Entry point invoked once configuration is complete; simply delegates to scheduleTasks(). */
public void afterPropertiesSet() {
this.scheduleTasks();
}
/**
 * Schedules the tasks collected by this registrar, creating a fallback scheduler when none
 * has been configured. Only the cron-task part is shown in this excerpt.
 */
protected void scheduleTasks() {
// No scheduler configured: fall back to a single-threaded scheduled executor wrapped in a ConcurrentTaskScheduler.
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// (content elided by the article)
...
...
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
// The cron tasks registered earlier go through scheduleCronTask again here.
addScheduledTask(scheduleCronTask(task));
}
}
// (content elided by the article)
...
...
}
又进入了scheduleCronTask,这一次taskScheduler已经不为空,任务在这里被提交给taskScheduler。
/**
 * Second pass through scheduleCronTask: with taskScheduler now set, the task is actually
 * submitted for execution.
 *
 * @param task the cron task to schedule
 * @return the newly created ScheduledTask, or null when the task was already tracked in
 *         unresolvedTasks
 */
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
// Submit the task's runnable and trigger to the TaskScheduler and keep the resulting future.
// NOTE(review): the article names ThreadPoolTaskScheduler as the default implementation, but the
// scheduleTasks() fallback creates a ConcurrentTaskScheduler — confirm which applies in your setup.
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
} else {
this.addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return newTask ? scheduledTask : null;
}
我们看下默认实现类ThreadPoolTaskScheduler的schedule方法。
/**
 * Schedules the task so that it runs whenever the trigger fires.
 *
 * @param task the runnable to execute
 * @param trigger decides the execution times
 * @return the future returned by ReschedulingRunnable#schedule()
 * @throws TaskRejectedException if the underlying executor rejects the task
 */
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {
errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
// Wrap the task in a ReschedulingRunnable (a Runnable wrapper, not a new thread) and schedule it.
// executor is the ScheduledExecutorService obtained from getScheduledExecutor() above.
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
接下来就是ScheduledThreadPoolExecutor执行定时任务了,这里不再赘述。大体是有一个任务队列DelayedWorkQueue,它是按任务执行时间排序的小顶堆;每次取出到期的任务交给线程池执行,执行任务时计算下一次执行的时间,再重新提交到队列……
# 总结
SpringBoot 定时任务默认是通过 ScheduledThreadPoolExecutor 实现的,需要注意的是默认线程数是1,在生产环境中要合理安排任务周期,否则有可能出现任务阻塞。