热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

PHP并发编程之MasterWorker模式

Master-Worker的模式结构Master进程为主进程,它维护了一个Worker进程队列、子任务队列和子结果集。Worker进程队列中的Worker进程,不停地从任务队列中提






Master-Worker 的模式结构

Master 进程为主进程,它维护了一个 Worker 进程队列、子任务队列和子结果集。Worker 进程队列中的 Worker 进程,不停地从任务队列中提取要处理的子任务,并将子任务的处理结果写入结果集。


  • 使用多进程

  • 支持 Worker 错误重试,仅仅实现业务即可

  • 任务累积过多,自动 Fork Worker 进程

  • 常驻 Worker 进程,减少进程 Fork 开销

  • 非常驻 Worker 进程闲置,自动退出回收

  • 支持日志

Demo: 基于 Redis 生产消费队列 在 test 目录中

代码:

declare(ticks = 1);
// 必须先使用语句declare(ticks=1),否则注册的singal-handel就不会执行了
//error_reporting(E_ERROR);
abstract class MasterWorker
{
// 子进程配置属性
protected $maxWorkerNum; // 最多只能开启进程数
protected $minWorkerNum; // 最少常驻子进程数
protected $waitTaskTime; // 等待任务时间,单位秒
protected $waitTaskLoopTimes; // 连续这么多次队列为空就退出子进程
protected $consumeTryTimes; // 连续消费失败次数
// 父进程专用属性
protected $worker_list = [];
protected $check_internal = 1;
protected $masterExitCallback = [];
// 子进程专用属性
protected $autoQuit = false;
protected $status = self::WORKER_STATUS_IDLE;
protected $taskData; // 任务数据
protected $workerExitCallback = [];
// 通用属性
protected $stop_service = false;
protected $master = true;
// 通用配置
protected $logFile;
const WORKER_STATUS_IDLE = 'idle';
const WORKER_STATUS_FINISHED = 'finished';
const WORKER_STATUS_EXITING = 'exiting';
const WORKER_STATUS_WORKING = 'working';
const WORKER_STATUS_FAIL = 'fail';
const WORKER_STATUS_TERMINATED = 'terminated';
public function __construct($optiOns= [])
{
$this->initConfig($options);
}
protected function initConfig($optiOns= [])
{
$defaultCOnfig= [
'maxWorkerNum' => 10,
'minWorkerNum' => 3,
'waitTaskTime' => 0.01,
'waitTaskLoopTimes' => 50,
'consumeTryTimes' => 3,
'logFile' => './master_worker.log',
];
foreach ($defaultConfig as $key => $default) {
$this->$key = array_key_exists($key, $options) ? $options[$key] : $default;
}
}
public function start()
{
// 父进程异常,需要终止子进程
set_exception_handler([$this, 'exceptionHandler']);
// fork minWorkerNum 个子进程
$this->mutiForkWorker($this->minWorkerNum);
if ($this->getWorkerLength() <= 0) {
$this->masterWaitExit(true, 'fork 子进程全部失败');
}
// 父进程监听信号
pcntl_signal(SIGTERM, [$this, 'sig_handler']);
pcntl_signal(SIGINT, [$this, 'sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'sig_handler']);
pcntl_signal(SIGCHLD, [$this, 'sig_handler']);
// 监听队列,队列比进程数多很多,则扩大进程,扩大部分的进程会空闲自动退出
$this->checkWorkerLength();
$this->masterWaitExit();
}
/**
* Master 等待退出
*
* @param boolean $force 强制退出
* @param string $msg 退出 message
* @return void
*/
protected function masterWaitExit($force = false, $msg = '')
{
// 强制发送退出信号
$force && $this->sig_handler(SIGTERM);
// 等到子进程退出
while ($this->stop_service) {
$this->checkExit($msg);
$this->msleep($this->check_internal);
}
}
protected function log($msg)
{
try {
$header = $this->isMaster() ? 'Master [permanent]' : sprintf('Worker [%s]', $this->autoQuit ? 'temporary' : 'permanent');
$this->writeLog($msg, $this->getLogFile(), $header);
} catch (\Exception $e) {
}
}
protected function mutiForkWorker($num, $autoQuit = false, $maxTryTimes = 3)
{
for ($i = 1; $i <= $num; ++$i) {
$this->forkWorker($autoQuit, $maxTryTimes);
}
}
protected function checkWorkerLength()
{
// 如果要退出父进程,就不执行检测
while (! $this->stop_service) {
$this->msleep($this->check_internal);
// 处理进程
$workerLength = $this->getWorkerLength();
// 如果进程数小于最低进程数
$this->mutiForkWorker($this->minWorkerNum - $workerLength);
$workerLength = $this->getWorkerLength();
// 创建常驻worker进程失败, 下次检查继续尝试创建
if ($workerLength <= 0) {
continue;
}
if ($workerLength >= $this->maxWorkerNum) {
// 不需要增加进程
continue;
}
$num = $this->calculateAddWorkerNum();
// 不允许超过最大进程数
$num = min($num, $this->maxWorkerNum - $workerLength);
// 创建空闲自动退出worker进程
$this->mutiForkWorker($num, true);
}
}
protected function getWorkerLength()
{
return count($this->worker_list);
}
//信号处理函数
public function sig_handler($sig)
{
switch ($sig) {
case SIGTERM:
case SIGINT:
case SIGQUIT:
// 退出: 给子进程发送退出信号,退出完成后自己退出
// 先标记一下,子进程完全退出后才能结束
$this->stop_service = true;
// 给子进程发送信号
foreach ($this->worker_list as $pid => $v) {
posix_kill($pid, SIGTERM);
}
break;
case SIGCHLD:
// 子进程退出, 回收子进程, 并且判断程序是否需要退出
while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
// 去除子进程
unset($this->worker_list[$pid]);
// 子进程是否正常退出
// if (pcntl_wifexited($status)) {
// //
// }
}
$this->checkExit();
break;
default:
$this->default_sig_handler($sig);
break;
}
}
public function child_sig_handler($sig)
{
switch ($sig) {
case SIGINT:
case SIGQUIT:
case SIGTERM:
$this->stop_service = true;
break;
// 操作比较危险 在处理任务当初强制终止
// case SIGTERM:
// // 强制退出
// $this->stop_service = true;
// $this->status = self::WORKER_STATUS_TERMINATED;
// $this->beforeWorkerExitHandler();
// $this->status = self::WORKER_STATUS_EXITING;
// die(1);
// break;
}
}
protected function checkExit($msg = '')
{
if ($this->stop_service && empty($this->worker_list)) {
$this->beforeMasterExitHandler();
die($msg ?:'Master 进程结束, Worker 进程全部退出');
}
}
protected function forkWorker($autoQuit = false, $maxTryTimes = 3)
{
$times = 1;
do {
$pid = pcntl_fork();
if ($pid == -1) {
++$times;
} elseif($pid) {
$this->worker_list[$pid] = true;
//echo 'pid:', $pid, "\n";
return $pid;
} else {
// 子进程 消费
$this->autoQuit = $autoQuit;
$this->master = false;
// 处理信号
pcntl_signal(SIGTERM, [$this, 'child_sig_handler']);
pcntl_signal(SIGINT, [$this, 'child_sig_handler']);
pcntl_signal(SIGQUIT, [$this, 'child_sig_handler']);
exit($this->runChild()); // worker进程结束
}
} while ($times <= $maxTryTimes);
// fork 3次都失败
return false;
}
/**
* 子进程处理内容
*/
protected function runChild()
{
$noDataLoopTime = 0;
$status = 0;
while (!$this->autoQuit || ($noDataLoopTime <= $this->waitTaskLoopTimes)) {
// 处理退出
if ($this->stop_service) {
break;
}
$this->taskData = null;
try {
$this->taskData = $this->deQueue();
if ($this->taskData) {
$noDataLoopTime = 1; // 重新从1开始
$this->status = self::WORKER_STATUS_WORKING;
$this->consumeByRetry($this->taskData);
$this->status = self::WORKER_STATUS_FINISHED;
} else {
$this->status = self::WORKER_STATUS_IDLE;
// 避免溢出
$noDataLoopTime = $noDataLoopTime >= PHP_INT_MAX ? PHP_INT_MAX : ($noDataLoopTime + 1);
// 等待队列
$this->msleep($this->waitTaskTime);
}
$status = 0;
} catch (\RedisException $e) {
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 1;
} catch (\Exception $e) {
// 消费出现错误
$this->status = self::WORKER_STATUS_FAIL;
$this->consumeFail($this->taskData, $e);
$status = 2;
}
}
$this->beforeWorkerExitHandler();
$this->status = self::WORKER_STATUS_EXITING;
return $status;
}
/**
* @param $data
* @param int $tryTimes
* @throws \Exception
*/
protected function consumeByRetry($data, $tryTimes = 1)
{
$tryTimes = 1;
$exception = null;
// consume 返回false 为失败
while ($tryTimes <= $this->consumeTryTimes) {
try {
return $this->consume($data);
} catch (\Exception $e) {
$exception = $e;
++$tryTimes;
}
}
// 最后一次还报错 写日志
if (($tryTimes > $this->consumeTryTimes) && $exception) {
throw $exception;
}
}
/**
* @param $mixed
* @param $filename
* @param $header
* @param bool $trace
* @return bool
* @throws \Exception
*/
protected function writeLog($mixed, $filename, $header, $trace = false)
{
if (is_string($mixed)) {
$text = $mixed;
} else {
$text = var_export($mixed, true);
}
$trace_list = "";
if ($trace) {
$_t = debug_backtrace();
$trace_list = "-- TRACE : \r\n";
foreach ($_t as $_line) {
$trace_list .= "-- " . $_line ['file'] . "[" . $_line ['line'] . "] : " . $_line ['function'] . "()" . "\r\n";
}
}
$text = "\r\n=" . $header . "==== " . strftime("[%Y-%m-%d %H:%M:%S] ") . " ===\r\n<" . getmypid() . "> : " . $text . "\r\n" . $trace_list;
$h = fopen($filename, 'a');
if (! $h) {
throw new \Exception('Could not open logfile:' . $filename);
}
// exclusive lock, will get released when the file is closed
if (! flock($h, LOCK_EX)) {
return false;
}
if (fwrite($h, $text) === false) {
throw new \Exception('Could not write to logfile:' . $filename);
}
flock($h, LOCK_UN);
fclose($h);
return true;
}
protected function msleep($time)
{
usleep($time * 1000000);
}
public function exceptionHandler($exception)
{
if ($this->isMaster()) {
$msg = '父进程['.posix_getpid().']错误退出中:' . $exception->getMessage();
$this->log($msg);
$this->masterWaitExit(true, $msg);
} else {
$this->child_sig_handler(SIGTERM);
}
}
public function isMaster()
{
return $this->master;
}
/**
* 默认的 worker 数量增加处理
*
* @return int
*/
public function calculateAddWorkerNum()
{
$workerLength = $this->getWorkerLength();
$taskLength = $this->getTaskLength();
// 还不够多
if (($taskLength / $workerLength <3) && ($taskLength - $workerLength <10)) {
return 0;
}
// 增加一定数量的进程
return ceil($this->maxWorkerNum - $workerLength / 2);
}
/**
* 自定义日子文件
*
* @return string
*/
protected function getLogFile()
{
return $this->logFile;
}
/**
* 自定义消费错误函数
*
* @param [type] $data
* @param \Exception $e
* @return void
*/
protected function consumeFail($data, \Exception $e)
{
$this->log(['data' => $data, 'errorCode' => $e->getCode(), 'errorMsg' => get_class($e) . ' : ' . $e->getMessage()]);
}
protected function beforeWorkerExitHandler()
{
foreach ($this->workerExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
/**
* 设置Worker自定义结束回调
*
* @param mixed $func
* @param boolean $prepend
* @return void
*/
public function setWorkerExitCallback($callback, $prepend = false)
{
return $this->setCallbackQueue('workerExitCallback', $callback, $prepend);
}
/**
* 设置Master自定义结束回调
*
* @param callable $func
* @param boolean $prepend
* @return void
*/
public function setMasterExitCallback(callable $callback, $prepend = false)
{
return $this->setCallbackQueue('masterExitCallback', $callback, $prepend);
}
protected function setCallbackQueue($queueName, $callback, $prepend = false)
{
if (! isset($this->$queueName) || ! is_array($this->$queueName)) {
return false;
}
if (is_null($callback)) {
$this->$queueName = []; // 如果传递 null 就清空
return true;
} elseif (! is_callable($callback)) {
return false;
}
if ($prepend) {
array_unshift($this->$queueName, $callback);
} else {
$this->$queueName[] = $callback;
}
return true;
}
protected function beforeMasterExitHandler()
{
foreach ($this->masterExitCallback as $callback) {
is_callable($callback) && call_user_func($callback, $this);
}
}
protected function default_sig_handler($sig)
{
}
/**
* 得到待处理任务数量
*/
abstract protected function getTaskLength();
/**
* 出队
* @return mixed
*/
abstract public function deQueue();
/**
* 入队
* @param $data
* @return int
*/
abstract public function enQueue($data);
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
abstract protected function consume($data);
}

Demo

基于redis 的 生产者-消费者模式


RedisProducterConsumer.php


require "../src/MasterWorker.php";
class RedisProducterConsumer extends MasterWorker
{
const QUERY_NAME = 'query_name';
/**
* Master 和 Worker 的连接分开,否则会出现问题
*
* @var Redis[]
*/
protected $redis_cOnnections= [];
public function __construct($optiOns= [])
{
parent::__construct($options);
// 设置退出回调
$this->setWorkerExitCallback(function ($worker) {
$this->closeRedis();
// 处理结束,把redis关闭
$this->log('进程退出:' . posix_getpid());
});
$this->setMasterExitCallback(function ($master) {
$this->closeRedis();
$this->log('master 进程退出:' . posix_getpid());
});
}
/**
* 得到队列长度
*/
protected function getTaskLength()
{
return (int) $this->getRedis()->lSize(static::QUERY_NAME);
}
/**
* 出队
* @return mixed
*/
public function deQueue()
{
return $this->getRedis()->lPop(static::QUERY_NAME);
}
/**
* 入队
* @param $data
* @return int
*/
public function enQueue($data)
{
return $this->getRedis()->rPush(static::QUERY_NAME, (string) $data);
}
/**
* 消费的具体内容
* 不要进行失败重试
* 会自动进行
* 如果失败直接抛出异常
* @param $data
*/
protected function consume($data)
{
// 错误抛出异常
//throw new Exception('错误信息');
$this->log('消费中 ' . $data);
$this->msleep(1);
$this->log('消费结束:' . $data . '; 剩余个数:' . $this->getTaskLength());
}
/**
* @return Redis
*/
public function getRedis()
{
$index = $this->isMaster() ? 'master' : 'worker';
// 后续使用 predis 使用redis池
if (! isset($this->redis_connections[$index])) {
$cOnnection= new \Redis();
$connection->connect('127.0.0.1', 6379, 2);
$this->redis_connections[$index] = $connection;
}
return $this->redis_connections[$index];
}
public function closeRedis()
{
foreach ($this->redis_connections as $key => $connection) {
$connection && $connection->close();
}
}
protected function consumeFail($data, \Exception $e)
{
parent::consumeFail($data, $e);
// 自定义操作,比如重新入队,上报错误等
}
}

调用例子

require "./RedisProducterConsumer.php";
$producterCOnsumer= new RedisProducterConsumer();
// 清空任务队列
$producterConsumer->getRedis()->ltrim(RedisProducterConsumer::QUERY_NAME, 1, 0);
// 写入任务队列
for ($i = 1; $i <= 100; ++$i) {
$producterConsumer->enQueue($i);
}
$producterConsumer->start();
// 接下来的写的代码不会执行
// 查看运行的进程
// ps aux | grep test.php
// 试一试 Ctrl + C 在执行上面产看进程命令

代码地址:https://github.com/MrSuperLi/php-master-wo...




php
并发编程
master-worker


推荐阅读
  • HBase客户端Table类中getRpcTimeout方法的应用与编程实例解析 ... [详细]
  • 本文详细介绍了使用响应文件在静默模式下安装和配置Oracle 11g的方法。硬件要求包括:内存至少1GB,具体可通过命令`grep -i memtotal /proc/meminfo`进行检查。此外,还提供了详细的步骤和注意事项,确保安装过程顺利进行。 ... [详细]
  • 本文深入探讨了在Android应用开发中常见的相机连接故障问题,特别是在RK3288平台和Android 6.0系统上。通过分析具体案例,本文提供了详细的解决方案和应对策略,旨在帮助开发者有效解决相机连接问题,提升应用的稳定性和用户体验。 ... [详细]
  • 本文详细介绍了如何在Linux系统中搭建51单片机的开发与编程环境,重点讲解了使用Makefile进行项目管理的方法。首先,文章指导读者安装SDCC(Small Device C Compiler),这是一个专为小型设备设计的C语言编译器,适合用于51单片机的开发。随后,通过具体的实例演示了如何配置Makefile文件,以实现代码的自动化编译与链接过程,从而提高开发效率。此外,还提供了常见问题的解决方案及优化建议,帮助开发者快速上手并解决实际开发中可能遇到的技术难题。 ... [详细]
  • 深入解析Gradle中的Project核心组件
    在Gradle构建系统中,`Project` 是一个核心组件,扮演着至关重要的角色。通过使用 `./gradlew projects` 命令,可以清晰地列出当前项目结构中包含的所有子项目,这有助于开发者更好地理解和管理复杂的多模块项目。此外,`Project` 对象还提供了丰富的配置选项和生命周期管理功能,使得构建过程更加灵活高效。 ... [详细]
  • Java 零基础入门:SQL Server 学习笔记(第21篇)
    Java 零基础入门:SQL Server 学习笔记(第21篇) ... [详细]
  • Go语言实现Redis客户端与服务器的交互机制深入解析
    在前文对Godis v1.0版本的基础功能进行了详细介绍后,本文将重点探讨如何实现客户端与服务器之间的交互机制。通过具体代码实现,使客户端与服务器能够顺利通信,赋予项目实际运行的能力。本文将详细解析Go语言在实现这一过程中的关键技术和实现细节,帮助读者深入了解Redis客户端与服务器的交互原理。 ... [详细]
  • 技术日志:深入探讨Spark Streaming与Spark SQL的融合应用
    技术日志:深入探讨Spark Streaming与Spark SQL的融合应用 ... [详细]
  • DHCP三层交换机设置方式全局模式和接口模式设置方式和命令resetsave回车输入yreboot输入n输入y重启后就恢复默认设置了默认用户名密码adminAdmin@huawei ... [详细]
  • 深入解析 iOS Objective-C 中的对象内存对齐规则及其优化策略
    深入解析 iOS Objective-C 中的对象内存对齐规则及其优化策略 ... [详细]
  • 本文提供了 RabbitMQ 3.7 的快速上手指南,详细介绍了环境搭建、生产者和消费者的配置与使用。通过官方教程的指引,读者可以轻松完成初步测试和实践,快速掌握 RabbitMQ 的核心功能和基本操作。 ... [详细]
  • 第五章详细探讨了 Red Hat Enterprise Linux 6 中的 Ext3 文件系统。5.1 节介绍了如何创建 Ext3 文件系统,包括必要的命令和步骤,以及在实际操作中可能遇到的问题和解决方案。此外,还涵盖了 Ext3 文件系统的性能优化和维护技巧,为用户提供全面的操作指南。 ... [详细]
  • APKAnalyzer(1):命令行操作体验与功能解析
    在对apkChecker进行深入研究后,自然而然地关注到了Android Studio中的APK分析功能。将APK文件导入IDE中,系统会自动解析并展示其中各类文件的详细信息。官方文档提供了详细的命令行工具使用指南,帮助开发者快速上手。本文以一个RecyclerView的Adapter代理开源库为例,探讨了如何利用这些工具进行高效的APK分析。 ... [详细]
  • 本文深入探讨了 MXOTDLL.dll 在 C# 环境中的应用与优化策略。针对近期公司从某生物技术供应商采购的指纹识别设备,该设备提供的 DLL 文件是用 C 语言编写的。为了更好地集成到现有的 C# 系统中,我们对原生的 C 语言 DLL 进行了封装,并利用 C# 的互操作性功能实现了高效调用。此外,文章还详细分析了在实际应用中可能遇到的性能瓶颈,并提出了一系列优化措施,以确保系统的稳定性和高效运行。 ... [详细]
  • 本文详细解析了神州数码DCRS5980交换机的基础配置流程和技术要点。首先,通过进入配置模式(`enable`),设置主机名(`hostname 5980`),并创建VLAN,逐步介绍了设备的初始设置步骤。此外,还涵盖了端口配置、IP地址分配及安全设置等关键环节,为用户提供了全面的配置指导。 ... [详细]
author-avatar
糖糖糖开水
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有