0x1、背景
hyperf框架通过swoole\process
及swoole\coroutine\sleep
函数实现定时任务,代替linux环境中的crontab;该组件兼容crontab语法并支持秒级别定时任务。
本文侧重分析crontab组件如何实现多实例场景下保障任务单一执行。
0x2、执行流程
crontab 配置解析:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
|
public function listen(): array { return [ BeforeProcessHandle::class, BeforeCoroutineHandle::class, ]; }
public function process(object $event) { $crontabs = $this->parseCrontabs(); foreach ($crontabs as $crontab) { if ($crontab instanceof Crontab) { $this->crontabManager->register($crontab); } } }
|
crontab 组件启动:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public function handle(): void { $this->event->dispatch(new CrontabDispatcherStarted()); while (ProcessManager::isRunning()) { $this->sleep(); $crontabs = $this->scheduler->schedule(); while (! $crontabs->isEmpty()) { $crontab = $crontabs->dequeue(); $this->strategy->dispatch($crontab); } } }
private function sleep() { $current = date('s', time()); $sleep = 60 - $current; $this->logger->debug('Crontab dispatcher sleep ' . $sleep . 's.'); $sleep > 0 && \Swoole\Coroutine::sleep($sleep); }
|
任务执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
|
public function dispatch(Crontab $crontab) { $server = $this->serverFactory->getServer()->getServer(); if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) { $workerId = $this->getNextWorkerId($server); $server->sendMessage(new PipeMessage( 'callback', [Executor::class, 'execute'], $crontab ), $workerId); } }
public function execute(Crontab $crontab) { $diff = $crontab->getExecuteTime()->diffInRealSeconds(new Carbon()); $callback = null; switch ($crontab->getType()) { case 'callback': [$class, $method] = $crontab->getCallback(); $parameters = $crontab->getCallback()[2] ?? null; if ($class && $method && class_exists($class) && method_exists($class, $method)) { $callback = function () use ($class, $method, $parameters, $crontab) { Coroutine::create($this->decorateRunnable($crontab, $runnable)); }; } break; } }
protected function decorateRunnable(Crontab $crontab, Closure $runnable): Closure { if ($crontab->isSingleton()) { $runnable = $this->runInSingleton($crontab, $runnable); }
if ($crontab->isOnOneServer()) { $runnable = $this->runOnOneServer($crontab, $runnable); }
return $runnable; }
|
0x3、互斥锁
crontab组件实现两种互斥锁解决多实例部署场景
单例场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| protected function runInSingleton(Crontab $crontab, Closure $runnable): Closure { return function () use ($crontab, $runnable) { $taskMutex = $this->getTaskMutex(); if ($taskMutex->exists($crontab) || ! $taskMutex->create($crontab)) { $this->logger->info(sprintf('Crontab task [%s] skipped execution at %s.', $crontab->getName(), date('Y-m-d H:i:s'))); return; } try { $runnable(); } finally { $taskMutex->remove($crontab); } }; }
|
单服务场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| protected function runOnOneServer(Crontab $crontab, Closure $runnable): Closure { return function () use ($crontab, $runnable) { $taskMutex = $this->getServerMutex(); if (! $taskMutex->attempt($crontab)) { $this->logger->info(sprintf('Crontab task [%s] skipped execution at %s.', $crontab->getName(), date('Y-m-d H:i:s'))); return; }
$runnable(); }; }
public function attempt(Crontab $crontab): bool { $redis = $this->redisFactory->get($crontab->getMutexPool()); $mutexName = $this->getMutexName($crontab); $result = (bool) $redis->set($mutexName, $this->macAddress, ['NX', 'EX' => $crontab->getMutexExpires()]);
if ($result === true) { Coroutine::create(function () use ($crontab, $redis, $mutexName) { $exited = CoordinatorManager::until(Constants::WORKER_EXIT)->yield($crontab->getMutexExpires()); $exited && $redis->del($mutexName); }); return $result; }
}
|
上述两种场景中,只有oneServer
这种策略相对满足多实例任务单一执行;
针对第一种单例场景可以重写WorkerStrategy
逻辑,采用pcntl_signal
函数监听SIGTERM
信号
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public function dispatch(Crontab $crontab) { $server = $this->serverFactory->getServer()->getServer(); if ($server instanceof Server && $crontab->getExecuteTime() instanceof Carbon) { pcntl_signal(SIGTERM, function () use ($crontab){ $taskMutex = $this->container->get(TaskMutex::class); if ($taskMutex instanceof TaskMutex) { $taskMutex->remove($crontab); } }); } }
|
0x4、容器场景
容器场景存在Pod扩容和缩容等情况,如果业务在未考虑到这种弹性场景会导致定时任务出现多次执行的情况;基于swoole
的框架大都是内存常驻型,结合上述分析在启动程序时需要监听程序退出信号,否则导致任务锁无法释放。
官方Dockerfile启动程序示例:
1 2
| ENTRYPOINT ["php", "/opt/www/bin/hyperf.php", "start"]
|
实际业务开发中需要在程序启动前做一些操作,通常采用entrypoint.sh
脚本执行
1 2 3 4 5 6 7 8
| #!/bin/sh set -e
# 启动前操作 # ...
# 启动 exec /app/bin/hyperf.php start
|
在entrypoint.sh
文件采用exec
执行PHP文件,目的是容器启动后可以接收docker发送的SIGTERM
信号
需要注意的是docker在关闭容器时,会向容器内PID=1
的进程发送SIGTERM
信号,如果entrypoint.sh
文件中直接执行/app/bin/hyperf.php start
, 容器内PID=1
的进程则是/bin/sh entrypoint.sh
一旦容器出现关闭,hyperf框架本身是无法接收到SIGTERM
信号,从而导致锁无法释放。
注意:互斥锁的第二种场景采用了swoole_get_local_mac()
作为锁的值, 如果在非容器环境单主机部署多个实例的化,会导致互斥锁失效。
参考