使用TcpClient和Reactive Extensions从Stream读取连续字节流

 天晴的故事_665 发布于 2023-01-29 06:29

请考虑以下代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length);
            return buffer.Take(readBytes).ToObservable();
        });
    }
}

基本上我想解析我通过Reactive Extensions收到的消息.使用Buffer(4)正确解析消息的头部,并获得消息剩余部分的长度.出现的问题是,当我执行stream.Take(headerLength)时,代码重新评估整个"链"并尝试从流中获取新消息,而不是返回已从流中读取的其余字节.更确切地说,第一个ReadAsync(...)返回38个字节,Buffer(4)返回前4个,observable.Take(headerLength)不返回剩余的34个字节,而是尝试读取新的使用ReadAsync的消息.

问题是,如何确保observable.Take(headerLength)接收已读取的34个字节而不尝试从流中读取新消息?我一直在寻找解决方案,但我无法弄清楚如何实现这一目标.

编辑:这个解决方案(使用Reactive Extensions(Rx)实现套接字编程?)不是我想要的.这不是读取流中的所有可用内容(最多为buffersize),而是从中获取连续的字节流.对我来说,这个解决方案似乎不是从流中读取的一种非常有效的方式,因此我的问题.

1 个回答
  • 这种方法不起作用.问题是你使用observable的方式.Buffer将不会读取4个字节并退出,它将连续读取4个字节的块.该Take表格的第二签约,将读取的字节重叠.您会发现将流直接解析为消息要容易得多.

    以下代码也为清理工作做了大量工作.

    假设你Message就是这个,(ToString为测试添加):

    public class Message
    {
        public byte[] PayLoad;
    
        public override string ToString()
        {
            return Encoding.UTF8.GetString(PayLoad);
        }
    }
    

    你已经获得了一个,Stream你可以解析如下.首先,从流中读取确切字节数的方法:

    public async static Task ReadExactBytesAsync(
        Stream stream, byte[] buffer, CancellationToken ct)
    {
        var count = buffer.Length;
        var totalBytesRemaining = count;
        var totalBytesRead = 0;
        while (totalBytesRemaining != 0)
        {
            var bytesRead = await stream.ReadAsync(
                buffer, totalBytesRead, totalBytesRemaining, ct);
            ct.ThrowIfCancellationRequested();
            totalBytesRead += bytesRead;
            totalBytesRemaining -= bytesRead;
        }
    }
    

    然后将流转换为IObservable<Message>:

    public static IObservable<Message> ReadMessages(
        Stream sourceStream,
        IScheduler scheduler = null)
    {
        int subscribed = 0;
        scheduler = scheduler ?? Scheduler.Default;
    
        return Observable.Create<Message>(o =>
        {
            // first check there is only one subscriber
            // (multiple stream readers would cause havoc)
            int previous = Interlocked.CompareExchange(ref subscribed, 1, 0);
    
            if (previous != 0)
                o.OnError(new Exception(
                    "Only one subscriber is allowed for each stream."));
    
            // we will return a disposable that cleans
            // up both the scheduled task below and
            // the source stream
            var dispose = new CompositeDisposable
            {
                Disposable.Create(sourceStream.Dispose)
            };
    
            // use async scheduling to get nice imperative code
            var schedule = scheduler.ScheduleAsync(async (ctrl, ct) =>
            {
                // store the header here each time
                var header = new byte[4];
    
                // loop until cancellation requested
                while (!ct.IsCancellationRequested)
                {                        
                    try
                    {
                        // read the exact number of bytes for a header
                        await ReadExactBytesAsync(sourceStream, header, ct);
                    }
                    catch (OperationCanceledException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        // pass through any problem in the stream and quit
                        o.OnError(new InvalidDataException("Error in stream.", ex));
                        return;
                    }                   
                    ct.ThrowIfCancellationRequested();
    
                    var bodyLength = IPAddress.NetworkToHostOrder(
                        BitConverter.ToInt16(header, 2));
                    // create buffer to read the message
                    var payload = new byte[bodyLength];
    
                    // read exact bytes as before
                    try
                    {
                        await ReadExactBytesAsync(sourceStream, payload, ct);
                    }
                    catch (OperationCanceledException)
                    {
                        throw;
                    }
                    catch (Exception ex)
                    {
                        o.OnError(new InvalidDataException("Error in stream.", ex));
                        return;
                    }
    
                    // create a new message and send it to client
                    var message = new Message { PayLoad = payload };
                    o.OnNext(message);
                }
                // wrap things up
                ct.ThrowIfCancellationRequested();
                o.OnCompleted();
            });
    
            // return the suscription handle
            dispose.Add(schedule);
            return dispose;
        });
    }
    

    编辑 - 我使用的非常hacky测试代码:

    private static void Main(string[] args)
    {
        var listener = new TcpListener(IPAddress.Any, 12873);
        listener.Start();
    
        var listenTask = listener.AcceptTcpClientAsync();
        listenTask.ContinueWith((Task<TcpClient> t) =>
        {
            var client = t.Result;
            var stream = client.GetStream();
            const string messageText = "Hello World!";                
            var body = Encoding.UTF8.GetBytes(messageText);                
            var header = BitConverter.GetBytes(
                IPAddress.HostToNetworkOrder(body.Length));
            for (int i = 0; i < 5; i++)
            {
                stream.Write(header, 0, 4);
                stream.Write(body, 0, 4);
                stream.Flush();
                // deliberate nasty delay
                Thread.Sleep(2000);
                stream.Write(body, 4, body.Length - 4);
                stream.Flush();
            }
            stream.Close();
            listener.Stop();
        });
    
    
        var tcpClient = new TcpClient();
        tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, 12873));
        var clientStream = tcpClient.GetStream();
    
        ReadMessages(clientStream).Subscribe(
            Console.WriteLine,
            ex => Console.WriteLine("Error: " + ex.Message),
            () => Console.WriteLine("Done!"));
    
        Console.ReadLine();
    }
    

    包起来

    如果服务器死了,您需要考虑设置读取超时,并且服务器应该发送某种"结束消息".目前,此方法将不断尝试接收字节.正如你没有指出的那样,我没有包括这样的东西 - 但是如果你这样做的话,那么当我写完它break时,就会导致OnCompleted发送.

    2023-01-29 07:03 回答
撰写答案
今天,你开发时遇到什么问题呢?
立即提问
热门标签
PHP1.CN | 中国最专业的PHP中文社区 | PNG素材下载 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有