程序员子龙(Java面试 + Java学习) 程序员子龙(Java面试 + Java学习)
首页
学习指南
工具
开源项目
技术书籍

程序员子龙

Java 开发从业者
首页
学习指南
工具
开源项目
技术书籍
  • 基础

  • JVM

  • Spring

  • 并发编程

  • Mybatis

  • 网络编程

  • 数据库

  • 缓存

  • 设计模式

  • 分布式

  • 高并发

  • SpringBoot

    • SpringBoot 整合redis
    • SpringBoot 线程池
    • springboot下整合mybatis
    • spring boot 配置文件的加载顺序
    • springboot启动不加载bootstrap.yml文件的问题解决
    • SpringBoot设置动态定时任务
    • springboot整合hibernate
    • ApplicationRunner、InitializingBean、@PostConstruct使用详解
    • Spring Boot 优雅的参数校验方案
    • ELK处理 SpringBoot 日志,太优雅了!
    • SpringBoot配置数据源
    • Spring Boot 默认数据库连接池 —— HikariCP
    • 数据库连接池Hikari监控
    • Spring Boot中使用AOP统一处理Web请求日志
    • SpringBoot 三大开发工具,你都用过么?
    • Spring Boot 3.2 + CRaC = 王炸!
    • springboot启动的时候排除加载某些bean
    • spring boot中集成swagger
    • springboot项目引入这个包以后把原来的json报文改为了xml格式返回
    • SpringBoot中new对象不能自动注入对象和属性的问题
    • 使用 Spring Boot Actuator 监控应用
    • 记录一次springboot自动任务线上突然不执行问题排查过程
    • SpringBoot定时任务@Scheduled源码解析
      • SpringBoot定时任务使用
        • 入口类声明启用定时任务
        • 创建定时任务类Task
      • 源码分析
        • @EnableScheduling
        • 看下ScheduledAnnotationBeanPostProcessor有哪些属性
        • 解析@Scheduled注解
        • afterPropertiesSet
      • 总结
    • Spring Boot + Lua = 王炸!
    • Spring Boot 实现定时任务动态管理
    • SpringBoot的@Async注解有什么坑?
    • druid 参数配置详解
    • Spring Boot HandlerMethodArgumentResolver 使用和场景
    • SpringBoot数据加解密
    • 解决controller层注入的service为null
    • 在 Spring Boot 中通过 RequestBodyAdvice 统一解码请求体
    • SpringBoot之使用Redisson实现分布式锁(含完整例子)
  • SpringCloudAlibaba

  • Nginx

  • 面试

  • 生产问题

  • 系统设计

  • 消息中间件

  • Java
  • SpringBoot
程序员子龙
2024-03-08
目录

SpringBoot定时任务@Scheduled源码解析

# SpringBoot定时任务使用

# 入口类声明启用定时任务

使用@EnableScheduling

@SpringBootApplication
@MapperScan("com.winrh.mapper")
@ServletComponentScan("com.winrh.filter")
@EnableScheduling
public class DemoApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
 
}
1
2
3
4
5
6
7
8
9
10
11

# 创建定时任务类Task


@Slf4j
@Component
public class Task {
 
    @Scheduled(cron = "*/10 * * * * ?")
    public void cron(){
        log.info("每隔10秒执行一次");
    }
 
    @Scheduled(fixedDelay = 4000)
    public void fixedDelay(){
        log.info("循环调用fixedDelay,延迟为4s");
    }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 源码分析

# @EnableScheduling

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})
@Documented
public @interface EnableScheduling {
}
1
2
3
4
5
6

通过@Import注解注入ScheduilingConfiguration.class这个配置类。

@Configuration
@Role(2)
public class SchedulingConfiguration {
    public SchedulingConfiguration() {
    }
 
    @Bean(
        name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"}
    )
    @Role(2)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

定义了ScheduledAnnotationBeanPostProcessor这么一个后置处理器(因为实现了DestructionAwareBeanPostProcessor,而DestructionAwareBeanPostProcessor又继承了BeanPostProcessor),因此,它会针对每一个bean的创建,扫描下这个bean有没有@Scheduled的方法。 后置处理器做了什么?

① 通过构造器或工厂方法创建 Bean实例 ② 为 Bean 的属性设置值和对其他 Bean 的引用 ③ 将 Bean 实例传递给 Bean 后置处理器BeanPostProcessor的 postProcessBeforeInitialization 方法 ④ 调用 Bean 的初始化方法 ⑤ 将 Bean 实例传递给 Bean 后置处理器BeanPostProcessor的 postProcessAfterInitialization方法 ⑥ Bean可以使用了 ⑦ 当容器关闭时, 调用 Bean 的销毁方法

# 看下ScheduledAnnotationBeanPostProcessor有哪些属性

//string属性解析器,用来解析${}对应的配置文件的属性,aware接口注入
@Nullable
private StringValueResolver embeddedValueResolver;
@Nullable
private String beanName;
@Nullable
private BeanFactory beanFactory;	//aware接口注入
@Nullable
private ApplicationContext applicationContext;	//aware接口注入
@Nullable
//定时任务线程池,如果不为空使用这个scheduler当作ScheduledTaskRegistrar的线程池
private Object scheduler;
//定时任务的注册器,通过这个类将定时任务委托给定时任务线程池
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
//已检测的没有scheduled注解的类的集合
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap(64));
//保存class与scheduled方法的映射
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap(16);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 解析@Scheduled注解

在每个Bean的属性填充完之后,调用postProcessAfterInitialization方法,将带有@Scheduled 注解的方法,在拿到@Scheduled注解的方法后,调用processScheduled

public Object postProcessAfterInitialization(Object bean, String beanName) {
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    // nonAnnotatedClasses 是一个缓存,用于记录处理过程中所发现的不包含任何被@Scheduled注解的方法的类
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
            // 获取类targetClass上所有使用注解@Scheduled的方法
            // 某个方法上可能同时使用多个注解@Scheduled,
            // 故以下annotatedMethods的每个Entry是一个方法对应一个@cheduled集合
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> {
            Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
            return !scheduledMethods.isEmpty() ? scheduledMethods : null;
        });
        if (annotatedMethods.isEmpty()) {
            // 如果当前类targetClass不包含任何使用注解@Scheduled的方法,将其添加到this.nonAnnotatedClasses
            this.nonAnnotatedClasses.add(targetClass);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
            }
        } else {
           // 当前类targetClass上找到了使用注解@Scheduled的方法,记录在annotatedMethods中
            annotatedMethods.forEach((method, scheduledMethods) -> {
                scheduledMethods.forEach((scheduled) -> {
                    //这里调用
                    this.processScheduled(scheduled, method, bean);
                });
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
            }
        }
    }

    return bean;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

下面的processScheduled方法,会处理方法上的每个@Scheduled注解,生成一个ScheduledTask并登记到this.scheduledTasks。

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
        Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
        // 用于记录当前 @Scheduled 注解是否已经被处理,初始化为false
        boolean processedSchedule = false;
        String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
        
        // 用于保存针对当前@Scheduled注解生成的ScheduledTask
        Set<ScheduledTask> tasks = new LinkedHashSet(4);
        long initialDelay = scheduled.initialDelay();
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0L, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }

            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = parseDelayAsLong(initialDelayString);
                } catch (RuntimeException var25) {
                    throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }
        // 检查这是否是一个 cron 表达式类型的注解
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }

            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1L, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                TimeZone timeZone;
                if (StringUtils.hasText(zone)) {
                    timeZone = StringUtils.parseTimeZoneString(zone);
                } else {
                    timeZone = TimeZone.getDefault();
                }
				//这行比较关键
                tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
            }
        }
        //省略了一部分解析过程,和解析cron是一样的
        ...
            ...
            Assert.isTrue(processedSchedule, errorMessage);
        synchronized(this.scheduledTasks) {
            Set<ScheduledTask> registeredTasks = (Set)this.scheduledTasks.get(bean);
            if (registeredTasks == null) {
                registeredTasks = new LinkedHashSet(4);
                this.scheduledTasks.put(bean, registeredTasks);
            }

            ((Set)registeredTasks).addAll(tasks);
        }
    } catch (IllegalArgumentException var26) {
        throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + var26.getMessage());
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67

跟踪到this.registrar.scheduleCronTask(),这里跳转到ScheduledTaskRegistrar类的scheduleCronTask()

	
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        //创建ScheduledTask
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    //可以看到ScheduledTaskRegistrar的初始化方法中没有对taskScheduler赋值
    //所以此时this.taskScheduler = null
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        //进入这里
        this.addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }

    return newTask ? scheduledTask : null;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

在所有单例的Bean实例化完成后,调用afterSingletonsInstantiated() ,在Spring容器初始化完成后,触发ContextRefreshedEvent 事件,调用onApplicationEvent方法,执行finishRegistration()

private void finishRegistration() {
    	//用容器中的SchedulingConfigurer配置ScheduledTaskRegistrar,这里是根据ScheduledTaskRegistrar的引用,调用其set方法设置一些属性
       if (this.scheduler != null) {
           this.registrar.setScheduler(this.scheduler);
       }
   	   //如果此时ScheduledTaskRegistrar的scheduler还是空,就从容器中取TaskScheduler(byName和byType),如果没有取到就根据容器中的ScheduledExecutorService实例化TaskScheduler
       if (this.beanFactory instanceof ListableBeanFactory) {
           Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory)this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
           List<SchedulingConfigurer> configurers = new ArrayList(beans.values());
           AnnotationAwareOrderComparator.sort(configurers);
           Iterator var3 = configurers.iterator();

           while(var3.hasNext()) {
               SchedulingConfigurer configurer = (SchedulingConfigurer)var3.next();
               configurer.configureTasks(this.registrar);
           }
       }
       	//this.registrar.afterPropertiesSet();
        //所以在容器中注入TaskScheduler或ScheduledExecutorService的类或者实现SchedulingConfigurer接口都可以配置定时任务的线程池
       if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
           Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");

           try {
               this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
           } catch (NoUniqueBeanDefinitionException var9) {
               this.logger.debug("Could not find unique TaskScheduler bean", var9);

               try {
                   this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
               } catch (NoSuchBeanDefinitionException var8) {
                   if (this.logger.isInfoEnabled()) {
                       this.logger.info("More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var9.getBeanNamesFound());
                   }
               }
           } catch (NoSuchBeanDefinitionException var10) {
               this.logger.debug("Could not find default TaskScheduler bean", var10);

               try {
                   this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
               } catch (NoUniqueBeanDefinitionException var6) {
                   this.logger.debug("Could not find unique ScheduledExecutorService bean", var6);

                   try {
                       this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                   } catch (NoSuchBeanDefinitionException var5) {
                       if (this.logger.isInfoEnabled()) {
                           this.logger.info("More than one ScheduledExecutorService bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var6.getBeanNamesFound());
                       }
                   }
               } catch (NoSuchBeanDefinitionException var7) {
                   this.logger.debug("Could not find default ScheduledExecutorService bean", var7);
                   this.logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
               }
           }
       }

       this.registrar.afterPropertiesSet();
   }

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# afterPropertiesSet

public void afterPropertiesSet() {
    this.scheduleTasks();
}
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }

    //省略了内容
    ...
        ...

     	if (this.cronTasks != null) {
			for (CronTask task : this.cronTasks) {
                //进入这里
				addScheduledTask(scheduleCronTask(task));
			}
		}
    //省略了内容
    ...
    ...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

又进入了scheduleCronTask,在这里将任务提交给taskScheduler

//包装任务
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }

    if (this.taskScheduler != null) {
        //把task提交到线程池ThreadPoolTaskScheduler
        //taskScheduler 默认实现类是ThreadPoolTaskScheduler
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        this.addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }

    return newTask ? scheduledTask : null;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

我们看下默认实现类是ThreadPoolTaskScheduler.schedule

	public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
		ScheduledExecutorService executor = getScheduledExecutor();
		try {
			ErrorHandler errorHandler = this.errorHandler;
			if (errorHandler == null) {
				errorHandler = TaskUtils.getDefaultErrorHandler(true);
			}
			//创建一个线程,提交到ThreadPoolTaskScheduler 
            //executor 是ThreadPoolTaskScheduler
			return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
		}
		catch (RejectedExecutionException ex) {
			throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
		}
	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

接下来就是ScheduledThreadPoolExecutor中执行定时任务了,这里就不再赘述了,大体是有一个任务队列WorkerQueue,这个队列是按小顶堆排序的,排序规则是任务执行的时间,每次取出任务时,将任务提交给线程池执行,在执行任务的时候,计算下一次执行的时间,提交队列…

# 总结

SpringBoot 定时任务默认是通过 ScheduledThreadPoolExecutor实现的,需要注意的是默认线程数是1,在生产环境中合理安排任务周期,要不有可能出现任务阻塞。

上次更新: 2024/03/18, 15:55:19
记录一次springboot自动任务线上突然不执行问题排查过程
Spring Boot + Lua = 王炸!

← 记录一次springboot自动任务线上突然不执行问题排查过程 Spring Boot + Lua = 王炸!→

最近更新
01
一个注解,优雅的实现接口幂等性
11-17
02
MySQL事务(超详细!!!)
10-14
03
阿里二面:Kafka中如何保证消息的顺序性?这周被问到两次了
10-09
更多文章>
Theme by Vdoing | Copyright © 2024-2024

    辽ICP备2023001503号-2

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式