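Edit, to clarify and simplify: I am looking for a "good" way to submit more Stackable objects to the Pool whenever a Stackable finishes (using data from the first Stackable to add the second). I have thought of polling the objects until one finishes (inefficient and ugly), and of passing a reference to the Pool object (which I could not get to work). The base code is this: https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php
Now, the full description: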
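I am developing a PHP application that has grown too large and takes a long time to run. So I am trying to make it multithreaded using a thread pool (I know PHP is not the best choice for this, but I do not want to, and at this point cannot, change languages).
The problem is that the application has two stages that must run in order, and each stage has many subtasks that can run concurrently. So this is the process I have in mind: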
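Stage 1 will have N subtasks, which will be Stackable objects.
When subtask i finishes, "main" (which creates the pool, the Stackables, etc.) must be notified and must run stage 2 for subtask i in a different Stackable object, using some data from subtask i. In this stage, there will be M subtasks for each stage 1 subtask.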
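I want to use the same thread pool for the stage 1 and stage 2 threads. The only solution I have come up with for moving from stage 1 to stage 2 is to poll each of the N subtasks until one of them finishes, then start stage 2 for the one that finished, and repeat until all N subtasks have finished.
I am using the thread pool example that Joe Watkins includes in the pthreads source as my base code.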
You should read this first: https://gist.github.com/krakjoe/6437782
<?php
/**
* Normal worker
*/
class PooledWorker extends Worker {
    public function run(){}
}

/**
* Don't descend from pthreads, normal objects should be used for pools
*/
class Pool {
    protected $size;
    protected $workers;

    /**
    * Construct a worker pool of the given size
    * @param integer $size
    */
    public function __construct($size) {
        $this->size = $size;
    }

    /**
    * Start worker threads
    */
    public function start() {
        while (@$worker++ < $this->size) {
            $this->workers[$worker] = new PooledWorker();
            $this->workers[$worker]->start();
        }
        return count($this->workers);
    }

    /**
    * Submit a task to pool
    */
    public function submit(Stackable $task) {
        $this->workers[array_rand($this->workers)]
            ->stack($task);
        return $task;
    }

    /**
    * Shutdown worker threads
    */
    public function shutdown() {
        foreach ($this->workers as $worker)
            $worker->shutdown();
    }
}

class StageTwo extends Stackable {
    /**
    * Construct StageTwo from a part of StageOne data
    * @param int $data
    */
    public function __construct($data) {
        $this->data = $data;
    }

    public function run(){
        printf(
            "Thread %lu got data: %d\n",
            $this->worker->getThreadId(), $this->data);
    }
}

class StageOne extends Stackable {
    protected $done;

    /**
    * Construct StageOne with suitable storage for data
    * @param StagingData $data
    */
    public function __construct(StagingData $data) {
        $this->data = $data;
    }

    public function run() {
        /* create dummy data array */
        while (@$i++ < 100) {
            $this->data[] = mt_rand(
                20, 1000);
        }
        $this->done = true;
    }
}

/**
* StagingData to hold data from StageOne
*/
class StagingData extends Stackable {
    public function run() {}
}

/* stage and data reference arrays */
$one = [];
$two = [];
$data = [];

$pool = new Pool(8);
$pool->start();

/* construct stage one */
while (count($one) < 10) {
    $staging = new StagingData();
    /* maintain reference counts by storing return value in normal array in local scope */
    $one[] = $pool
        ->submit(new StageOne($staging));
    /* maintain reference counts */
    $data[] = $staging;
}

/* construct stage two */
while (count($one)) {

    /* find completed StageOne objects */
    foreach ($one as $id => $job) {
        /* if done is set, the data from this StageOne can be used */
        if ($job->done) {
            /* use each element of data to create new tasks for StageTwo */
            foreach ($job->data as $chunk) {
                /* submit stage two */
                $two[] = $pool
                    ->submit(new StageTwo($chunk));
            }

            /* no longer required */
            unset($one[$id]);
        }
    }

    /* in the real world, it is unnecessary to keep polling the array */
    /* you probably have some work you want to do ... do it :) */
    if (count($one)) {
        /* everyone likes sleep ... */
        usleep(1000000);
    }
}

/* all tasks stacked, the pool can be shutdown */
$pool->shutdown();
?>
This will output:
Thread 140012266239744 got data: 612
Thread 140012275222272 got data: 267
Thread 140012257257216 got data: 971
Thread 140012033140480 got data: 881
Thread 140012257257216 got data: 1000
Thread 140012016355072 got data: 261
Thread 140012257257216 got data: 510
Thread 140012016355072 got data: 148
Thread 140012016355072 got data: 501
Thread 140012257257216 got data: 767
Thread 140012024747776 got data: 504
Thread 140012033140480 got data: 401
Thread 140012275222272 got data: 20
<-- trimmed from 1000 lines -->
Thread 140012041533184 got data: 285
Thread 140012275222272 got data: 811
Thread 140012041533184 got data: 436
Thread 140012257257216 got data: 977
Thread 140012033140480 got data: 830
Thread 140012275222272 got data: 554
Thread 140012024747776 got data: 704
Thread 140012033140480 got data: 50
Thread 140012257257216 got data: 794
Thread 140012024747776 got data: 724
Thread 140012033140480 got data: 624
Thread 140012266239744 got data: 756
Thread 140012284204800 got data: 997
Thread 140012266239744 got data: 708
Thread 140012266239744 got data: 981
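Because you want to use a single pool, you have no choice but to create the tasks in the main context, where the pool is created. I can imagine other solutions, but you specifically asked for this kind of solution.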
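Depending on the hardware at my disposal, and the nature of the tasks and data to be processed, I might well have multiple [small] pools of threads, one for each worker. This would allow StageOne to create StageTwo objects in the context of the Worker that is executing them. That might be something to consider.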
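For the single-pool approach, the polling pass in the listing can be pulled out into a function that the main context calls whenever it has a spare moment, instead of sleeping in a loop. The following is only a rough sketch: `harvestCompleted` and the `$submit` callback are hypothetical names, not part of pthreads, and plain stand-in objects replace the Stackables so that it runs without the extension. It assumes each job exposes `done` and `data` the way StageOne does above.

```php
<?php
/**
 * Hypothetical helper (not part of pthreads): one non-blocking pass over
 * the pending stage-one jobs. Every chunk of a finished job is handed to
 * $submit, and the finished job is removed from $pending.
 *
 * @param array    $pending jobs exposing ->done and ->data (assumed shape)
 * @param callable $submit  called once per chunk, e.g. wraps $pool->submit()
 * @return int number of chunks handed to $submit during this pass
 */
function harvestCompleted(array &$pending, callable $submit)
{
    $submitted = 0;
    foreach ($pending as $id => $job) {
        if (!$job->done) {
            continue; // still running, look again on the next pass
        }
        foreach ($job->data as $chunk) {
            $submit($chunk);
            $submitted++;
        }
        unset($pending[$id]); // never harvest the same job twice
    }
    return $submitted;
}

/* stand-in objects so the sketch runs without pthreads */
$pending = [
    (object) ['done' => true,  'data' => [1, 2, 3]],
    (object) ['done' => false, 'data' => []],
];

$seen  = [];
$count = harvestCompleted($pending, function ($chunk) use (&$seen) {
    $seen[] = $chunk; // with pthreads this is where StageTwo would be submitted
});
```

With the real classes, the callback would presumably be something like `function ($chunk) use ($pool, &$two) { $two[] = $pool->submit(new StageTwo($chunk)); }`, and the main script would call `harvestCompleted($one, ...)` between its other work until `$one` is empty.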