ElasticJob

ElasticJob

来自官方的定义

ElasticJob

1
2
3
Elastic-Job is a distributed scheduling solution consisting of two separate projects, Lite and Cloud.

Elastic-Job-Lite is a lightweight, decentralized solution that provides distributed task sharding services. Elastic-Job-Cloud is a Mesos Framework which provides additional resource management, App distribution, process isolation and task aggregation features.

1、 基本概念

1.1. 分片概念

任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。

例如:有一个遍历数据库某张表的作业,现有2台服务器。为了快速的执行作业,那么每台服务器应执行作业的50%。 为满足此需求,可将作业分成2片,每台服务器执行1片。作业遍历数据的逻辑应为:服务器A遍历ID以奇数结尾的数据;服务器B遍历ID以偶数结尾的数据。 如果分成10片,则作业遍历数据的逻辑应为:每片分到的分片项应为ID%10,而服务器A被分配到分片项0,1,2,3,4;服务器B被分配到分片项5,6,7,8,9,直接的结果就是服务器A遍历ID以0-4结尾的数据;服务器B遍历ID以5-9结尾的数据。

1.2. 分片项与业务处理解耦

Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。

1.3. 个性化参数的适用场景

个性化参数即shardingItemParameter,可以和分片项匹配对应关系,用于将分片项的数字转换为更加可读的业务代码。

例如:按照地区水平拆分数据库,数据库A是北京的数据;数据库B是上海的数据;数据库C是广州的数据。 如果仅按照分片项配置,开发者需要了解0表示北京;1表示上海;2表示广州。 合理使用个性化参数可以让代码更可读,如果配置为0=北京,1=上海,2=广州,那么代码中直接使用北京,上海,广州的枚举值即可完成分片项和业务逻辑的对应关系。

2、 核心理念

2.1. 分布式调度

Elastic-Job-Lite并无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。

注册中心仅用于作业注册和监控信息存储。而主作业节点仅用于处理分片和清理等功能。

2.2. 作业高可用

Elastic-Job-Lite提供最安全的方式执行作业。将分片总数设置为1,并使用多于1台的服务器执行作业,作业将会以1主n从的方式执行。

一旦执行作业的服务器崩溃,等待执行的服务器将会在下次作业启动时替补执行。开启失效转移功能效果更好,可以保证在本次作业执行时崩溃,备机立即启动替补执行。

2.3. 最大限度利用资源

Elastic-Job-Lite也提供最灵活的方式,最大限度的提高执行作业的吞吐量。将分片项设置为大于服务器的数量,最好是大于服务器倍数的数量,作业将会合理的利用分布式资源,动态的分配分片项。

例如:3台服务器,分成10片,则分片项分配结果为服务器A=0,1,2;服务器B=3,4,5;服务器C=6,7,8,9。 如果服务器C崩溃,则分片项分配结果为服务器A=0,1,2,3,4;服务器B=5,6,7,8,9。在不丢失分片项的情况下,最大限度的利用现有资源提高吞吐量。

3、 功能列表

  • 分布式调度协调
  • 弹性扩容缩容
  • 失效转移
  • 错过执行作业重触发
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 自诊断并修复分布式不稳定造成的问题
  • 支持并行调度
  • 支持作业生命周期操作
  • 丰富的作业类型
  • Spring整合以及命名空间提供
  • 运维平台

4、 快速入门

以springBoot为例

4.1、 引入maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>

4.2、 作业开发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import com.dangdang.ddframe.job.api.ShardingContext;
import DynamicJobConfig;
import SimpleDeliveryJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

/**
* @Author: zy
* @Date: 2019/7/20/0020 10:55
*/
@Slf4j
@Component
public class InPolygonJob implements SimpleDeliveryJob {

@Value("${elastic-job.task.InPolygon.cron}")
private String cron;

@Value("${elastic-job.task.InPolygon.shardingTotalCount}")
private int shardingTotalCount;

@Value("${elastic-job.task.InPolygon.shardingItemParameters}")
private String shardingItemParameters;

@Value("${elastic-job.task.InPolygon.description}")
private String description;

private ApplicationContext applicationContext;
@Autowired
private ISyncInPolygonService syncInPolygonService;


@Override
public void execute(ShardingContext shardingContext) {
// do something
}

@Override
public void afterPropertiesSet() throws Exception {
DynamicJobConfig dynamicJobConfig = (DynamicJobConfig) applicationContext.getBean("dynamicJobConfig");
dynamicJobConfig.dynamicAddSimpleJobScheduler(this, cron, shardingTotalCount, shardingItemParameters, description);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
1
2
3
4
5
6
7
8
9
10
11
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContextAware;

/**
* @Description :
* @author: zy
* @Date : 2019-06-14 13:59
*/
public interface SimpleDeliveryJob extends SimpleJob, InitializingBean, ApplicationContextAware {
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.dangdang.ddframe.job.event.rdb.JobEventRdbConfiguration;
import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;

/**
* @Description : 动态调度任务
* @author: zy
* @Date : 2019-06-12 18:11
*/
@Slf4j
@Component
public class DynamicJobConfig {

@Autowired
private ZookeeperRegistryCenter regCenter;

@Autowired
@Qualifier("taskDataSource")
private DataSource taskDataSource;

@Async("taskExecutor")
public void dynamicAddSimpleJobScheduler(SimpleJob simpleJob, String cron,
int shardingTotalCount, String shardingItemParameters, String description) {
new SpringJobScheduler(simpleJob, regCenter,
LiteJobConfigUtil.getLiteJobConfiguration(simpleJob.getClass(), cron, shardingTotalCount,shardingItemParameters, description),
new JobEventRdbConfiguration(taskDataSource)).init();
}

}

5、 作业配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
elastic-job:
datasource:
driver-class-name: com.mysql.jdbc.Driver
username: root
password: root
url: jdbc:mysql://192.168.0.1:3306/elastic_job?autoReconnect=true&allowMultiQueries=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false
type: com.alibaba.druid.pool.DruidDataSource
initialSize: 5
minIdle: 5
maxActive: 20
maxWait: 60000
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: SELECT 1 FROM DUAL
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
filters: stat,wall,log4j
logSlowSql: true

registry-center:
address: 192.168.0.2:2181
namespace: elastic-job

task:
test:
cron: 0/5 * * * * ?
#作业分片总数
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
description: 描述

其他方式见开发指南