0x1、背景

hyperf框架通过swoole\processswoole\coroutine\sleep函数实现定时任务,代替linux环境中的crontab;该组件兼容crontab语法并支持秒级别定时任务。

本文侧重分析crontab组件如何实现多实例场景下保障任务单一执行。

0x2、执行流程

crontab 配置解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Hyperf\Crontab\Listener\CrontabRegisterListener::class
// 监听process进程事件
public function listen(): array
{
return [
BeforeProcessHandle::class,
BeforeCoroutineHandle::class,
];
}

// 解析crontab配置
public function process(object $event)
{
$crontabs = $this->parseCrontabs();
foreach ($crontabs as $crontab) {
if ($crontab instanceof Crontab) {
// 验证配置及是否开启crontab任务执行, crontab.enable状态控制
$this->crontabManager->register($crontab);
}
}
}

crontab 组件启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Hyperf\Crontab\Process\CrontabDispatcherProcess::class
public function handle(): void
{
// 这个事件貌似没实现具体逻辑
$this->event->dispatch(new CrontabDispatcherStarted());
// 死循环
while (ProcessManager::isRunning()) {
// 按照 秒 粒度执行,所以crontab组件执行秒级别定时任务
$this->sleep();
// 解析crontab配置 获取需要执行的任务
$crontabs = $this->scheduler->schedule();
while (! $crontabs->isEmpty()) {
$crontab = $crontabs->dequeue();
// 根据策略调度不同类型worker, 默认Hyperf\Crontab\Strategy\WorkerStrategy, 可通过配置重写该类
$this->strategy->dispatch($crontab);
}
}
}

private function sleep()
{
$current = date('s', time());
$sleep = 60 - $current;
$this->logger->debug('Crontab dispatcher sleep ' . $sleep . 's.');
$sleep > 0 && \Swoole\Coroutine::sleep($sleep);
}

任务执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// Hyperf\Crontab\Strategy\WorkerStrategy::class
// 调度逻辑
public function dispatch(Crontab $crontab)
{
$server = $this->serverFactory->getServer()->getServer();
if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) {
$workerId = $this->getNextWorkerId($server);
// 任务推送到Pipe中,Hyperf\Crontab\Listener\OnPipeMessageListener::class监听器处理
$server->sendMessage(new PipeMessage(
'callback',
[Executor::class, 'execute'],
$crontab
), $workerId);
}
}

// Hyperf\Crontab\Strategy\Executor::class负责执行不同策略
public function execute(Crontab $crontab)
{
// ...
// 默认采用callback
$diff = $crontab->getExecuteTime()->diffInRealSeconds(new Carbon());
$callback = null;
switch ($crontab->getType()) {
case 'callback':
[$class, $method] = $crontab->getCallback();
$parameters = $crontab->getCallback()[2] ?? null;
if ($class && $method && class_exists($class) && method_exists($class, $method)) {
$callback = function () use ($class, $method, $parameters, $crontab) {
// ...
// 根据配置不同执行互斥锁,默认是直接执行,不区分多实例场景
Coroutine::create($this->decorateRunnable($crontab, $runnable));
};
}
break;
// ...
}
}

// 配置策略
protected function decorateRunnable(Crontab $crontab, Closure $runnable): Closure
{
if ($crontab->isSingleton()) {
$runnable = $this->runInSingleton($crontab, $runnable);
}

if ($crontab->isOnOneServer()) {
$runnable = $this->runOnOneServer($crontab, $runnable);
}

return $runnable;
}

0x3、互斥锁

crontab组件实现两种互斥锁解决多实例部署场景

单例场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected function runInSingleton(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->getTaskMutex();
// 判断redis是否创建锁成功,采用`setnx`指令
if ($taskMutex->exists($crontab) || ! $taskMutex->create($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skipped execution at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
return;
}
// 加锁->任务->删锁
// 这种类型存在缺陷,如果实例进程被Kill掉,无法执行 删除锁操作,导致其他实例无法获取锁
// 锁的默认时间:1h
try {
$runnable();
} finally {
$taskMutex->remove($crontab);
}
};
}

单服务场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
protected function runOnOneServer(Crontab $crontab, Closure $runnable): Closure
{
return function () use ($crontab, $runnable) {
$taskMutex = $this->getServerMutex();
// 尝试获取锁,失败则跳过
if (! $taskMutex->attempt($crontab)) {
$this->logger->info(sprintf('Crontab task [%s] skipped execution at %s.', $crontab->getName(), date('Y-m-d H:i:s')));
return;
}

$runnable();
};
}

// Hyperf\Crontab\Mutex\RedisServerMutex::class
public function attempt(Crontab $crontab): bool
{
// ...
$redis = $this->redisFactory->get($crontab->getMutexPool());
$mutexName = $this->getMutexName($crontab);
// 尝试加锁
$result = (bool) $redis->set($mutexName, $this->macAddress, ['NX', 'EX' => $crontab->getMutexExpires()]);

if ($result === true) {
// 启动coroutine监听`SIGTERM`信号,程序退出主动删除 锁
// 在容器场景这里会出现坑
Coroutine::create(function () use ($crontab, $redis, $mutexName) {
$exited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($crontab->getMutexExpires());
$exited && $redis->del($mutexName);
});
return $result;
}

}

上述两种场景中,只有oneServer这种策略相对满足多实例任务单一执行;

针对第一种单例场景可以重写WorkerStrategy逻辑,采用pcntl_signal函数监听SIGTERM信号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public function dispatch(Crontab $crontab)
{
$server = $this->serverFactory->getServer()->getServer();
if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) {
// ...
// 监听,主动删除
pcntl_signal(SIGTERM, function () use ($crontab){
$taskMutex = $this->container->get(TaskMutex::class);
if ($taskMutex instanceof TaskMutex) {
$taskMutex->remove($crontab);
}
});
}
}

0x4、容器场景

容器场景存在Pod扩容和缩容等情况,如果业务在未考虑到这种弹性场景会导致定时任务出现多次执行的情况;基于swoole的框架大都是内存常驻型,结合上述分析在启动程序时需要监听程序退出信号,否则导致任务锁无法释放。

官方Dockerfile启动程序示例:

1
2
#省略...
ENTRYPOINT ["php", "/opt/www/bin/hyperf.php", "start"]

实际业务开发中需要在程序启动前做一些操作,通常采用entrypoint.sh脚本执行

1
2
3
4
5
6
7
8
#!/bin/sh
set -e

# 启动前操作
# ...

# 启动
exec /app/bin/hyperf.php start

entrypoint.sh文件采用exec执行PHP文件,目的是容器启动后可以接收docker发送的SIGTERM信号

需要注意的是docker在关闭容器时,会向容器内PID=1的进程发送SIGTERM信号,如果entrypoint.sh文件中直接执行/app/bin/hyperf.php start, 容器内PID=1的进程则是/bin/sh entrypoint.sh

一旦容器出现关闭,hyperf框架本身是无法接收到SIGTERM信号,从而导致锁无法释放。

注意:互斥锁的第二种场景采用了swoole_get_local_mac()作为锁的值, 如果在非容器环境单主机部署多个实例的化,会导致互斥锁失效。

参考

评论