热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

php用socket重写CURL实现多线程资源请求代码

Socket替换Curl重新封装实现HTTP/HTTPS[POST/GET]请求,支持无阻塞,多并发。植入缓存调度,性能监控。

Socket 替换Curl 重新封装实现 HTTP/HTTPS [ POST/GET ] 请求, 支持无阻塞,多并发。植入缓存调度,性能监控。


_/___.' >'"".
                | | : ` \`.;`\ _ /`;.`/  ` : | |
                  \ \ `. \_ __\ /__ _/ .` / /
          ======`.____`.___\_____/___.`____.'======
                             `=='
 
          .............................................
                   佛祖保佑             永无BUG
 
  危险代码,谨慎修改
 /



//namespace Http;
class Curls{
    private  $_instance;
    //demo
    public static function demo(){

        $urls=array(

            array(
                'url'   =>'http://aaa.com.cn/api/v3/get?feed_fmt=1&dedup=32&merge=3&statics=1&this_page=1&rfunc=105&fields=url&offset=0&length=10&feed_fields=url,mtitle,stitle,title,wapurls,wapurl,img,comment_total,type,ctime&cateid=sina_all&mod=f&cre=newspagew&pageUrl=http://news.sina.com.cn/c/nd/20161014/docifxwvpar8010943.shtml&_=1476409984855&callback=jsonp_vHEE1476409984854',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
                'cacheTime'=>60,
            ),
            array(
                'url'   =>'http://test.cn/mobile/impress',
                'method'=>'POST',
                'params'=>'{"adunit_id":["PDPS000000054013","PDPS000000051741"],"size":["unknown"],"rotate_count":43,"timestamp":1476264235.3584,"device_id":"8d3d672ac567d13495277adb5319cfca3ac03323","device_platform":"android","device_type":4,"carrier":"2g","client":"newsapp","ip":"10.236.55.81","targeting":[]}',

            ),
            array(
                'url'   =>'http://test.cn/native/impress',
                'method'=>'POST',
                'params'=>'{"adunit_id":["PDPS000000056242","PDPS000000056436","PDPS000000056437","PDPS000000056438"],"timestamp":1476264322.912,"app":{"name":"com.sina.news","channel":"news_news","make":"","model":"","os":"android","osv":"9.2.1","device_type":4,"connection_type":2,"carrier":"0","ip":"10.236.55.81","device_id":"8d3d672ac567d13495277adb5319cfca3ac03323","version":"6054095012"}}'
            ),
        );

        $urls=array(
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=1111',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
                'cacheTime'=>600,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=222',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
                //'cacheTime'=>600,
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=333',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=444',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
            ),
            array(
                'url'=>'http://127.0.0.1/sleep1.php?aaa=555',
                'method'=>'GET',
                'params'=>array(
                    'a'=>1,
                    'b'=>2,
                    'c'=>3,
                ),
            ),

        );


        $data=self::httpReq($urls,1000);


        print_r($data);

    }


    public static function httpReq($urls,$timeout) {
        $timeout=$timeout1000;

        if(empty($urls)||!is_array($urls)){return;}

        $urls_mapping = self::prepareReq($urls);
        $urls_mapping_cache=self::cacheCheck($urls_mapping);
        $urls_mapping=$urls_mapping_cache['urlsMapping'];
        $urlCache=$urls_mapping_cache['urlCache'];
        //print_r($urlCache);exit;

        if (FALSE === self::connReq($urls_mapping, $timeout)) {
            return FALSE;
        }
        if (FALSE === self::dispatch($urls_mapping, $timeout)) {
            //return FALSE;
        }



        self::cacheBuild($urls_mapping);

        if(!empty($urlCache)&&is_array($urlCache)){
            $urls_mapping=array_merge($urlCache,$urls_mapping);
        }

        //print_r($urls_mapping);exit;

        foreach ($urls_mapping as $key=>$uData){
            //print_r($uData);exit;
            $data[$key]=!empty($uData['response']['body'])?$uData['response']['body']:'';
            if(empty($data[$key])&&!empty($uData['cacheTime'])){
                $data[$key] = $uData['data'];
            }
            if(empty($data[$key])&&!empty($uData['cacheTime'])){
                $data[$key] = Mcache::getInstance()>get($uData['cacheKey']);
            }
            if(empty($data[$key])&&!empty($uData['cacheTime'])){
                $data[$key] = Mcache::getInstance()>get($uData['cacheKey']. '_bak');
            }
        }
        return $data;
    }

    //检查请求的URL是否有缓存
    public static function cacheCheck($urls_mapping){
        if(!empty($_GET['refresh'])&&$_GET['refresh']==1){return;}
        if (class_exists('Yac', false)) {
            $yac = new Yac();
            $isYac=1;
        }
        foreach ($urls_mapping as $key=>$urlData){
            //print_r($urlData);exit;
            if($urlData['method']=='POST'||empty($urlData['cacheTime'])){continue;}
            $url=$urlData['url'];
            $urlHost=$urlData['component']['host'];
            $urlHost=str_replace('.','_',$urlHost);
            $urlPath=!empty($urlData['component']['path'])?$urlData['component']['path']:'';
            $urlPath=str_replace('/','_',$urlPath);
            $urlPath=str_replace('.','_',$urlPath);
            $cacheKey='curls_'.$urlHost.'_'.$urlPath.'_'.substr(md5($url),0,16);
            $yacCacheKey=md5($cacheKey);
            if($isYac){
                $data = $yac>get($yacCacheKey);
                //print_r($data);exit;
            }
            if(!empty($data)){
                $urlData['data']=$data;
                unset($urls_mapping[$key]);
                $urlCache[$key]=$urlData;
            }else{
                $urls_mapping[$key]['cacheKey']=$cacheKey;
            }
        }
        //print_r($urls_mapping);exit;
        return array(
            'urlsMapping'=>$urls_mapping,
            'urlCache'=>!empty($urlCache)?$urlCache:array()
        );

    }

    public static function cacheBuild($urls_mapping){
        if (class_exists('Yac', false)) {
            $yac = new Yac();
            $isYac=1;
        }
        foreach ($urls_mapping as $key=>$uData){
            if(!empty($uData['cacheTime'])&&!empty($uData['response']['body'])){
                if($isYac){
                    $yac>set(md5($uData['cacheKey']), $uData['response']['body'],(int)$uData['cacheTime']);
                }
                Mcache::getInstance()>set($uData['cacheKey'], $uData['response']['body'], 3600);
                Mcache::getInstance()>set($uData['cacheKey'].'_bak', $uData['response']['body'], 36000);
            }
        }
    }

    public static function prepareReq($urls) {
        $urls_mapping = array();

        foreach ($urls as $urlData) {

            $url=$urlData['url'];

            if($urlData['method']=='GET'&&!empty($urlData['params'])){
                $params = http_build_query($urlData['params']);
                $cOncat= strpos($url, '?') !== false ? '&' : '?';
                $url .= $concat.$params;
            }

            if(!empty($_GET['refresh'])){
                $url .= strpos($url,'?')!==false?'&':'?'.'__refresh=1&refresh=1';
            }

            $urls_mapping[$url]['url'] = $url;
            $urls_mapping[$url]['status'] = 'inited';

            $compOnent= parse_url($url);

            // 异常Case检查
            if (!$component || !isset($component['host'])) {
                $urls_mapping[$url]['error'] = 'parse_url failed: ' . json_encode($component);
                continue;
            }
            if (isset($component['scheme'])
                && !in_array($component['scheme'], array('http', 'https'))) {
                $urls_mapping[$url]['error'] = 'unsupport scheme: ' . $component['scheme'];
                continue;
            }

            // 默认值处理
            if (!isset($component['scheme'])) {
                $component['scheme'] = 'http';
            }
            if (!isset($component['port'])) {
                $component['port'] = $component['scheme'] == 'https' ? '443' : '80';
            }

            $urls_mapping[$url]['status'] = 'prepared';
            $urls_mapping[$url]['component'] = $component;
            $urls_mapping[$url]['method'] = $urlData['method'];

            if($urlData['method']=='POST'&&!empty($urlData['params'])){
                $urls_mapping[$url]['params'] = $urlData['params'];
            }else{
                $urls_mapping[$url]['params']=array();
            }

            if($urlData['method']=='GET'&&!empty($urlData['cacheTime'])){
                $urls_mapping[$url]['cacheTime'] = $urlData['cacheTime'];
            }else{
                $urls_mapping[$url]['cacheTime']='';
            }

        }
        return $urls_mapping;
    }

    public static function connReq(&$urls_mapping, &$timeout) {
        foreach ($urls_mapping as $url => $url_info) {
            if ($url_info['status'] !== 'prepared') {
                continue;
            }
            $host = $url_info['component']['host'];
            $port = $url_info['component']['port'];
            $begin = microtime(TRUE)  1000000;


            /$sock = @stream_socket_client("tcp://$host:$port", $conn_errno,
                $conn_errstr, max(1, floor($timeout / 1000000)), STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);/

            $sock = @stream_socket_client("tcp://$host:$port", $conn_errno,
                $conn_errstr, max(1, floor($timeout / 1000000)),STREAM_CLIENT_CONNECT);


            $end = microtime(TRUE)  1000000;
            $timeout = ($end  $begin);
            if ($timeout <= 0) {
                return FALSE;
            }
            stream_set_blocking($sock, FALSE);

            if ($sock) {
                $urls_mapping[$url]['sock'] = $sock;
                $urls_mapping[$url]['status'] = 'connected';
            } else {
                $urls_mapping[$url]['conn_errno'] = $conn_errno;
                $urls_mapping[$url]['conn_errstr'] = $conn_errstr;
            }
        }
    }

    public static function dispatch(&$urls_mapping, &$timeout) {
        $socks       = array();
        $socks_index = array();
        foreach ($urls_mapping as $url => $url_info) {
            if ($url_info['status'] !== 'connected') {
                continue;
            }
            $socks_index[(int)$url_info['sock']] = $url;
            $socks[] = $url_info['sock'];
        }

        self::writeReq($urls_mapping, $socks_index, $socks);

        while (count($socks) > 0) {
            $rsocks = $socks;
            $wsocks = array();
            $esocks = array();

            $begin = microtime(TRUE)  1000000;
            //$nsock = stream_select($rsocks, $wsocks, $esocks, 0, $timeout % 1000000);
            $nsock = @stream_select($rsocks, $wsocks, $esocks, floor($timeout / 1000000), $timeout % 1000000);

            //stream_set_blocking($rsocks, FALSE);
            $end = microtime(TRUE)  1000000;
            $timeout = ($end  $begin);
            if ($timeout <= 0) {
                return FALSE;
            }
            if ($nsock <= 0) {
                foreach ($socks as $sock) {
                    $url = $socks_index[(int)$sock];
                    $urls_mapping[$url]['error'] = 'stream_select timeout';
                }
                break;
            }

            if (!empty($rsocks)) {
                self::readResp($urls_mapping, $socks_index, $rsocks);

                $socks = array_diff($socks, $rsocks);
            }
        }
    }
    public static function readResp(&$urls_mapping, $socks_index, $rsocks) {
        foreach ($rsocks as $idx => $sock) {
            $url = $socks_index[(int)$sock];
            $url_info = &$urls_mapping[$url];

            $url_info['status'] = 'reading';

            $data = '';
            while (!feof($sock)) {
                $data .= fread($sock, 8192);
            }

            if (strlen($data) == 0) {
                $url_info['error'] = 'response empty';
                fclose($sock);
                continue;
            }

            $url_info['status'] = 'readed';
            $url_info['response_str'] = $data;
            self::parseResponse($url_info);
        }
    }
    public static function parseResponse(&$url_info) {
        list($header_str, $body_str) = array_pad(explode("\r\n\r\n", $url_info['response_str'], 2), 2, '');
        $headers_str = explode("\r\n", $header_str);
        $headers     = array();
        foreach ($headers_str as $header_str) {
            list($name, $val) = array_pad(explode(":", $header_str, 2), 2, "");
            $name = trim($name);
            $val  = trim($val);
            if (isset($headers[$name])) {
                if (is_array($headers[$name])) {
                    $headers[$name][] = $val;
                } else {
                    $headers[$name] = array($headers[$name], $val);
                }
            } else {
                $headers[$name] = $val;
            }
        }
        $url_info['response'] = array(
            'headers' => $headers,
            'body'    => $body_str,
        );
    }
    public static function writeReq(&$urls_mapping, $socks_index, &$wsocks) {
        foreach ($wsocks as $idx => $sock) {
            $url = $socks_index[(int)$sock];
            $url_info = &$urls_mapping[$url];

            $url_info['status'] = 'writing';

            self::buildRequest($url_info);
            //print_r($sock);
            $wlen = @fwrite($sock, $url_info['req_str']);
            stream_set_blocking($sock, FALSE);
            if ($wlen != strlen($url_info['req_str'])) {
                $url_info['error'] = 'Write request error[expect write len: ' . strlen($url_info['req_str']) . ', actually write len: ' . $wlen . ']';
                unset($wsocks[$idx]);
                continue;
            }
            $url_info['status'] = 'writed';
        }
    }
    public static function buildRequest(&$url_info) {
        $host   = $url_info['component']['host'];
        $path   = !empty($url_info['component']['path'])?$url_info['component']['path']:'';
        $method = $url_info['method'];
        $query_string = isset($url_info['component']['query']) ? "?{$url_info['component']['query']}" : '';

        //$url_info['req_str'] = "GET {$path}{$query_string} HTTP/1.0\r\nHost: {$host}\r\n\r\n";
        $req='';
        $req.="$method {$path}{$query_string} HTTP/1.0\r\n";
        $req.="Host: $host\r\n";
        if($method=='POST'){
            $postData='';
            if(!empty($url_info['params'])){
                if(is_array($url_info['params'])){
                    $postData=http_build_query($url_info['params']);
                }else{
                    $postData=$url_info['params'];
                }
            }
            $req.='Contentlength: '. strlen($postData) ."\r\n";
            $req.="Connection: close\r\n\r\n$postData";
        }else{
            $req.="\r\n";
        }
        $url_info['req_str'] = $req;
    }
}


//Http::demo();









/
// stop profiler
$xhprof_data = xhprof_disable();

// display raw xhprof data for the profiler run
//print_r($xhprof_data);


$XHPROF_ROOT = realpath(dirname(__FILE__) .'/xhprof0.9.3/');
//print_r($XHPROF_ROOT);exit;

include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_lib.php";
include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_runs.php";

// save raw data for this profiler run using default
// implementation of iXHProfRuns.
$xhprof_runs = new XHProfRuns_Default();

// save the run under a namespace "xhprof_foo"
$run_id = $xhprof_runs>save_run($xhprof_data, "xhprof_foo");

echo "\n".
    "Assuming you have set up the http based UI for \n".
    "XHProf at some address, you can view run at \n".
    "http://test.liuqing.com/xhprof0.9.3/xhprof_html/index.php?run=$run_id&source=xhprof_foo\n".
    "\n";


/


推荐阅读
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 本文介绍了lua语言中闭包的特性及其在模式匹配、日期处理、编译和模块化等方面的应用。lua中的闭包是严格遵循词法定界的第一类值,函数可以作为变量自由传递,也可以作为参数传递给其他函数。这些特性使得lua语言具有极大的灵活性,为程序开发带来了便利。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了Alink回归预测的不完善问题,指出目前主要针对Python做案例,对其他语言支持不足。同时介绍了pom.xml文件的基本结构和使用方法,以及Maven的相关知识。最后,对Alink回归预测的未来发展提出了期待。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • Mac OS 升级到11.2.2 Eclipse打不开了,报错Failed to create the Java Virtual Machine
    本文介绍了在Mac OS升级到11.2.2版本后,使用Eclipse打开时出现报错Failed to create the Java Virtual Machine的问题,并提供了解决方法。 ... [详细]
  • 本文介绍了在SpringBoot中集成thymeleaf前端模版的配置步骤,包括在application.properties配置文件中添加thymeleaf的配置信息,引入thymeleaf的jar包,以及创建PageController并添加index方法。 ... [详细]
  • 本文详细介绍了Linux中进程控制块PCBtask_struct结构体的结构和作用,包括进程状态、进程号、待处理信号、进程地址空间、调度标志、锁深度、基本时间片、调度策略以及内存管理信息等方面的内容。阅读本文可以更加深入地了解Linux进程管理的原理和机制。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • 1,关于死锁的理解死锁,我们可以简单的理解为是两个线程同时使用同一资源,两个线程又得不到相应的资源而造成永无相互等待的情况。 2,模拟死锁背景介绍:我们创建一个朋友 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
author-avatar
null5269
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有