热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

教你使用mixphp打造多进程异步邮件发送

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。
注意:这个是 MixPHP V1 的范例

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

推荐:《PHP视频教程》

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new \Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new \Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:


 */
class PushCommand extends BaseCommand
{

    // 配置信息
    const HOST = 'smtpdm.aliyun.com';
    const PORT = 465;
    const SECURITY = 'ssl';
    const USERNAME = '****@email.***.com';
    const PASSWORD = '****';

    // 初始化事件
    public function onInitialize()
    {
        parent::onInitialize(); // TODO: Change the autogenerated stub
        // 获取程序名称
        $this->programName = Input::getCommandName();
        // 设置pidfile
        $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    /**
     * 获取服务
     * @return TaskExecutor
     */
    public function getTaskService()
    {
        return create_object(
            [
                // 类路径
                'class'         => 'mix\task\TaskExecutor',
                // 服务名称
                'name'          => "mix-daemon: {$this->programName}",
                // 执行类型
                'type'          => \mix\task\TaskExecutor::TYPE_DAEMON,
                // 执行模式
                'mode'          => \mix\task\TaskExecutor::MODE_PUSH,
                // 左进程数
                'leftProcess'   => 1,
                // 中进程数
                'centerProcess' => 5,
                // 任务超时时间 (秒)
                'timeout'       => 5,
            ]
        );
    }

    // 启动
    public function actionStart()
    {
        // 预处理
        if (!parent::actionStart()) {
            return ExitCode::UNSPECIFIED_ERROR;
        }
        // 启动服务
        $service = $this->getTaskService();
        $service->on('LeftStart', [$this, 'onLeftStart']);
        $service->on('CenterStart', [$this, 'onCenterStart']);
        $service->start();
        // 返回退出码
        return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
        try {
            // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
            $queueModel = Redis::getInstance();
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j <16000; $j++) {
                // 从消息队列中间件阻塞获取一条消息
                $data = $queueModel->brpop(&#39;queue:email&#39;, 10);
                if (empty($data)) {
                    continue;
                }
                list(, $data) = $data;
                // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
                $worker->push($data, false);
            }
        } catch (\Exception $e) {
            // 休息一会,避免 CPU 出现 100%
            sleep(1);
            // 抛出错误
            throw $e;
        }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
        // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
        for ($j = 0; $j <16000; $j++) {
            // 从进程消息队列中抢占一条消息
            $data = $worker->pop();
            if (empty($data)) {
                continue;
            }
            // 处理消息
            try {
                // 处理消息,比如:发送短信、发送邮件、微信推送
                var_dump($data);
                $ret = self::sendEmail($data);
                var_dump($ret);
            } catch (\Exception $e) {
                // 回退数据到消息队列
                $worker->rollback($data);
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
        // Create the Transport
        $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
            ->setUsername(self::USERNAME)
            ->setPassword(self::PASSWORD);
        // Create the Mailer using your created Transport
        $mailer = new \Swift_Mailer($transport);
        // Create a message
        $message = (new \Swift_Message($data[&#39;subject&#39;]))
            ->setFrom([self::USERNAME => &#39;**网&#39;])
            ->setTo($data[&#39;to&#39;])
            ->setBody($data[&#39;body&#39;]);
        // Send the message
        $result = $mailer->send($message);
        return $result;
    }

}

测试

1.在 shell 中启动 push 常驻程序。
[root@localhost bin]# ./mix-daemon push start
mix-daemon &#39;push&#39; start successed.
1.调用接口往消息队列投放任务。

此时 shell 终端将打印:

官网:http://www.mixphp1.cn/

以上就是教你使用mixphp打造多进程异步邮件发送的详细内容,更多请关注 第一PHP社区 其它相关文章!


推荐阅读
  • 一、前言在园子里有太多的权限管理的文章、解决方案和源码,有通用的,有开源的,有Winform的,有Webform的,有MVC的,但百变不离其宗,思想都是一样的。我在项目开发中经 ... [详细]
  • Redis底层数据结构之压缩列表的介绍及实现原理
    本文介绍了Redis底层数据结构之压缩列表的概念、实现原理以及使用场景。压缩列表是Redis为了节约内存而开发的一种顺序数据结构,由特殊编码的连续内存块组成。文章详细解释了压缩列表的构成和各个属性的含义,以及如何通过指针来计算表尾节点的地址。压缩列表适用于列表键和哈希键中只包含少量小整数值和短字符串的情况。通过使用压缩列表,可以有效减少内存占用,提升Redis的性能。 ... [详细]
  • MVC设计模式的介绍和演化过程
    本文介绍了MVC设计模式的基本概念和原理,以及在实际项目中的演化过程。通过分离视图、模型和控制器,实现了代码的解耦和重用,提高了项目的可维护性和可扩展性。详细讲解了分离视图、分离模型和分离控制器的具体步骤和规则,以及它们在项目中的应用。同时,还介绍了基础模型的封装和控制器的命名规则。该文章适合对MVC设计模式感兴趣的读者阅读和学习。 ... [详细]
  • MySQL中的MVVC多版本并发控制机制的应用及实现
    本文介绍了MySQL中MVCC的应用及实现机制。MVCC是一种提高并发性能的技术,通过对事务内读取的内存进行处理,避免写操作堵塞读操作的并发问题。与其他数据库系统的MVCC实现机制不尽相同,MySQL的MVCC是在undolog中实现的。通过undolog可以找回数据的历史版本,提供给用户读取或在回滚时覆盖数据页上的数据。MySQL的大多数事务型存储引擎都实现了MVCC,但各自的实现机制有所不同。 ... [详细]
  • Redis API
    安装启动最简启动命令行输入验证动态参数启动配置文件启动常用配置通用命令keysbdsize计算key的总数exists判断是否存在delkeyvalue删除指定的keyvalue成 ... [详细]
  • 本文介绍了在无法联网的情况下,通过下载rpm包离线安装zip和unzip的方法。详细介绍了如何搜索并下载合适的rpm包,以及如何使用rpm命令进行安装。 ... [详细]
  • Redis的默认端口、数据库使用和多端口配置
    本文介绍了Redis的默认端口、数据库使用和多端口配置的方法。通过选择不同的数据库和使用flushdb命令可以实现对不同数据库的访问和清除数据。同时,本文还介绍了在同一台机器上启用多个Redis实例的方法,并讨论了配置认证密码的步骤和注意事项。 ... [详细]
  • MVC中的自定义控件
    怎么样创建自定义控 ... [详细]
  • MVC就是model模型control控制view视图把web开发的分工明确各施其职,互不干涉在MVC模式中,三个层各施其职,所以如果一旦哪一层的需求发生了变化,就只需要更改相 ... [详细]
  • Windows系统 查询已开通的端口号和对外开放端口号
    查询端口号开放情况:查看该端口被那个PID所占用;方法一:有针对性的查看端口,使用命令:netstat–ano|findstr“”netstat-a对外开放端口号参考ht ... [详细]
  • 你知道Kafka和Redis的各自优缺点吗?一文带你优化选择,不走弯路 ... [详细]
  • Spring MVC 浅谈
    大学时写的的文章,当时文章水平略差,大家见谅。MVC这个词儿,最早的定义应该是作为一种软件架构设计模式出现在软工里面的,即使用model、view、controller来设计及定 ... [详细]
  • kafka教程基本概念
    kafka教程基本概念 ... [详细]
  • (10)redis mysql 数据同步
    update之后,写MySql,再写入Redis,替旧数据(可在MySql端定义CRUD触发器,触发后写数据到Redis,也可Redis端解析binlog) ... [详细]
  • (单机安装kafka)mac安装jdkzookeeperkafkapythonkafka模块
    此处讲解单机安装kafka kafka是LinkedIn开发并开源的一个分布式MQ系统,现在是Apache的一个孵化项目。在它的主页描述kafka为一个高吞吐量的分布式(能将消息分 ... [详细]
author-avatar
区小靜
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有