我正在使用同步apis和线程池的tcp服务器上看起来像这样:
TcpListener listener; void Serve(){ while(true){ var client = listener.AcceptTcpClient(); ThreadPool.QueueUserWorkItem(this.HandleConnection, client); //Or alternatively new Thread(HandleConnection).Start(client) } }
假设我的目标是在资源使用率最低的情况下处理尽可能多的并发连接,这似乎很快就会受到可用线程数量的限制.我怀疑通过使用非阻塞任务apis,我将能够用更少的资源处理更多.
我最初的印象是这样的:
async Task Serve(){ while(true){ var client = await listener.AcceptTcpClientAsync(); HandleConnectionAsync(client); //fire and forget? } }
但令我印象深刻的是,这可能会导致瓶颈.也许HandleConnectionAsync需要花费非常长的时间才能达到第一次等待,并且将阻止主接受循环继续进行.这只会使用一个线程,还是运行时会在多个线程上神奇地运行它看起来合适的东西?
有没有办法结合这两种方法,以便我的服务器将使用它所需的线程数量来确定正在运行的任务的数量,但是它不会在IO操作上不必要地阻塞线程?
在这样的情况下,是否存在最大化吞吐量的惯用方法?
现有的答案已正确提出使用Task.Run(() => HandleConnection(client));
,但没有解释原因.
原因如下:您担心,HandleConnectionAsync
可能需要一些时间才能达到第一个等待.如果您坚持使用异步IO(在本例中应该如此),这意味着HandleConnectionAsync
在没有任何阻塞的情况下进行CPU绑定工作.这是线程池的完美案例.它可以运行简短,无阻塞的CPU工作.
而你是对的,接受循环将HandleConnectionAsync
在返回之前花费很长时间来限制(可能因为其中有大量的CPU绑定工作).如果您需要高频率的新连接,则应避免这种情况.
如果您确定没有重要的工作限制循环,您可以保存额外的线程池Task
而不是这样做.
或者,您可以同时运行多个接受.替换await Serve();
为(例如):
var serverTasks = Enumerable.Range(0, Environment.ProcessorCount) .Select(_ => Serve()); await Task.WhenAll(serverTasks);
这消除了可伸缩性问题.注意,这await
将吞下除一个错误之外的所有错误.
我让Framework管理线程并且不会创建任何额外的线程,除非我可能需要进行性能分析测试.特别是,如果内部调用HandleConnectionAsync
主要是IO绑定的.
无论如何,如果你想在开始时释放调用线程(调度程序)HandleConnectionAsync
,那么这是一个非常简单的解决方案.您可以从一个新的线程跳ThreadPool
带await Yield()
.如果您的服务器在初始线程(控制台应用程序,WCF服务)上没有安装任何同步上下文的执行环境中运行,那么这是有效的,这通常是TCP服务器的情况.
[已编辑]以下说明了这一点(代码最初来自此处).注意,主while
循环不会显式创建任何线程:
using System; using System.Collections.Generic; using System.Net.Sockets; using System.Text; using System.Threading.Tasks; class Program { object _lock = new Object(); // sync lock List<Task> _connections = new List<Task>(); // pending connections // The core server task private async Task StartListener() { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); // if already faulted, re-throw any error on the calling context if (task.IsFaulted) await task; } } // Register and handle the connection private async Task StartHandleConnectionAsync(TcpClient tcpClient) { // start the new connection task var connectionTask = HandleConnectionAsync(tcpClient); // add it to the list of pending task lock (_lock) _connections.Add(connectionTask); // catch all errors of HandleConnectionAsync try { await connectionTask; // we may be on another thread after "await" } catch (Exception ex) { // log the error Console.WriteLine(ex.ToString()); } finally { // remove pending task lock (_lock) _connections.Remove(connectionTask); } } // Handle new connection private async Task HandleConnectionAsync(TcpClient tcpClient) { await Task.Yield(); // continue asynchronously on another threads using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } } // The entry point of the console app static async Task Main(string[] args) { Console.WriteLine("Hit Ctrl-C to exit."); await new Program().StartListener(); } }
或者,代码可能如下所示,没有await Task.Yield()
.注意,我传递一个async
lambdaTask.Run
,因为我仍然希望从里面的异步API中受益HandleConnectionAsync
并await
在那里使用:
// Handle new connection private static Task HandleConnectionAsync(TcpClient tcpClient) { return Task.Run(async () => { using (var networkStream = tcpClient.GetStream()) { var buffer = new byte[4096]; Console.WriteLine("[Server] Reading from client"); var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length); var request = Encoding.UTF8.GetString(buffer, 0, byteCount); Console.WriteLine("[Server] Client wrote {0}", request); var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server"); await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length); Console.WriteLine("[Server] Response has been written"); } }); }
[更新]基于注释:如果这将是库代码,则执行环境确实是未知的,并且可能具有非默认同步上下文.在这种情况下,我宁愿在池线程上运行主服务器循环(没有任何同步上下文):
private static Task StartListener() { return Task.Run(async () => { var tcpListener = TcpListener.Create(8000); tcpListener.Start(); while (true) { var tcpClient = await tcpListener.AcceptTcpClientAsync(); Console.WriteLine("[Server] Client has connected"); var task = StartHandleConnectionAsync(tcpClient); if (task.IsFaulted) await task; } }); }
这样,内部创建的所有子任务StartListener
都不会受到客户端代码的同步上下文的影响.所以,我不必Task.ConfigureAwait(false)
明确地在任何地方打电话.