当前位置:  首页  >  PHP教程  >  PHP 应用  >  代码收藏

php用socket重写CURL实现多线程资源请求代码

Socket替换Curl重新封装实现HTTP/HTTPS[POST/GET]请求,支持无阻塞,多并发。植入缓存调度,性能监控。

Socket 替换Curl 重新封装实现 HTTP/HTTPS [ POST/GET ] 请求, 支持无阻塞,多并发。植入缓存调度,性能监控。


/**
 *                ."" '<  `.___\_<|>_/___.'  >'"".
 *               | | :  `- \`.;`\ _ /`;.`/ - ` : | |
 *               \  \ `-.   \_ __\ /__ _/   .-` /  /
 *          ======`-.____`-.___\_____/___.-`____.-'======
 *                             `=---='
 *
 *          .............................................
 *                   佛祖保佑             永无BUG
 *
 *   危险代码,谨慎修改
 */



//namespace Http;
/**
 * Minimal concurrent HTTP client built on raw stream sockets instead of cURL.
 *
 * Pipeline used by httpReq():
 *   prepareReq()  -> normalise every request into a "mapping" entry keyed by its final URL
 *   cacheCheck()  -> pull cacheable GET entries out of the mapping when Yac already has them
 *   connReq()     -> open one socket per remaining entry
 *   dispatch()    -> write all requests, then multiplex the reads with stream_select()
 *   cacheBuild()  -> store fresh bodies in Yac / Mcache
 *
 * Time budgets are passed around by reference in MICROSECONDS and are decremented
 * by every step that waits on the network.
 *
 * Yac and Mcache are optional collaborators that are defined outside this file;
 * every use of them is guarded so the class also works when they are absent.
 */
class Curls{
    private  $_instance;

    /**
     * Demo: request five local test URLs with a 1000 ms budget and print the bodies.
     * Only the first entry is cacheable (cacheTime = 600 seconds).
     */
    public static function demo(){
        $params=array(
            'a'=>1,
            'b'=>2,
            'c'=>3,
        );

        $urls=array(
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=1111',
                'method'=>'GET',
                'params'=>$params,
                'cacheTime'=>600,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=222',
                'method'=>'GET',
                'params'=>$params,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=333',
                'method'=>'GET',
                'params'=>$params,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=444',
                'method'=>'GET',
                'params'=>$params,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=555',
                'method'=>'GET',
                'params'=>$params,
            ),
        );

        $data=self::httpReq($urls,1000);

        print_r($data);
    }

    /**
     * Run a batch of HTTP requests concurrently and return their bodies.
     *
     * @param array $urls    list of requests; each is an array with 'url', 'method'
     *                       ('GET' or 'POST'), optional 'params' (array, or a raw string
     *                       body for POST) and optional 'cacheTime' (seconds, GET only)
     * @param int   $timeout overall budget in milliseconds
     * @return array|false|null body per final URL (GET params are appended to the key);
     *                          null when $urls is empty or not an array;
     *                          false when the budget ran out while connecting
     */
    public static function httpReq($urls,$timeout) {
        if(empty($urls)||!is_array($urls)){return;}

        // Everything below tracks the remaining budget in microseconds.
        $timeout=$timeout*1000;

        $urls_mapping = self::prepareReq($urls);

        $urlCache=array();
        $urls_mapping_cache=self::cacheCheck($urls_mapping);
        if(is_array($urls_mapping_cache)){
            $urls_mapping=$urls_mapping_cache['urlsMapping'];
            $urlCache=$urls_mapping_cache['urlCache'];
        }

        if (false === self::connReq($urls_mapping, $timeout)) {
            return false;
        }

        // A failed dispatch is tolerated on purpose: entries without a body
        // fall back to the caches in the loop below.
        self::dispatch($urls_mapping, $timeout);

        self::cacheBuild($urls_mapping);

        if(!empty($urlCache)&&is_array($urlCache)){
            $urls_mapping=array_merge($urlCache,$urls_mapping);
        }

        $hasMcache=class_exists('Mcache');
        $data=array();
        foreach ($urls_mapping as $key=>$uData){
            $body=!empty($uData['response']['body'])?$uData['response']['body']:'';

            // Fallback chain for cacheable entries: Yac hit -> Mcache -> Mcache backup copy.
            if(empty($body)&&!empty($uData['cacheTime'])){
                if(!empty($uData['data'])){
                    $body=$uData['data'];
                }elseif($hasMcache&&!empty($uData['cacheKey'])){
                    $body=Mcache::getInstance()->get($uData['cacheKey']);
                    if(empty($body)){
                        $body=Mcache::getInstance()->get($uData['cacheKey'].'_bak');
                    }
                }
            }
            $data[$key]=$body;
        }
        return $data;
    }

    /**
     * Split a prepared mapping into entries still to be fetched and entries served from Yac.
     *
     * Only successfully prepared GET entries with a cacheTime take part. Every such entry
     * that is not served from cache gets a 'cacheKey'. When $_GET['refresh'] == 1 the
     * lookup is skipped, but cache keys are still assigned so cacheBuild() can store
     * the fresh response.
     *
     * @param array $urls_mapping mapping produced by prepareReq()
     * @return array array('urlsMapping' => entries to fetch, 'urlCache' => cached entries,
     *               each carrying the cached body under 'data')
     */
    public static function cacheCheck($urls_mapping){
        $skipLookup=!empty($_GET['refresh'])&&$_GET['refresh']==1;
        $yac=(!$skipLookup&&class_exists('Yac', false))?new Yac():null;

        $urlCache=array();
        foreach ($urls_mapping as $key=>$urlData){
            // Entries rejected by prepareReq() have neither a method nor URL components.
            if(empty($urlData['method'])||empty($urlData['component']['host'])){continue;}
            if($urlData['method']=='POST'||empty($urlData['cacheTime'])){continue;}

            $urlHost=str_replace('.','_',$urlData['component']['host']);
            $urlPath=!empty($urlData['component']['path'])?$urlData['component']['path']:'';
            $urlPath=str_replace(array('/','.'),'_',$urlPath);
            $cacheKey='curls_'.$urlHost.'_'.$urlPath.'_'.substr(md5($urlData['url']),0,16);

            // Looked up fresh on every iteration so a previous hit can never leak
            // into the next URL. The Yac key is the md5 of the readable key.
            $cached=$yac!==null?$yac->get(md5($cacheKey)):null;

            if(!empty($cached)){
                $urlData['data']=$cached;
                unset($urls_mapping[$key]);
                $urlCache[$key]=$urlData;
            }else{
                $urls_mapping[$key]['cacheKey']=$cacheKey;
            }
        }

        return array(
            'urlsMapping'=>$urls_mapping,
            'urlCache'=>$urlCache
        );
    }

    /**
     * Store every fresh, non-empty response body of a cacheable entry.
     *
     * Yac (when loaded) keeps the body for the entry's own cacheTime; Mcache (when
     * available) keeps a copy for 3600 s and a '_bak' copy for 36000 s.
     *
     * @param array $urls_mapping mapping after dispatch()
     * @return void
     */
    public static function cacheBuild($urls_mapping){
        $yac=class_exists('Yac', false)?new Yac():null;
        $hasMcache=class_exists('Mcache');

        foreach ($urls_mapping as $uData){
            if(empty($uData['cacheTime'])||empty($uData['cacheKey'])||empty($uData['response']['body'])){
                continue;
            }
            $body=$uData['response']['body'];
            if($yac!==null){
                $yac->set(md5($uData['cacheKey']), $body,(int)$uData['cacheTime']);
            }
            if($hasMcache){
                Mcache::getInstance()->set($uData['cacheKey'], $body, 3600);
                Mcache::getInstance()->set($uData['cacheKey'].'_bak', $body, 36000);
            }
        }
    }

    /**
     * Normalise raw request descriptions into mapping entries keyed by the final URL.
     *
     * GET params are appended to the URL. When $_GET['refresh'] is set,
     * '__refresh=1&refresh=1' is appended as well. An entry whose URL cannot be parsed
     * or uses an unsupported scheme keeps status 'inited' and gets an 'error'; every
     * other entry ends with status 'prepared' plus 'component', 'method', 'params'
     * (POST only, else an empty array) and 'cacheTime' (GET only, else '').
     *
     * @param array $urls list of request descriptions (see httpReq())
     * @return array mapping keyed by final URL
     */
    public static function prepareReq($urls) {
        $urls_mapping = array();

        foreach ($urls as $urlData) {
            if(!is_array($urlData)||empty($urlData['url'])){
                continue;
            }

            $url=$urlData['url'];
            $method=isset($urlData['method'])?strtoupper($urlData['method']):'GET';

            if($method=='GET'&&!empty($urlData['params'])){
                $params=is_array($urlData['params'])
                    ?http_build_query($urlData['params'])
                    :ltrim((string)$urlData['params'],'?&');
                $url.=(strpos($url,'?')!==false?'&':'?').$params;
            }

            if(!empty($_GET['refresh'])){
                // The parentheses matter: without them the ternary swallows the
                // concatenation and a URL that already has '?' only gets '&' appended.
                $url.=(strpos($url,'?')!==false?'&':'?').'__refresh=1&refresh=1';
            }

            $urls_mapping[$url]['url'] = $url;
            $urls_mapping[$url]['status'] = 'inited';

            $component = parse_url($url);

            // Reject anything we cannot connect to.
            if (!$component || !isset($component['host'])) {
                $urls_mapping[$url]['error'] = 'parse_url failed: ' . json_encode($component);
                continue;
            }
            if (isset($component['scheme'])
                && !in_array($component['scheme'], array('http', 'https'))) {
                $urls_mapping[$url]['error'] = 'unsupport scheme: ' . $component['scheme'];
                continue;
            }

            // Defaults.
            if (!isset($component['scheme'])) {
                $component['scheme'] = 'http';
            }
            if (!isset($component['port'])) {
                $component['port'] = $component['scheme'] == 'https' ? '443' : '80';
            }

            $urls_mapping[$url]['status'] = 'prepared';
            $urls_mapping[$url]['component'] = $component;
            $urls_mapping[$url]['method'] = $method;
            $urls_mapping[$url]['params'] = ($method=='POST'&&!empty($urlData['params']))
                ?$urlData['params']
                :array();
            $urls_mapping[$url]['cacheTime'] = ($method=='GET'&&!empty($urlData['cacheTime']))
                ?$urlData['cacheTime']
                :'';
        }
        return $urls_mapping;
    }

    /**
     * Open one socket per 'prepared' entry (TLS transport for https).
     *
     * On success the entry gets 'sock' (switched to non-blocking) and status
     * 'connected'; on failure it gets 'conn_errno' / 'conn_errstr' and stays 'prepared'.
     * The connect is synchronous; STREAM_CLIENT_ASYNC_CONNECT is not used because the
     * request is written right afterwards without polling for writability.
     *
     * @param array $urls_mapping mapping, updated in place
     * @param float $timeout      remaining budget in microseconds, decremented in place
     * @return bool false as soon as the budget is exhausted, true otherwise
     */
    public static function connReq(&$urls_mapping, &$timeout) {
        foreach ($urls_mapping as $url => $url_info) {
            if (!isset($url_info['status']) || $url_info['status'] !== 'prepared') {
                continue;
            }
            $host = $url_info['component']['host'];
            $port = $url_info['component']['port'];
            $is_https = isset($url_info['component']['scheme'])
                && $url_info['component']['scheme'] === 'https';
            $transport = $is_https ? 'ssl' : 'tcp';

            $conn_errno = 0;
            $conn_errstr = '';
            $begin = microtime(true) * 1000000;
            // The connect timeout is given in seconds and is never below one second.
            $sock = @stream_socket_client("$transport://$host:$port", $conn_errno,
                $conn_errstr, max(1, floor($timeout / 1000000)), STREAM_CLIENT_CONNECT);
            $timeout -= microtime(true) * 1000000 - $begin;

            if ($sock) {
                stream_set_blocking($sock, false);
                $urls_mapping[$url]['sock'] = $sock;
                $urls_mapping[$url]['status'] = 'connected';
            } else {
                $urls_mapping[$url]['conn_errno'] = $conn_errno;
                $urls_mapping[$url]['conn_errstr'] = $conn_errstr;
            }

            if ($timeout <= 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Send the request on every connected socket and collect the responses.
     *
     * Sockets are multiplexed with stream_select(); sockets reported readable are
     * drained by readResp(). Entries still pending when the budget runs out (or when
     * select fails) get the error 'stream_select timeout' and their socket is closed.
     *
     * @param array $urls_mapping mapping, updated in place
     * @param float $timeout      remaining budget in microseconds, decremented in place
     * @return bool true when every socket was handled, false on timeout / select failure
     */
    public static function dispatch(&$urls_mapping, &$timeout) {
        $socks       = array();
        $socks_index = array();
        foreach ($urls_mapping as $url => $url_info) {
            if (!isset($url_info['status']) || $url_info['status'] !== 'connected') {
                continue;
            }
            $socks_index[(int)$url_info['sock']] = $url;
            $socks[] = $url_info['sock'];
        }

        // writeReq() drops sockets whose request could not be written.
        self::writeReq($urls_mapping, $socks_index, $socks);

        while (count($socks) > 0) {
            if ($timeout <= 0) {
                self::failPending($urls_mapping, $socks_index, $socks, 'stream_select timeout');
                return false;
            }

            $rsocks = $socks;
            $wsocks = array();
            $esocks = array();

            // stream_select() and % both want integers.
            $budget = (int)$timeout;
            $begin = microtime(true) * 1000000;
            $nsock = @stream_select($rsocks, $wsocks, $esocks, (int)floor($budget / 1000000), $budget % 1000000);
            $timeout -= microtime(true) * 1000000 - $begin;

            if (!$nsock || empty($rsocks)) {
                self::failPending($urls_mapping, $socks_index, $socks, 'stream_select timeout');
                return false;
            }

            // Remember which sockets are being consumed before readResp() closes them.
            $done = array();
            foreach ($rsocks as $sock) {
                $done[(int)$sock] = true;
            }

            $begin = microtime(true) * 1000000;
            self::readResp($urls_mapping, $socks_index, $rsocks, max(0, $timeout));
            $timeout -= microtime(true) * 1000000 - $begin;

            $pending = array();
            foreach ($socks as $sock) {
                if (!isset($done[(int)$sock])) {
                    $pending[] = $sock;
                }
            }
            $socks = $pending;
        }
        return true;
    }

    /**
     * Flag every still-pending socket's entry with $error and close the socket.
     */
    private static function failPending(&$urls_mapping, $socks_index, $socks, $error) {
        foreach ($socks as $sock) {
            $url = $socks_index[(int)$sock];
            $urls_mapping[$url]['error'] = $error;
            if (is_resource($sock)) {
                fclose($sock);
            }
        }
    }

    /**
     * Read each readable socket to EOF, close it and parse the response.
     *
     * The request is HTTP/1.0 with "Connection: close", so EOF marks the end of the
     * response. While no data is available the method waits in stream_select()
     * instead of spinning on the non-blocking socket.
     *
     * @param array      $urls_mapping mapping, updated in place ('response_str', 'response',
     *                                 status 'readed', or 'error' on failure)
     * @param array      $socks_index  socket id => URL key
     * @param array      $rsocks       sockets reported readable
     * @param float|null $timeout      optional budget in microseconds shared by all sockets
     *                                 of this call; null waits for EOF without a limit
     * @return void
     */
    public static function readResp(&$urls_mapping, $socks_index, $rsocks, $timeout = null) {
        $deadline = $timeout === null ? null : microtime(true) + $timeout / 1000000;

        foreach ($rsocks as $sock) {
            $url = $socks_index[(int)$sock];
            $url_info = &$urls_mapping[$url];

            $url_info['status'] = 'reading';

            $data = '';
            $timed_out = false;
            while (!feof($sock)) {
                $chunk = fread($sock, 8192);
                if ($chunk === false) {
                    break;
                }
                if ($chunk !== '') {
                    $data .= $chunk;
                    continue;
                }

                // Nothing buffered yet: sleep until the socket is readable again.
                $r = array($sock);
                $w = array();
                $e = array();
                if ($deadline === null) {
                    $ready = @stream_select($r, $w, $e, null);
                } else {
                    $left = (int)(($deadline - microtime(true)) * 1000000);
                    if ($left <= 0) {
                        $timed_out = true;
                        break;
                    }
                    $ready = @stream_select($r, $w, $e, (int)floor($left / 1000000), $left % 1000000);
                }
                if (!$ready) {
                    $timed_out = true;
                    break;
                }
            }
            fclose($sock);

            if ($timed_out) {
                // A truncated response must not be parsed (and later cached) as complete.
                $url_info['error'] = 'read timeout';
                continue;
            }
            if (strlen($data) == 0) {
                $url_info['error'] = 'response empty';
                continue;
            }

            $url_info['status'] = 'readed';
            $url_info['response_str'] = $data;
            self::parseResponse($url_info);
        }
        unset($url_info);
    }

    /**
     * Split $url_info['response_str'] into headers and body.
     *
     * Stores $url_info['response'] = array('headers', 'body', 'status_code').
     * 'headers' maps each name to its value (an array of values when the name repeats);
     * the status line has no colon and therefore appears as a key with an empty value.
     * 'status_code' is the integer from the status line, or null when it cannot be read.
     *
     * @param array $url_info mapping entry, updated in place
     * @return void
     */
    public static function parseResponse(&$url_info) {
        list($head, $body_str) = array_pad(explode("\r\n\r\n", $url_info['response_str'], 2), 2, '');
        $lines = explode("\r\n", $head);

        $status_code = null;
        if (preg_match('#^HTTP/\d+(?:\.\d+)?\s+(\d{3})#', $lines[0], $m)) {
            $status_code = (int)$m[1];
        }

        $headers = array();
        foreach ($lines as $line) {
            list($name, $val) = array_pad(explode(":", $line, 2), 2, "");
            $name = trim($name);
            $val  = trim($val);
            if (!isset($headers[$name])) {
                $headers[$name] = $val;
            } elseif (is_array($headers[$name])) {
                $headers[$name][] = $val;
            } else {
                $headers[$name] = array($headers[$name], $val);
            }
        }

        $url_info['response'] = array(
            'headers'     => $headers,
            'body'        => $body_str,
            'status_code' => $status_code,
        );
    }

    /**
     * Build and send the request for every socket in $wsocks.
     *
     * The socket is switched to blocking mode for the write so that a large body is
     * sent completely, then back to non-blocking for the select loop. A socket whose
     * request cannot be written in full is closed, removed from $wsocks and its entry
     * gets an 'error'; otherwise the entry ends with status 'writed'.
     *
     * @param array $urls_mapping mapping, updated in place
     * @param array $socks_index  socket id => URL key
     * @param array $wsocks       sockets to write to, pruned in place
     * @return void
     */
    public static function writeReq(&$urls_mapping, $socks_index, &$wsocks) {
        foreach ($wsocks as $idx => $sock) {
            $url = $socks_index[(int)$sock];
            $url_info = &$urls_mapping[$url];

            $url_info['status'] = 'writing';

            self::buildRequest($url_info);
            $req = $url_info['req_str'];
            $expected = strlen($req);

            $wlen = 0;
            stream_set_blocking($sock, true);
            while ($wlen < $expected) {
                $n = @fwrite($sock, substr($req, $wlen));
                if ($n === false || $n === 0) {
                    break;
                }
                $wlen += $n;
            }
            stream_set_blocking($sock, false);

            if ($wlen != $expected) {
                $url_info['error'] = 'Write request error[expect write len: ' . $expected . ', actually write len: ' . $wlen . ']';
                fclose($sock);
                unset($wsocks[$idx]);
                continue;
            }
            $url_info['status'] = 'writed';
        }
        unset($url_info);
    }

    /**
     * Render the raw HTTP/1.0 request for one entry into $url_info['req_str'].
     *
     * An empty path becomes '/'. The Host header carries the port only when it differs
     * from the scheme default. "Connection: close" is always sent because readResp()
     * relies on EOF. POST sends Content-Length, plus a form Content-Type when the
     * params are an array; a raw string body is sent as-is without a Content-Type.
     *
     * @param array $url_info mapping entry with 'component', 'method' and 'params'
     * @return void
     */
    public static function buildRequest(&$url_info) {
        $component = $url_info['component'];
        $method    = $url_info['method'];
        $path      = !empty($component['path']) ? $component['path'] : '/';
        $query_string = isset($component['query']) ? "?{$component['query']}" : '';

        $scheme = isset($component['scheme']) ? $component['scheme'] : 'http';
        $default_port = $scheme === 'https' ? 443 : 80;
        $host_header = $component['host'];
        if (!empty($component['port']) && (int)$component['port'] !== $default_port) {
            $host_header .= ':' . $component['port'];
        }

        $req  = "$method {$path}{$query_string} HTTP/1.0\r\n";
        $req .= "Host: $host_header\r\n";

        $postData = '';
        if ($method == 'POST') {
            if (!empty($url_info['params'])) {
                if (is_array($url_info['params'])) {
                    $postData = http_build_query($url_info['params']);
                    $req .= "Content-Type: application/x-www-form-urlencoded\r\n";
                } else {
                    $postData = $url_info['params'];
                }
            }
            $req .= 'Content-Length: ' . strlen($postData) . "\r\n";
        }
        $req .= "Connection: close\r\n\r\n" . $postData;

        $url_info['req_str'] = $req;
    }
}


//Http::demo();









/*
// stop profiler
$xhprof_data = xhprof_disable();

// display raw xhprof data for the profiler run
//print_r($xhprof_data);


$XHPROF_ROOT = realpath(dirname(__FILE__) .'/xhprof0.9.3/');
//print_r($XHPROF_ROOT);exit;

include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_lib.php";
include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_runs.php";

// save raw data for this profiler run using default
// implementation of iXHProfRuns.
$xhprof_runs = new XHProfRuns_Default();

// save the run under a namespace "xhprof_foo"
$run_id = $xhprof_runs->save_run($xhprof_data, "xhprof_foo");

echo "\n".
    "Assuming you have set up the http based UI for \n".
    "XHProf at some address, you can view run at \n".
    "http://test.liuqing.com/xhprof0.9.3/xhprof_html/index.php?run=$run_id&source=xhprof_foo\n".
    "\n";


*/

吐了个 "CAO" !
扫码关注 PHP1 官方微信号
PHP1.CN | 中国最专业的PHP中文社区 | PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | PHP问答
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved PHP1.CN 第一PHP社区 版权所有