elastic-job source code (1): job auto-configuration

Version: 3.1.0-SNAPSHOT
 
Maven coordinates
<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>${latest.version}</version>
</dependency>

 
spring.factories configuration
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration

Once the elasticjob-lite-spring-boot-starter dependency is on the classpath, Spring Boot loads ElasticJobLiteAutoConfiguration automatically. Let's look at what ElasticJobLiteAutoConfiguration does.
 

ElasticJobLiteAutoConfiguration.java
/**
 * ElasticJob-Lite auto configuration.
 */
@Configuration(proxyBeanMethods = false)
@AutoConfigureAfter(DataSourceAutoConfiguration.class)
/**
 * ElasticJob switch.
 * elasticjob.enabled is treated as true when the property is absent.
 */
@ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true", matchIfMissing = true)
/**
 * Imports:
 * ElasticJobRegistryCenterConfiguration.class: registry center configuration
 * ElasticJobTracingConfiguration.class: job event tracing configuration
 * ElasticJobSnapshotServiceConfiguration.class: snapshot service configuration
 */
@Import({ElasticJobRegistryCenterConfiguration.class, ElasticJobTracingConfiguration.class, ElasticJobSnapshotServiceConfiguration.class})
/**
 * Job-related configuration properties.
 */
@EnableConfigurationProperties(ElasticJobProperties.class)
public class ElasticJobLiteAutoConfiguration {

    @Configuration(proxyBeanMethods = false)
    /**
     * ElasticJobBootstrapConfiguration.class: creates the job beans and registers them in the Spring container
     * ScheduleJobBootstrapStartupRunner.class: starts every job of type ScheduleJobBootstrap
     */
    @Import({ElasticJobBootstrapConfiguration.class, ScheduleJobBootstrapStartupRunner.class})
    protected static class ElasticJobConfiguration {
    }
}
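The elasticjob.enabled switch can be modelled in a few lines. This is a minimal sketch of how the combination havingValue = "true" and matchIfMissing = true behaves, not Spring's actual condition code; the class and method names are hypothetical.

```java
import java.util.Properties;

/** Simplified model of @ConditionalOnProperty(havingValue = "true", matchIfMissing = true). */
final class EnabledSwitchSketch {

    // Hypothetical helper: true when the auto-configuration would be applied.
    static boolean isElasticJobEnabled(final Properties props) {
        String value = props.getProperty("elasticjob.enabled");
        if (value == null) {
            // matchIfMissing = true: an absent property counts as a match.
            return true;
        }
        // The configured value must equal havingValue, ignoring case.
        return "true".equalsIgnoreCase(value);
    }

    public static void main(final String[] args) {
        Properties props = new Properties();
        System.out.println(isElasticJobEnabled(props)); // true
        props.setProperty("elasticjob.enabled", "false");
        System.out.println(isElasticJobEnabled(props)); // false
    }
}
```

In other words, the starter is active by default and only an explicit elasticjob.enabled=false turns it off.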

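Elastic-job relies on ZooKeeper to implement distributed jobs, so auto-configuration needs a ZooKeeper registry center configuration.
Auto-configuration does four main things:
1. Configures the ZooKeeper client and connects to ZooKeeper.
2. Configures the event tracing database, which stores job execution records.
3. Parses all job configurations and registers each job bootstrap as a Spring singleton bean.
4. Identifies the job type, sets up the job's node data in ZooKeeper, and runs the scheduled jobs.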
 
Task 1: configure the ZooKeeper client and connect to ZooKeeper.
ZookeeperRegistryCenter.class
public void init() {
    log.debug("Elastic job: zookeeper registry center init, server lists is: {}.", zkConfig.getServerLists());
    CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
            // ZooKeeper server addresses
            .connectString(zkConfig.getServerLists())
            // retry policy
            .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()))
            // namespace, i.e. the root ZooKeeper node name
            .namespace(zkConfig.getNamespace());
    // session timeout
    if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
        builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
    }
    // connection timeout
    if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
        builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
    }
    // digest authentication and ACLs, only when a digest is configured
    if (!Strings.isNullOrEmpty(zkConfig.getDigest())) {
        builder.authorization("digest", zkConfig.getDigest().getBytes(StandardCharsets.UTF_8))
                .aclProvider(new ACLProvider() {

                    @Override
                    public List<ACL> getDefaultAcl() {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }

                    @Override
                    public List<ACL> getAclForPath(final String path) {
                        return ZooDefs.Ids.CREATOR_ALL_ACL;
                    }
                });
    }
    client = builder.build();
    // start the ZooKeeper client
    client.start();
    try {
        // block until connected, waiting at most maxSleepTime * maxRetries milliseconds
        if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
            client.close();
            throw new KeeperException.OperationTimeoutException();
        }
        //CHECKSTYLE:OFF
    } catch (final Exception ex) {
        //CHECKSTYLE:ON
        RegExceptionHandler.handleException(ex);
    }
}
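Two numbers in init() are easy to overlook: the upper bound passed to blockUntilConnected, and the sleep time produced by an exponential backoff policy. The sketch below models both with plain Java. The backoff formula is my approximation of how such a policy behaves (a random multiple of the base sleep, capped at the maximum), and the values 1000, 3000 and 3 are example settings, not values taken from this article.

```java
import java.util.Random;

/** Plain-Java model of the timing values used while connecting to ZooKeeper. */
final class ZkRetrySketch {

    // init() waits at most maxSleepTime * maxRetries milliseconds for the connection.
    static long connectWaitBudgetMillis(final int maxSleepTimeMillis, final int maxRetries) {
        return (long) maxSleepTimeMillis * maxRetries;
    }

    // Approximate exponential backoff: base * random factor in [1, 2^(retryCount + 1)), capped at max.
    // Only meant for small retry counts (the shift would overflow for retryCount >= 30).
    static long backoffSleepMillis(final int baseSleepTimeMillis, final int maxSleepTimeMillis,
                                   final int retryCount, final Random random) {
        int factor = Math.max(1, random.nextInt(1 << (retryCount + 1)));
        return Math.min((long) baseSleepTimeMillis * factor, maxSleepTimeMillis);
    }

    public static void main(final String[] args) {
        System.out.println(connectWaitBudgetMillis(3000, 3)); // 9000
        Random random = new Random();
        for (int retry = 0; retry < 3; retry++) {
            System.out.println("retry " + retry + " sleeps " + backoffSleepMillis(1000, 3000, retry, random) + " ms");
        }
    }
}
```

With these example settings the application gives up after roughly nine seconds if ZooKeeper is unreachable, which is worth knowing when startup seems to hang.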

 

Task 2: configure the event tracing database, which stores job execution records.

ElasticJobTracingConfiguration.java

 

/**
 * Create a bean of tracing DataSource.
 *
 * @param tracingProperties tracing Properties
 * @return tracing DataSource
 */
@Bean("tracingDataSource")
// registers the job database connection in Spring under the bean name tracingDataSource
public DataSource tracingDataSource(final TracingProperties tracingProperties) {
    // read the elastic-job database configuration
    DataSourceProperties dataSource = tracingProperties.getDataSource();
    if (dataSource == null) {
        return null;
    }
    HikariDataSource tracingDataSource = new HikariDataSource();
    tracingDataSource.setJdbcUrl(dataSource.getUrl());
    BeanUtils.copyProperties(dataSource, tracingDataSource);
    return tracingDataSource;
}

/**
 * Create a bean of tracing configuration.
 *
 * @param dataSource required by constructor
 * @param tracingDataSource tracing DataSource
 * @return a bean of tracing configuration
 */
@Bean
@ConditionalOnBean(DataSource.class)
@ConditionalOnProperty(name = "elasticjob.tracing.type", havingValue = "RDB")
public TracingConfiguration<DataSource> tracingConfiguration(final DataSource dataSource, @Nullable final DataSource tracingDataSource) {
    /**
     * dataSource is the business database.
     * tracingDataSource is the job database.
     * With elasticjob.tracing.type = RDB, a separately configured job database is used to record
     * job execution traces; otherwise the business data source is used.
     * Caveat: when both the business data source and the tracing data source are registered as beans,
     * mybatis-plus combined with @Table may well fail to resolve the correct data source.
     */
    DataSource ds = tracingDataSource;
    if (ds == null) {
        ds = dataSource;
    }
    return new TracingConfiguration<>("RDB", ds);
}

 

Setting elasticjob.tracing.type=RDB turns on event tracing. The data source used for job event tracing can be configured separately from the business data source.

 

Task 3: parse all job configurations.

ElasticJobBootstrapConfiguration.class

 

public void createJobBootstrapBeans() {
    // load the job properties
    ElasticJobProperties elasticJobProperties = applicationContext.getBean(ElasticJobProperties.class);
    // obtain the singleton bean registry
    SingletonBeanRegistry singletonBeanRegistry = ((ConfigurableApplicationContext) applicationContext).getBeanFactory();
    // obtain the registry center (the ZooKeeper client wrapper) from the container
    CoordinatorRegistryCenter registryCenter = applicationContext.getBean(CoordinatorRegistryCenter.class);
    // obtain the job event tracing configuration
    TracingConfiguration<?> tracingConfig = getTracingConfiguration();
    // build the JobBootstraps
    constructJobBootstraps(elasticJobProperties, singletonBeanRegistry, registryCenter, tracingConfig);
}

The important part is the constructJobBootstraps method:

private void constructJobBootstraps(final ElasticJobProperties elasticJobProperties, final SingletonBeanRegistry singletonBeanRegistry,
                                    final CoordinatorRegistryCenter registryCenter, final TracingConfiguration<?> tracingConfig) {
    // iterate over every configured job
    for (Map.Entry<String, ElasticJobConfigurationProperties> entry : elasticJobProperties.getJobs().entrySet()) {
        ElasticJobConfigurationProperties jobConfigurationProperties = entry.getValue();
        // fail if both the job class and the job type are missing
        Preconditions.checkArgument(null != jobConfigurationProperties.getElasticJobClass()
                        || !Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "Please specific [elasticJobClass] or [elasticJobType] under job configuration.");
        // fail if both the job class and the job type are set: they are mutually exclusive
        Preconditions.checkArgument(null == jobConfigurationProperties.getElasticJobClass()
                        || Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType()),
                "[elasticJobClass] and [elasticJobType] are mutually exclusive.");

        if (null != jobConfigurationProperties.getElasticJobClass()) {
            // register the job by class
            registerClassedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        } else if (!Strings.isNullOrEmpty(jobConfigurationProperties.getElasticJobType())) {
            // register the job by type
            registerTypedJob(entry.getKey(), entry.getValue().getJobBootstrapBeanName(), singletonBeanRegistry, registryCenter, tracingConfig, jobConfigurationProperties);
        }
    }
}

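A job can be registered in two ways. The first is by class: elasticJobClass is set to the job's fully qualified class name. The second is by type, through elasticJobType, which is handled by registerTypedJob.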
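The two precondition checks in constructJobBootstraps amount to an exactly-one-of rule. Here is a minimal sketch of that rule in plain Java; the class, the enum and the error messages are hypothetical, and "SCRIPT" is only an example value for elasticJobType.

```java
/** Plain-Java model of the "exactly one of elasticJobClass / elasticJobType" rule. */
final class JobSourceSketch {

    enum Source { CLASS, TYPE }

    // Hypothetical helper: decides which registration path a job configuration takes.
    static Source resolve(final Class<?> elasticJobClass, final String elasticJobType) {
        boolean hasClass = elasticJobClass != null;
        boolean hasType = elasticJobType != null && !elasticJobType.isEmpty();
        if (!hasClass && !hasType) {
            throw new IllegalArgumentException("Either elasticJobClass or elasticJobType must be set.");
        }
        if (hasClass && hasType) {
            throw new IllegalArgumentException("elasticJobClass and elasticJobType are mutually exclusive.");
        }
        return hasClass ? Source.CLASS : Source.TYPE;
    }

    public static void main(final String[] args) {
        System.out.println(resolve(Runnable.class, null)); // CLASS
        System.out.println(resolve(null, "SCRIPT"));       // TYPE
    }
}
```

A configuration that sets neither property, or both, is rejected at startup rather than silently ignored.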
 
Now let's look inside registerClassedJob:
private void registerClassedJob(final String jobName, final String jobBootstrapBeanName, final SingletonBeanRegistry singletonBeanRegistry, final CoordinatorRegistryCenter registryCenter,
                                final TracingConfiguration<?> tracingConfig, final ElasticJobConfigurationProperties jobConfigurationProperties) {
    // build the job configuration
    JobConfiguration jobConfig = jobConfigurationProperties.toJobConfiguration(jobName);
    // attach the job event tracing configuration
    jobExtraConfigurations(jobConfig, tracingConfig);
    // look up the job bean by its class
    ElasticJob elasticJob = applicationContext.getBean(jobConfigurationProperties.getElasticJobClass());
    // no cron expression: create a OneOffJobBootstrap, i.e. a one-off job
    if (Strings.isNullOrEmpty(jobConfig.getCron())) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(jobBootstrapBeanName), "The property [jobBootstrapBeanName] is required for One-off job.");
        singletonBeanRegistry.registerSingleton(jobBootstrapBeanName, new OneOffJobBootstrap(registryCenter, elasticJob, jobConfig));
    } else {
        // cron expression present: create a ScheduleJobBootstrap, i.e. a scheduled job
        // choose the bean name
        String beanName = !Strings.isNullOrEmpty(jobBootstrapBeanName) ? jobBootstrapBeanName : jobConfig.getJobName() + "ScheduleJobBootstrap";
        // register the ScheduleJobBootstrap as a singleton
        singletonBeanRegistry.registerSingleton(beanName, new ScheduleJobBootstrap(registryCenter, elasticJob, jobConfig));
    }
}

A job registered by class ends up as one of two bootstrap types:
1. ScheduleJobBootstrap: a scheduled (cron) job.
2. OneOffJobBootstrap: a one-off job.
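The choice between the two bootstrap types, and the bean name that results, depends only on the cron expression and the optional jobBootstrapBeanName. A minimal sketch of that decision, with a hypothetical helper that returns "bootstrap type:bean name" as a string instead of registering a real bean:

```java
/** Plain-Java model of how registerClassedJob picks the bootstrap type and bean name. */
final class BootstrapNamingSketch {

    // Hypothetical helper: returns "<bootstrap type>:<bean name>" for a job configuration.
    static String describe(final String jobName, final String cron, final String jobBootstrapBeanName) {
        boolean hasBeanName = jobBootstrapBeanName != null && !jobBootstrapBeanName.isEmpty();
        if (cron == null || cron.isEmpty()) {
            // No cron expression: one-off job, and the bean name is mandatory.
            if (!hasBeanName) {
                throw new IllegalArgumentException("jobBootstrapBeanName is required for a one-off job.");
            }
            return "OneOffJobBootstrap:" + jobBootstrapBeanName;
        }
        // Cron expression present: scheduled job, bean name defaults to <jobName>ScheduleJobBootstrap.
        String beanName = hasBeanName ? jobBootstrapBeanName : jobName + "ScheduleJobBootstrap";
        return "ScheduleJobBootstrap:" + beanName;
    }

    public static void main(final String[] args) {
        System.out.println(describe("orderJob", "0/5 * * * * ?", null)); // ScheduleJobBootstrap:orderJobScheduleJobBootstrap
        System.out.println(describe("manualJob", null, "manualJobBean")); // OneOffJobBootstrap:manualJobBean
    }
}
```

The bean name matters mostly for one-off jobs, since application code has to look the bootstrap up by that name in order to trigger it.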
 
Next, what new ScheduleJobBootstrap(...) does. The work happens in the JobScheduler constructor it creates internally:
public JobScheduler(final CoordinatorRegistryCenter regCenter, final ElasticJob elasticJob, final JobConfiguration jobConfig) {
    Preconditions.checkArgument(null != elasticJob, "Elastic job cannot be null.");
    this.regCenter = regCenter;
    // collect the job listeners
    Collection<ElasticJobListener> jobListeners = getElasticJobListeners(jobConfig);
    // facade bundling the services that operate on ZooKeeper nodes, plus the job listeners
    setUpFacade = new SetUpFacade(regCenter, jobConfig.getJobName(), jobListeners);
    // resolve the job class name
    String jobClassName = JobClassNameProviderFactory.getProvider().getJobClassName(elasticJob);
    // store the job configuration in the ZooKeeper node /{namespace}/{jobName}/config
    this.jobConfig = setUpFacade.setUpJobConfiguration(jobClassName, jobConfig);
    // facade bundling the services that operate on ZooKeeper nodes
    schedulerFacade = new SchedulerFacade(regCenter, jobConfig.getJobName());
    jobFacade = new LiteJobFacade(regCenter, jobConfig.getJobName(), jobListeners, findTracingConfiguration().orElse(null));
    // validate the job configuration
    validateJobProperties();
    // create the job executor
    jobExecutor = new ElasticJobExecutor(elasticJob, this.jobConfig, jobFacade);
    // inject GuaranteeService into the listeners
    setGuaranteeServiceForElasticJobListeners(regCenter, jobListeners);
    // create the job schedule controller (triggering starts later, when schedule() is called)
    jobScheduleController = createJobScheduleController();
}

 

Now createJobScheduleController:

private JobScheduleController createJobScheduleController() {
    JobScheduleController result = new JobScheduleController(createScheduler(), createJobDetail(), getJobConfig().getJobName());
    // register the job in the local JobRegistry
    JobRegistry.getInstance().registerJob(getJobConfig().getJobName(), result);
    // register the start-up information
    registerStartUpInfo();
    return result;
}

Now registerStartUpInfo. The call above eventually reaches the following method, which takes an enabled flag:

public void registerStartUpInfo(final boolean enabled) {
    // start all listeners
    listenerManager.startAllListeners();
    // elect a leader; the winner is stored in /{namespace}/{jobName}/leader/election/instance
    leaderService.electLeader();
    // /{namespace}/{jobName}/servers/{ip}: persist the server and its enabled state
    serverService.persistOnline(enabled);
    // ephemeral node under /{namespace}/{jobName}/instances: the running job instance
    instanceService.persistOnline();
    // start the asynchronous reconcile service
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

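This method does the following:
1. Starts all listeners.
2. Elects a leader.
3. Persists the server and instance node data.
4. Starts the asynchronous reconcile service.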
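To picture where these steps write, here is a rough sketch of the node layout. It assumes, as I read the source, that every node sits under the job name inside the configured namespace; the class and helper names are mine, and the job name, IP address and instance id are made-up example values.

```java
/** Plain-Java model of the ZooKeeper paths touched during job start-up (relative to the namespace). */
final class JobNodePathSketch {

    // Every job node is assumed to live under /{jobName}.
    static String fullPath(final String jobName, final String node) {
        return String.format("/%s/%s", jobName, node);
    }

    static String serverNode(final String jobName, final String ip) {
        return fullPath(jobName, "servers/" + ip);
    }

    static String instanceNode(final String jobName, final String instanceId) {
        return fullPath(jobName, "instances/" + instanceId);
    }

    public static void main(final String[] args) {
        String jobName = "demoJob";
        System.out.println(fullPath(jobName, "config"));                     // /demoJob/config
        System.out.println(fullPath(jobName, "leader/election/instance"));   // /demoJob/leader/election/instance
        System.out.println(serverNode(jobName, "192.168.0.10"));             // /demoJob/servers/192.168.0.10
        System.out.println(instanceNode(jobName, "192.168.0.10@-@1234"));    // /demoJob/instances/192.168.0.10@-@1234
    }
}
```

Browsing these paths with a ZooKeeper client is a quick way to check that a job has registered itself.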
 
Task 4: identify the job type, set up the job's node data in ZooKeeper, and run the scheduled jobs.

ScheduleJobBootstrapStartupRunner.java

 

@Override
public void run(final String... args) {
    log.info("Starting ElasticJob Bootstrap.");
    applicationContext.getBeansOfType(ScheduleJobBootstrap.class).values().forEach(ScheduleJobBootstrap::schedule);
    log.info("ElasticJob Bootstrap started.");
}

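The runner fetches every scheduled job (beans of type ScheduleJobBootstrap) and calls schedule() on each one. Under the hood, the scheduled tasks are run by the Quartz framework.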
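The runner's behaviour can be imitated without Spring. In the sketch below a plain map stands in for the application context, and the two nested classes are stand-ins for ScheduleJobBootstrap and OneOffJobBootstrap; all names are hypothetical. It shows that only beans of the scheduled type are started, so a one-off job stays idle until application code triggers it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model of a start-up runner that schedules every bean of one type. */
final class StartupRunnerSketch {

    /** Stand-in for ScheduleJobBootstrap: only records that schedule() was called. */
    static final class ScheduledJob {
        boolean scheduled;

        void schedule() {
            scheduled = true;
        }
    }

    /** Stand-in for OneOffJobBootstrap: the runner never touches it. */
    static final class OneOffJob {
        boolean executed;

        void execute() {
            executed = true;
        }
    }

    // Simplified counterpart of looking up all beans of a given type by name.
    static <T> Map<String, T> beansOfType(final Map<String, Object> beans, final Class<T> type) {
        Map<String, T> result = new LinkedHashMap<>();
        beans.forEach((name, bean) -> {
            if (type.isInstance(bean)) {
                result.put(name, type.cast(bean));
            }
        });
        return result;
    }

    // Schedules every ScheduledJob bean and returns how many were started.
    static int run(final Map<String, Object> beans) {
        Map<String, ScheduledJob> jobs = beansOfType(beans, ScheduledJob.class);
        jobs.values().forEach(ScheduledJob::schedule);
        return jobs.size();
    }

    public static void main(final String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("reportJobScheduleJobBootstrap", new ScheduledJob());
        beans.put("manualJobBean", new OneOffJob());
        System.out.println(run(beans)); // 1
    }
}
```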

 
 

 
 

 
