PHP线程池

 海哭的声音2602928847 发布于 2023-02-03 10:50

编辑:澄清和简化:我正在寻找一种"好"的方式,每当Stackable结束时(使用第一个Stackable中的数据添加第二个),可以向Pool提交更多可堆叠对象.我有想法轮询对象,直到一个结束(低效和丑陋)并传递对Pool对象的引用(我无法使其工作).基本代码是这个:https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php

现在,完整描述:

我正在开发一个PHP应用程序,它已经增长太多而且需要花费很多时间.因此,我正在尝试多线程该应用程序,使用线程池(我知道PHP不是最好的选择,但我不想要,并且此时无法更改语言).

问题是,应用程序有两个阶段,必须按顺序排列,每个阶段都有很多可以同时执行的子任务.所以,这是我头脑中的过程:

第1阶段将有N个子任务,这些子任务将是Stackable对象.

当子任务i结束时,必须通知"main"(创建池,Stackables等),并使用来自子任务i(不同的Stackable对象)的一些数据执行子任务i的阶段2.在此阶段,第1阶段的每个子任务将有M个子任务.

我想为第1阶段和第2阶段的线程使用相同的线程池,我从第1阶段到第2阶段的唯一解决方案是轮询N个子任务中的每一个,直到其中一个结束,然后调用第2阶段结束,并重复直到所有N个子任务结束.

我正在使用Joe Watkins在pthreads源中包含的线程池的示例作为基本代码.

1 个回答
  • 您应该首先阅读:https://gist.github.com/krakjoe/6437782

    <?php
    /**
    * Normal worker
    */
    class PooledWorker extends Worker {
        public function run(){}
    }
    
    
    /**
    * Don't descend from pthreads, normal objects should be used for pools
    */
    class Pool {
        protected $size;
        protected $workers;
    
        /**
        * Construct a worker pool of the given size
        * @param integer $size
        */  
        public function __construct($size) {
            $this->size = $size;
        }
    
        /**
        * Start worker threads
        */
        public function start() {
            while (@$worker++ < $this->size) {
                $this->workers[$worker] = new PooledWorker();
                $this->workers[$worker]->start();
            }
            return count($this->workers);
        }
    
        /**
        * Submit a task to pool
        */
        public function submit(Stackable $task) {
            $this->workers[array_rand($this->workers)]
                ->stack($task);
            return $task;
        }
    
        /**
        * Shutdown worker threads
        */
        public function shutdown() {
            foreach ($this->workers as $worker)
                $worker->shutdown();
        }
    }
    
    class StageTwo extends Stackable {
        /**
        * Construct StageTwo from a part of StageOne data
        * @param int $data
        */
        public function __construct($data) {
            $this->data = $data;
        }
    
        public function run(){
            printf(
                "Thread %lu got data: %d\n", 
                $this->worker->getThreadId(), $this->data);
        }
    }
    
    class StageOne extends Stackable {
        protected $done;
    
        /**
        * Construct StageOne with suitable storage for data
        * @param StagingData $data
        */
        public function __construct(StagingData $data) {
            $this->data = $data;
        }
    
        public function run() {
            /* create dummy data array */
            while (@$i++ < 100) {
                $this->data[] = mt_rand(
                    20, 1000);
            }
            $this->done = true;
        }
    }
    
    /**
    * StagingData to hold data from StageOne
    */
    class StagingData extends Stackable {
        public function run() {}
    }
    
    /* stage and data reference arrays */
    $one = [];
    $two = [];
    $data = [];
    
    $pool = new Pool(8);
    $pool->start();
    
    /* construct stage one */
    while (count($one) < 10) {
        $staging = new StagingData();
        /* maintain reference counts by storing return value in normal array in local scope */
        $one[] = $pool
            ->submit(new StageOne($staging));
        /* maintain reference counts */
        $data[] = $staging;
    }
    
    /* construct stage two */
    while (count($one)) {
    
        /* find completed StageOne objects */
        foreach ($one as $id => $job) {
            /* if done is set, the data from this StageOne can be used */
            if ($job->done) {
                /* use each element of data to create new tasks for StageTwo */
                foreach ($job->data as $chunk) {
                    /* submit stage two */
                    $two[] = $pool
                        ->submit(new StageTwo($chunk));
                }
    
                /* no longer required */
                unset($one[$id]);
            }
        }
    
        /* in the real world, it is unecessary to keep polling the array */
        /* you probably have some work you want to do ... do it :) */
        if (count($one)) {
            /* everyone likes sleep ... */
            usleep(1000000);
        }
    }
    
    /* all tasks stacked, the pool can be shutdown */
    $pool->shutdown();
    ?>
    

    将输出:

    Thread 140012266239744 got data: 612
    Thread 140012275222272 got data: 267
    Thread 140012257257216 got data: 971
    Thread 140012033140480 got data: 881
    Thread 140012257257216 got data: 1000
    Thread 140012016355072 got data: 261
    Thread 140012257257216 got data: 510
    Thread 140012016355072 got data: 148
    Thread 140012016355072 got data: 501
    Thread 140012257257216 got data: 767
    Thread 140012024747776 got data: 504
    Thread 140012033140480 got data: 401
    Thread 140012275222272 got data: 20
    <-- trimmed from 1000 lines -->
    Thread 140012041533184 got data: 285
    Thread 140012275222272 got data: 811
    Thread 140012041533184 got data: 436
    Thread 140012257257216 got data: 977
    Thread 140012033140480 got data: 830
    Thread 140012275222272 got data: 554
    Thread 140012024747776 got data: 704
    Thread 140012033140480 got data: 50
    Thread 140012257257216 got data: 794
    Thread 140012024747776 got data: 724
    Thread 140012033140480 got data: 624
    Thread 140012266239744 got data: 756
    Thread 140012284204800 got data: 997
    Thread 140012266239744 got data: 708
    Thread 140012266239744 got data: 981
    

    因为您想使用一个池,所以除了在创建池的主上下文中创建任务之外别无选择.我可以想象其他解决方案,但你特别要求这种解决方案.

    根据我可以使用的硬件,以及要处理的任务和数据的性质,我可能有多个[小]线程池,每个工作一个,这将允许StageOne在Worker中创建StageTwo对象正在执行它们的上下文,可能需要考虑.

    2023-02-03 10:53 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有