热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Swoole源码分析——Client模块之Send

前言上一章我们说了客户端的连接connect,对于同步客户端来说,连接已经建立成功;但是对于异步客户端来说,此时可能还在进行DNS的解析,onConnect回调函数还未执行。本节中






前言

上一章我们说了客户端的连接 connect,对于同步客户端来说,连接已经建立成功;但是对于异步客户端来说,此时可能还在进行 DNS 的解析,onConnect 回调函数还未执行。

本节中,我们将继续说明客户端发送数据的流程,同时我们可以看到 TCP 异步客户端执行 onConnect 回调函数的过程。


send 入口

本入口函数逻辑非常简单,从 PHP 函数中获取数据 data,然后调用 connect 函数。

static PHP_METHOD(swoole_client, send)
{
char *data;
zend_size_t data_len;
zend_long flags = 0;
#ifdef FAST_ZPP
ZEND_PARSE_PARAMETERS_START(1, 2)
Z_PARAM_STRING(data, data_len)
Z_PARAM_OPTIONAL
Z_PARAM_LONG(flags)
ZEND_PARSE_PARAMETERS_END();
#else
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "s|l", &data, &data_len, &flags) == FAILURE)
{
return;
}
#endif
swClient *cli = client_get_ptr(getThis() TSRMLS_CC);
//clear errno
SwooleG.error = 0;
int ret = cli->send(cli, data, data_len, flags);
if (ret <0)
{
swoole_php_sys_error(E_WARNING, "failed to send(%d) %zd bytes.", cli->socket->fd, data_len);
zend_update_property_long(swoole_client_class_entry_ptr, getThis(), SW_STRL("errCode")-1, SwooleG.error TSRMLS_CC);
RETVAL_FALSE;
}
else
{
RETURN_LONG(ret);
}
}

swClient_tcp_send_sync 同步 TCP 客户端

对于同步客户端来说,发送数据是通过 swConnection_send 函数来进行阻塞式调用 send,当返回的错误是 EAGAIN 的时候调用 swSocket_wait 等待 1s。

static int swClient_tcp_send_sync(swClient *cli, char *data, int length, int flags)
{
int written = 0;
int n;
assert(length > 0);
assert(data != NULL);
while (written {
n = swConnection_send(cli->socket, data, length - written, flags);
if (n <0)
{
if (errno == EINTR)
{
continue;
}
else if (errno == EAGAIN)
{
swSocket_wait(cli->socket->fd, 1000, SW_EVENT_WRITE);
continue;
}
else
{
SwooleG.error = errno;
return SW_ERR;
}
}
written += n;
data += n;
}
return written;
}

swClient_tcp_send_async 异步 TCP 客户端

由于异步客户端已经设置为非阻塞,并且加入了 reactor 的监控,因此发送数据只需要 reactor->write 函数即可。当此时的套接字不可写的时候,会自动放入 out_buff 缓冲区中。

out_buffer 大于高水线时,会自动调用 onBufferFull 回调函数。

static int swClient_tcp_send_async(swClient *cli, char *data, int length, int flags)
{
int n = length;
if (cli->reactor->write(cli->reactor, cli->socket->fd, data, length) <0)
{
if (SwooleG.error == SW_ERROR_OUTPUT_BUFFER_OVERFLOW)
{
n = -1;
cli->socket->high_watermark = 1;
}
else
{
return SW_ERR;
}
}
if (cli->onBufferFull && cli->socket->out_buffer && cli->socket->high_watermark == 0
&& cli->socket->out_buffer->length >= cli->buffer_high_watermark)
{
cli->socket->high_watermark = 1;
cli->onBufferFull(cli);
}
return n;
}

swClient_udp_send UDP 客户端

对于 UDP 客户端来说,如果 Socket 缓存区塞满,并不会像 TCP 进行等待 reactor 可写状态,而是直接返回了结果。对于异步客户端来说,仅仅是非阻塞调用 sendto 函数。

static int swClient_udp_send(swClient *cli, char *data, int len, int flags)
{
int n;
n = sendto(cli->socket->fd, data, len, 0, (struct sockaddr *) &cli->server_addr.addr, cli->server_addr.len);
if (n <0 || n {
return SW_ERR;
}
else
{
return n;
}
}

sendto UDP 客户端

类似于 send 函数,sendto 函数专门针对 UDP 客户端,与 send 函数不同的是,sendto 函数在底层套接字缓冲区塞满的时候,会调用 swSocket_wait 进行阻塞等待。

static PHP_METHOD(swoole_client, sendto)
{
char* ip;
zend_size_t ip_len;
long port;
char *data;
zend_size_t len;
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "sls", &ip, &ip_len, &port, &data, &len) == FAILURE)
{
return;
}
swClient *cli = (swClient *) swoole_get_object(getThis());
int ret;
if (cli->type == SW_SOCK_UDP)
{
ret = swSocket_udp_sendto(cli->socket->fd, ip, port, data, len);
}
else if (cli->type == SW_SOCK_UDP6)
{
ret = swSocket_udp_sendto6(cli->socket->fd, ip, port, data, len);
}
else
{
swoole_php_fatal_error(E_WARNING, "only supports SWOOLE_SOCK_UDP or SWOOLE_SOCK_UDP6.");
RETURN_FALSE;
}
SW_CHECK_RETURN(ret);
}
int swSocket_udp_sendto(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
{
struct sockaddr_in addr;
if (inet_aton(dst_ip, &addr.sin_addr) == 0)
{
swWarn("ip[%s] is invalid.", dst_ip);
return SW_ERR;
}
addr.sin_family = AF_INET;
addr.sin_port = htons(dst_port);
return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
}
int swSocket_udp_sendto6(int server_sock, char *dst_ip, int dst_port, char *data, uint32_t len)
{
struct sockaddr_in6 addr;
bzero(&addr, sizeof(addr));
if (inet_pton(AF_INET6, dst_ip, &addr.sin6_addr) <0)
{
swWarn("ip[%s] is invalid.", dst_ip);
return SW_ERR;
}
addr.sin6_port = (uint16_t) htons(dst_port);
addr.sin6_family = AF_INET6;
return swSocket_sendto_blocking(server_sock, data, len, 0, (struct sockaddr *) &addr, sizeof(addr));
}
int swSocket_sendto_blocking(int fd, void *__buf, size_t __n, int flag, struct sockaddr *__addr, socklen_t __addr_len)
{
int n = 0;
while (1)
{
n = sendto(fd, __buf, __n, flag, __addr, __addr_len);
if (n >= 0)
{
break;
}
else
{
if (errno == EINTR)
{
continue;
}
else if (swConnection_error(errno) == SW_WAIT)
{
swSocket_wait(fd, 1000, SW_EVENT_WRITE);
continue;
}
else
{
break;
}
}
}
return n;
}

swClient_onWrite 写就绪状态

reactor 监控到套接字进入了写就绪状态时,就会调用 swClient_onWrite 函数。

从上一章我们知道,异步客户端建立连接过程中 swoole 调用了 connect 函数,该返回 0,或者返回错误码 EINPROGRESS 都会对写就绪进行监控。无论哪种情况,swClient_onWrite 被调用就说明此时连接已经建立成功,三次握手已经完成,但是 cli->socket->active 还是 0。

如果 cli->socket->active 为 0,说明此时异步客户端虽然建立了连接,但是还没有调用 onConnect 回调函数,因此这时要调用 execute_onConnect 函数。如果使用了 SSL 隧道加密,还要进行 SSL 握手,并且设置 _socket->ssl_state = SW_SSL_STATE_WAIT_STREAM

active 为 1 的时候,就可以调用 swReactor_onWrite 来发送数据。

static int swClient_onWrite(swReactor *reactor, swEvent *event)
{
swClient *cli = event->socket->object;
swConnection *_socket = cli->socket;
if (cli->socket->active)
{
#ifdef SW_USE_OPENSSL
if (cli->open_ssl && _socket->ssl_state == SW_SSL_STATE_WAIT_STREAM)
{
if (swClient_ssl_handshake(cli) <0)
{
goto connect_fail;
}
else if (_socket->ssl_state == SW_SSL_STATE_READY)
{
goto connect_success;
}
else
{
if (_socket->ssl_want_read)
{
cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);
}
return SW_OK;
}
}
#endif
if (swReactor_onWrite(cli->reactor, event) <0)
{
return SW_ERR;
}
if (cli->onBufferEmpty && _socket->high_watermark && _socket->out_buffer->length <= cli->buffer_low_watermark)
{
_socket->high_watermark = 0;
cli->onBufferEmpty(cli);
}
return SW_OK;
}
socklen_t len = sizeof(SwooleG.error);
if (getsockopt(event->fd, SOL_SOCKET, SO_ERROR, &SwooleG.error, &len) <0)
{
swWarn("getsockopt(%d) failed. Error: %s[%d]", event->fd, strerror(errno), errno);
return SW_ERR;
}
//success
if (SwooleG.error == 0)
{
//listen read event
cli->reactor->set(cli->reactor, event->fd, SW_FD_STREAM_CLIENT | SW_EVENT_READ);
//connected
_socket->active = 1;
#ifdef SW_USE_OPENSSL
if (cli->open_ssl)
{
if (swClient_enable_ssl_encrypt(cli) <0)
{
goto connect_fail;
}
if (swClient_ssl_handshake(cli) <0)
{
goto connect_fail;
}
else
{
_socket->ssl_state = SW_SSL_STATE_WAIT_STREAM;
}
return SW_OK;
}
connect_success:
#endif
if (cli->onConnect)
{
execute_onConnect(cli);
}
}
else
{
#ifdef SW_USE_OPENSSL
connect_fail:
#endif
_socket->active = 0;
cli->close(cli);
if (cli->onError)
{
cli->onError(cli);
}
}
return SW_OK;
}
static sw_inline void execute_onConnect(swClient *cli)
{
if (cli->timer)
{
swTimer_del(&SwooleG.timer, cli->timer);
cli->timer = NULL;
}
cli->onConnect(cli);
}

client_onConnect

static void client_onConnect(swClient *cli)
{
zval *zobject = (zval *) cli->object;
#ifdef SW_USE_OPENSSL
if (cli->ssl_wait_handshake)
{
client_execute_callback(zobject, SW_CLIENT_CB_onSSLReady);
}
else
#endif
if (!cli->redirect)
{
client_execute_callback(zobject, SW_CLIENT_CB_onConnect);
}
else
{
client_callback *cb = (client_callback *) swoole_get_property(zobject, 0);
if (!cb || !cb->onReceive)
{
swoole_php_fatal_error(E_ERROR, "has no 'onReceive' callback function.");
}
}
}



swoole


推荐阅读
  • React基础篇一 - JSX语法扩展与使用
    本文介绍了React基础篇一中的JSX语法扩展与使用。JSX是一种JavaScript的语法扩展,用于描述React中的用户界面。文章详细介绍了在JSX中使用表达式的方法,并给出了一个示例代码。最后,提到了JSX在编译后会被转化为普通的JavaScript对象。 ... [详细]
  • 本文介绍了一个React Native新手在尝试将数据发布到服务器时遇到的问题,以及他的React Native代码和服务器端代码。他使用fetch方法将数据发送到服务器,但无法在服务器端读取/获取发布的数据。 ... [详细]
  • 本文介绍了在使用Laravel和sqlsrv连接到SQL Server 2016时,如何在插入查询中使用输出子句,并返回所需的值。同时讨论了使用CreatedOn字段返回最近创建的行的解决方法以及使用Eloquent模型创建后,值正确插入数据库但没有返回uniqueidentifier字段的问题。最后给出了一个示例代码。 ... [详细]
  • 本文介绍了使用C++Builder实现获取USB优盘序列号的方法,包括相关的代码和说明。通过该方法,可以获取指定盘符的USB优盘序列号,并将其存放在缓冲中。该方法可以在Windows系统中有效地获取USB优盘序列号,并且适用于C++Builder开发环境。 ... [详细]
  • 该楼层疑似违规已被系统折叠隐藏此楼查看此楼*madebyebhrz*#include#include#include#include#include#include#include ... [详细]
  • Ihavebeenworkingwithbufferingafileonmylocaldrivetoparseandobtaincertaindata.Forte ... [详细]
  • 图片添加二维码水印教程
    本博客介绍一下用jdkawt实现图片加文字水印和图片水印的方法一、图片文字水印原来图片加上文字水印后图片二、图片加图片水印原来图片:水印图片:添加水印后的图片: ... [详细]
  • 浅解XXE与Portswigger Web Sec
    XXE与PortswiggerWebSec​相关链接:​博客园​安全脉搏​FreeBuf​XML的全称为XML外部实体注入,在学习的过程中发现有回显的XXE并不多,而 ... [详细]
  • 程序员_阿里Antd藏圣诞节彩蛋 程序员被离职
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了阿里Antd藏圣诞节彩蛋程序员被离职相关的知识,希望对你有一定的参考价值。 ... [详细]
  • eclipse学习(第三章:ssh中的Hibernate)——11.Hibernate的缓存(2级缓存,get和load)
    本文介绍了eclipse学习中的第三章内容,主要讲解了ssh中的Hibernate的缓存,包括2级缓存和get方法、load方法的区别。文章还涉及了项目实践和相关知识点的讲解。 ... [详细]
  • Java String与StringBuffer的区别及其应用场景
    本文主要介绍了Java中String和StringBuffer的区别,String是不可变的,而StringBuffer是可变的。StringBuffer在进行字符串处理时不生成新的对象,内存使用上要优于String类。因此,在需要频繁对字符串进行修改的情况下,使用StringBuffer更加适合。同时,文章还介绍了String和StringBuffer的应用场景。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 上图是InnoDB存储引擎的结构。1、缓冲池InnoDB存储引擎是基于磁盘存储的,并将其中的记录按照页的方式进行管理。因此可以看作是基于磁盘的数据库系统。在数据库系统中,由于CPU速度 ... [详细]
  • OpenMap教程4 – 图层概述
    本文介绍了OpenMap教程4中关于地图图层的内容,包括将ShapeLayer添加到MapBean中的方法,OpenMap支持的图层类型以及使用BufferedLayer创建图像的MapBean。此外,还介绍了Layer背景标志的作用和OMGraphicHandlerLayer的基础层类。 ... [详细]
author-avatar
xiubao
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有