macos - 请教大神们:用php客户端怎么连kafka,怎么做到监控broker变化而来刷新客户端数据的?

 兔子东大门一手批发 发布于 2022-12-01 06:30

最近在研究用php连kafka.
用的是github上的nmred/kafka-php项目代码
目前:
1.已经可以连接服务器上的kafka,
2.测试:命令行执行php Produce.php,consumer端也能获取得到数据
问题:
1.consumer端怎么一直执行,难道写 while死循环?
2.kafka-php是怎么做到客户端隔段时间拉取新的信息,并刷新客户端数据的?
3.在README.md有这么一句话,又是什么意思:

 Watches broker state, if broker changes, the client will refresh     broker and topic metadata stored in the client?

请使用过该工具的赐教思路,不甚感激!

补充:现在我的php客户端 consumer能收到数据了,但是一直都包含乱码,比如我在producer段输入:nihao,则php consumer.php后,会输出:
�m�5����3�SNAPPY`���;����nihao

现在的疑问是:
1.为什么producer端输入的是:nihao,但在consumer端输出的信息为什么不是 nihao?
2.为什么会有一串乱码?
3.怎么解决?
4.下面附上socket读取的核心代码:

// 获取consumer端的信息,其中$this->stream是一个socket连接成功的socket对象:
$msg = $this->stream->read($messageSize, true);

read方法核心代码如下:
$readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);

    if ($readable > 0) {
        $remainingBytes = $len;
        $data = $chunk = '';
        while ($remainingBytes > 0) {
            $chunk = fread($this->stream, $remainingBytes);
            if ($chunk === false) {
                $this->close();
                throw new \Kafka\Exception\SocketEOF('Could not read '.$len.' bytes from stream (no data)');
            }
            if (strlen($chunk) === 0) {
                // Zero bytes because of EOF?
                if (feof($this->stream)) {
                    $this->close();
                    throw new \Kafka\Exception\SocketEOF('Unexpected EOF while reading '.$len.' bytes from stream (no data)');
                }
                // Otherwise wait for bytes
                $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec, $this->recvTimeoutUsec);
                if ($readable !== 1) {
                    throw new \Kafka\Exception\SocketTimeout('Timed out reading socket while reading ' . $len . ' bytes with ' . $remainingBytes . ' bytes to go');
                }
                continue; // attempt another read
            }
            $data .= $chunk;
            $remainingBytes -= strlen($chunk);
        }

通过这个while循环来拼接信息。重点在于 fread出来的 $chunk就包含乱码

呼高手!

2 个回答
  • consumer应该只能通过循环来判断broker是否有变化,然后更新说。

    2022-12-01 07:00 回答
  • 请问楼主现在解决这个问题了吗,是怎么解决的,我现在也出来从kafka获取的数据是乱码,好像是snappy压缩过的,在网上找了php解压snappy不知道什么情况一直出错

    2022-12-01 07:00 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有