# 分布式集群架构场景化解决方案(三)-分布式调度
[TOC]
分布式调度可以说是在分布式集群环境下如何去执行定时任务
## 什么是定时任务?
定时任务形式:每隔⼀定时间/或者特殊时刻执行
例如:
- 每日的数据汇总报表
- 定时的备份数据
- 定时的日志清理
- 订单超时自动取消付款
- 收货超期自动确认收货
## 什么是分布式调度?
在单机环境下,我们的定时任务,只需要分发给一台机器执行就可以了,但是在分布式环境下,我们的节点都是一样的,如果不做特殊处理,就会导致定时任务重复执行,数据执行不准确等问题。并且我们的大任务也应该可以拆分成小任务,由多个服务同时进行处理,提升大任务的处理效率. 若某一个实例宕机,也不影响其他实例来执行任务。
综合上述可得,分布式调度有以下含义:
- 一个相同的任务只由一个服务进行执行,不能多个服务执行同一个相同的任务
- 将大任务拆分成多个小任务,由多个节点同时进行查询,即将任务进行分片
- 高可用,单个服务失败不影响整体的任务执行
## 定时任务与消息队列的区别
**共同点**:
- 异步处理
- 应用解耦
- 可以将两个服务之间的耦合度降低,使用定时任务或者MQ作为中间齿轮,进行两者服务之前的数据的中转
- 流量削峰
- MQ可以将消息放入队列中慢慢进行消费,而定时任务则可以定时做批量处理,提升处理效率
**不同点**:
- 定时任务作业是时间驱动,而MQ是事件驱动;
- 定时任务是每隔一定时间/特定时间触发一次,而MQ是接收到了事件消息才会触发一次
- 定时任务一般用于批量处理数据,MQ一般用户接收到了事件即单条处理
## 分布式调度框架Elastic-Job
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供最轻量级的分布式任务的协调服务,外部依赖仅Zookeeper。

**去中心化**?
- 各个执行节点对等
- 无作业调度中心节点,而是基于部署作业框架的程序在到达相应时间点时各自触发调度。
- 主节点不固定,下线后可自动重新进行选举
**Elastic-Job的功能**
- 分布式调度协调
- 弹性扩容缩容
- 失效转移
- 错过执行作业重触发
- 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
- 自诊断并修复分布式不稳定造成的问题
- 支持并行调度
- 支持作业生命周期操作
- 丰富的作业类型
- Spring整合以及命名空间提供
- 运维平台
### 什么是任务分片?
比如一次有100万条数据,我们这时候有5个服务,这样就可以至少将数据分片为5个或者是更多个,这样可以每个服务处理一份数据或多份更小的数据,如果一台机器上分发到了多个分片,那么将会启动多个线程来处理不同的分片数据,达到提升我们数据处理效率的目的.
### Elastic-Job-Lite简单应用
**安装zookeeper并启动后,在maven项目中引入Elastic-job核心jar包**
```xml
<!-- 引入elastic-job-lite核心模块 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${latest.release.version}</version>
</dependency>
<!-- 使用springframework自定义命名空间时引入 -->
<dependency>
<groupId>io.elasticjob</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${latest.release.version}</version>
</dependency>
```
**编写java类,实现SimpleJob接口,并在execute中编写相应处理逻辑**:
```java
public class MyElasticJob implements SimpleJob {
@Override
public void execute(ShardingContext context) {
switch (context.getShardingItem()) {
case 0:
// do something by sharding item 0
break;
case 1:
// do something by sharding item 1
break;
case 2:
// do something by sharding item 2
break;
// case n: ...
}
}
}
```
**调度配置**:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<!--配置作业注册中心 -->
<reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
<!-- 配置作业-->
<job:simple id="oneOffElasticJob" class="xxx.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
</beans>
```
### Elastic-Job启用运维平台
[elastic-job-lite如何启用运维管理界面](https://zhengyao.space/archives/elastic-job-console)
### Elastic-Job弹性扩容缩容原理
- 第一台服务器上线触发主服务器选举。主服务器一旦下线,则重新触发选举,选举过程中阻塞,只有主服务器选举完成,才会执行其他任务。
- 某作业服务器上线时会自动将服务器信息注册到注册中心,下线时会自动更新服务器状态。
- 主节点选举,服务器上下线,分片总数变更均更新重新分片标记。
- 定时任务触发时,如需重新分片,则通过主服务器分片,分片过程中阻塞,分片结束后才可执行任务。如分片过程中主服务器下线,则先选举主服务器,再分片。
- 通过上一项说明可知,为了维持作业运行时的稳定性,运行过程中只会标记分片状态,不会重新分片。分片仅可能发生在下次任务触发前。
- 每次分片都会按服务器IP排序,保证分片结果不会产生较大波动。
- 实现失效转移功能,在某台服务器执行完毕后主动抓取未分配的分片,并且在某台服务器下线后主动寻找可用的服务器执行任务。
**作业启动的流程图**:

**作业运行的流程图**:

分布式集群架构场景化解决方案(三)-分布式调度