Socket 替换Curl 重新封装实现 HTTP/HTTPS [ POST/GET ] 请求, 支持无阻塞,多并发。植入缓存调度,性能监控。
_/___.' >'"". | | : ` \`.;`\ _ /`;.`/ ` : | | \ \ `. \_ __\ /__ _/ .` / / ======`.____`.___\_____/___.`____.'====== `==' ............................................. 佛祖保佑 永无BUG 危险代码,谨慎修改 / //namespace Http; class Curls{ private $_instance; //demo public static function demo(){ $urls=array( array( 'url' =>'http://aaa.com.cn/api/v3/get?feed_fmt=1&dedup=32&merge=3&statics=1&this_page=1&rfunc=105&fields=url&offset=0&length=10&feed_fields=url,mtitle,stitle,title,wapurls,wapurl,img,comment_total,type,ctime&cateid=sina_all&mod=f&cre=newspagew&pageUrl=http://news.sina.com.cn/c/nd/20161014/docifxwvpar8010943.shtml&_=1476409984855&callback=jsonp_vHEE1476409984854', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), 'cacheTime'=>60, ), array( 'url' =>'http://test.cn/mobile/impress', 'method'=>'POST', 'params'=>'{"adunit_id":["PDPS000000054013","PDPS000000051741"],"size":["unknown"],"rotate_count":43,"timestamp":1476264235.3584,"device_id":"8d3d672ac567d13495277adb5319cfca3ac03323","device_platform":"android","device_type":4,"carrier":"2g","client":"newsapp","ip":"10.236.55.81","targeting":[]}', ), array( 'url' =>'http://test.cn/native/impress', 'method'=>'POST', 'params'=>'{"adunit_id":["PDPS000000056242","PDPS000000056436","PDPS000000056437","PDPS000000056438"],"timestamp":1476264322.912,"app":{"name":"com.sina.news","channel":"news_news","make":"","model":"","os":"android","osv":"9.2.1","device_type":4,"connection_type":2,"carrier":"0","ip":"10.236.55.81","device_id":"8d3d672ac567d13495277adb5319cfca3ac03323","version":"6054095012"}}' ), ); $urls=array( array( 'url'=>'http://127.0.0.1/sleep1.php?aaa=1111', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), 'cacheTime'=>600, ), array( 'url'=>'http://127.0.0.1/sleep1.php?aaa=222', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), //'cacheTime'=>600, ), array( 'url'=>'http://127.0.0.1/sleep1.php?aaa=333', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), ), array( 'url'=>'http://127.0.0.1/sleep1.php?aaa=444', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), ), array( 'url'=>'http://127.0.0.1/sleep1.php?aaa=555', 'method'=>'GET', 'params'=>array( 'a'=>1, 'b'=>2, 'c'=>3, ), ), ); $data=self::httpReq($urls,1000); print_r($data); } public static function httpReq($urls,$timeout) { $timeout=$timeout1000; if(empty($urls)||!is_array($urls)){return;} $urls_mapping = self::prepareReq($urls); $urls_mapping_cache=self::cacheCheck($urls_mapping); $urls_mapping=$urls_mapping_cache['urlsMapping']; $urlCache=$urls_mapping_cache['urlCache']; //print_r($urlCache);exit; if (FALSE === self::connReq($urls_mapping, $timeout)) { return FALSE; } if (FALSE === self::dispatch($urls_mapping, $timeout)) { //return FALSE; } self::cacheBuild($urls_mapping); if(!empty($urlCache)&&is_array($urlCache)){ $urls_mapping=array_merge($urlCache,$urls_mapping); } //print_r($urls_mapping);exit; foreach ($urls_mapping as $key=>$uData){ //print_r($uData);exit; $data[$key]=!empty($uData['response']['body'])?$uData['response']['body']:''; if(empty($data[$key])&&!empty($uData['cacheTime'])){ $data[$key] = $uData['data']; } if(empty($data[$key])&&!empty($uData['cacheTime'])){ $data[$key] = Mcache::getInstance()>get($uData['cacheKey']); } if(empty($data[$key])&&!empty($uData['cacheTime'])){ $data[$key] = Mcache::getInstance()>get($uData['cacheKey']. '_bak'); } } return $data; } //检查请求的URL是否有缓存 public static function cacheCheck($urls_mapping){ if(!empty($_GET['refresh'])&&$_GET['refresh']==1){return;} if (class_exists('Yac', false)) { $yac = new Yac(); $isYac=1; } foreach ($urls_mapping as $key=>$urlData){ //print_r($urlData);exit; if($urlData['method']=='POST'||empty($urlData['cacheTime'])){continue;} $url=$urlData['url']; $urlHost=$urlData['component']['host']; $urlHost=str_replace('.','_',$urlHost); $urlPath=!empty($urlData['component']['path'])?$urlData['component']['path']:''; $urlPath=str_replace('/','_',$urlPath); $urlPath=str_replace('.','_',$urlPath); $cacheKey='curls_'.$urlHost.'_'.$urlPath.'_'.substr(md5($url),0,16); $yacCacheKey=md5($cacheKey); if($isYac){ $data = $yac>get($yacCacheKey); //print_r($data);exit; } if(!empty($data)){ $urlData['data']=$data; unset($urls_mapping[$key]); $urlCache[$key]=$urlData; }else{ $urls_mapping[$key]['cacheKey']=$cacheKey; } } //print_r($urls_mapping);exit; return array( 'urlsMapping'=>$urls_mapping, 'urlCache'=>!empty($urlCache)?$urlCache:array() ); } public static function cacheBuild($urls_mapping){ if (class_exists('Yac', false)) { $yac = new Yac(); $isYac=1; } foreach ($urls_mapping as $key=>$uData){ if(!empty($uData['cacheTime'])&&!empty($uData['response']['body'])){ if($isYac){ $yac>set(md5($uData['cacheKey']), $uData['response']['body'],(int)$uData['cacheTime']); } Mcache::getInstance()>set($uData['cacheKey'], $uData['response']['body'], 3600); Mcache::getInstance()>set($uData['cacheKey'].'_bak', $uData['response']['body'], 36000); } } } public static function prepareReq($urls) { $urls_mapping = array(); foreach ($urls as $urlData) { $url=$urlData['url']; if($urlData['method']=='GET'&&!empty($urlData['params'])){ $params = http_build_query($urlData['params']); $cOncat= strpos($url, '?') !== false ? '&' : '?'; $url .= $concat.$params; } if(!empty($_GET['refresh'])){ $url .= strpos($url,'?')!==false?'&':'?'.'__refresh=1&refresh=1'; } $urls_mapping[$url]['url'] = $url; $urls_mapping[$url]['status'] = 'inited'; $compOnent= parse_url($url); // 异常Case检查 if (!$component || !isset($component['host'])) { $urls_mapping[$url]['error'] = 'parse_url failed: ' . json_encode($component); continue; } if (isset($component['scheme']) && !in_array($component['scheme'], array('http', 'https'))) { $urls_mapping[$url]['error'] = 'unsupport scheme: ' . $component['scheme']; continue; } // 默认值处理 if (!isset($component['scheme'])) { $component['scheme'] = 'http'; } if (!isset($component['port'])) { $component['port'] = $component['scheme'] == 'https' ? '443' : '80'; } $urls_mapping[$url]['status'] = 'prepared'; $urls_mapping[$url]['component'] = $component; $urls_mapping[$url]['method'] = $urlData['method']; if($urlData['method']=='POST'&&!empty($urlData['params'])){ $urls_mapping[$url]['params'] = $urlData['params']; }else{ $urls_mapping[$url]['params']=array(); } if($urlData['method']=='GET'&&!empty($urlData['cacheTime'])){ $urls_mapping[$url]['cacheTime'] = $urlData['cacheTime']; }else{ $urls_mapping[$url]['cacheTime']=''; } } return $urls_mapping; } public static function connReq(&$urls_mapping, &$timeout) { foreach ($urls_mapping as $url => $url_info) { if ($url_info['status'] !== 'prepared') { continue; } $host = $url_info['component']['host']; $port = $url_info['component']['port']; $begin = microtime(TRUE) 1000000; /$sock = @stream_socket_client("tcp://$host:$port", $conn_errno, $conn_errstr, max(1, floor($timeout / 1000000)), STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);/ $sock = @stream_socket_client("tcp://$host:$port", $conn_errno, $conn_errstr, max(1, floor($timeout / 1000000)),STREAM_CLIENT_CONNECT); $end = microtime(TRUE) 1000000; $timeout = ($end $begin); if ($timeout <= 0) { return FALSE; } stream_set_blocking($sock, FALSE); if ($sock) { $urls_mapping[$url]['sock'] = $sock; $urls_mapping[$url]['status'] = 'connected'; } else { $urls_mapping[$url]['conn_errno'] = $conn_errno; $urls_mapping[$url]['conn_errstr'] = $conn_errstr; } } } public static function dispatch(&$urls_mapping, &$timeout) { $socks = array(); $socks_index = array(); foreach ($urls_mapping as $url => $url_info) { if ($url_info['status'] !== 'connected') { continue; } $socks_index[(int)$url_info['sock']] = $url; $socks[] = $url_info['sock']; } self::writeReq($urls_mapping, $socks_index, $socks); while (count($socks) > 0) { $rsocks = $socks; $wsocks = array(); $esocks = array(); $begin = microtime(TRUE) 1000000; //$nsock = stream_select($rsocks, $wsocks, $esocks, 0, $timeout % 1000000); $nsock = @stream_select($rsocks, $wsocks, $esocks, floor($timeout / 1000000), $timeout % 1000000); //stream_set_blocking($rsocks, FALSE); $end = microtime(TRUE) 1000000; $timeout = ($end $begin); if ($timeout <= 0) { return FALSE; } if ($nsock <= 0) { foreach ($socks as $sock) { $url = $socks_index[(int)$sock]; $urls_mapping[$url]['error'] = 'stream_select timeout'; } break; } if (!empty($rsocks)) { self::readResp($urls_mapping, $socks_index, $rsocks); $socks = array_diff($socks, $rsocks); } } } public static function readResp(&$urls_mapping, $socks_index, $rsocks) { foreach ($rsocks as $idx => $sock) { $url = $socks_index[(int)$sock]; $url_info = &$urls_mapping[$url]; $url_info['status'] = 'reading'; $data = ''; while (!feof($sock)) { $data .= fread($sock, 8192); } if (strlen($data) == 0) { $url_info['error'] = 'response empty'; fclose($sock); continue; } $url_info['status'] = 'readed'; $url_info['response_str'] = $data; self::parseResponse($url_info); } } public static function parseResponse(&$url_info) { list($header_str, $body_str) = array_pad(explode("\r\n\r\n", $url_info['response_str'], 2), 2, ''); $headers_str = explode("\r\n", $header_str); $headers = array(); foreach ($headers_str as $header_str) { list($name, $val) = array_pad(explode(":", $header_str, 2), 2, ""); $name = trim($name); $val = trim($val); if (isset($headers[$name])) { if (is_array($headers[$name])) { $headers[$name][] = $val; } else { $headers[$name] = array($headers[$name], $val); } } else { $headers[$name] = $val; } } $url_info['response'] = array( 'headers' => $headers, 'body' => $body_str, ); } public static function writeReq(&$urls_mapping, $socks_index, &$wsocks) { foreach ($wsocks as $idx => $sock) { $url = $socks_index[(int)$sock]; $url_info = &$urls_mapping[$url]; $url_info['status'] = 'writing'; self::buildRequest($url_info); //print_r($sock); $wlen = @fwrite($sock, $url_info['req_str']); stream_set_blocking($sock, FALSE); if ($wlen != strlen($url_info['req_str'])) { $url_info['error'] = 'Write request error[expect write len: ' . strlen($url_info['req_str']) . ', actually write len: ' . $wlen . ']'; unset($wsocks[$idx]); continue; } $url_info['status'] = 'writed'; } } public static function buildRequest(&$url_info) { $host = $url_info['component']['host']; $path = !empty($url_info['component']['path'])?$url_info['component']['path']:''; $method = $url_info['method']; $query_string = isset($url_info['component']['query']) ? "?{$url_info['component']['query']}" : ''; //$url_info['req_str'] = "GET {$path}{$query_string} HTTP/1.0\r\nHost: {$host}\r\n\r\n"; $req=''; $req.="$method {$path}{$query_string} HTTP/1.0\r\n"; $req.="Host: $host\r\n"; if($method=='POST'){ $postData=''; if(!empty($url_info['params'])){ if(is_array($url_info['params'])){ $postData=http_build_query($url_info['params']); }else{ $postData=$url_info['params']; } } $req.='Contentlength: '. strlen($postData) ."\r\n"; $req.="Connection: close\r\n\r\n$postData"; }else{ $req.="\r\n"; } $url_info['req_str'] = $req; } } //Http::demo(); / // stop profiler $xhprof_data = xhprof_disable(); // display raw xhprof data for the profiler run //print_r($xhprof_data); $XHPROF_ROOT = realpath(dirname(__FILE__) .'/xhprof0.9.3/'); //print_r($XHPROF_ROOT);exit; include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_lib.php"; include_once $XHPROF_ROOT . "/xhprof_lib/utils/xhprof_runs.php"; // save raw data for this profiler run using default // implementation of iXHProfRuns. $xhprof_runs = new XHProfRuns_Default(); // save the run under a namespace "xhprof_foo" $run_id = $xhprof_runs>save_run($xhprof_data, "xhprof_foo"); echo "\n". "Assuming you have set up the http based UI for \n". "XHProf at some address, you can view run at \n". "http://test.liuqing.com/xhprof0.9.3/xhprof_html/index.php?run=$run_id&source=xhprof_foo\n". "\n"; /