• 欢迎访问web前端中文站,JavaScript,CSS3,HTML5,web前端demo
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏web前端中文站吧

使用高性能Pipelines构建.NET通讯程序

JavaScript web前端中文站 6个月前 (05-05) 295次浏览 已收录 0个评论

.NET Standard 支持一组新的 API,System.Span, System.Memory,还有 System.IO.Pipelines。这几个新的 API 极大了提升了.NET 程序的效能,将来.NET 很多基础 API 都会使用它们进行重写。

Pipelines 旨在解决.NET 编写 Socket 通信程序时的很多困难,相信读者也对此不胜其烦,使用 stream 模型进行编程,就算能够解决,也是实在麻烦。

System.IO.Pipelines 使用简单的内存片段来管理数据,可以极大的简化编写程序的过程。关于 Pipelines 的详细介绍,可以看看这里。现在 ASP.NET Core 中使用的 Kestrel 已经在使用这个 API。(话说这个东西貌似就是 Kestrel 团队搞出来的。)

可能是直接需要用 Socket 场景有限(物联网用的还挺多的),Pipelines 相关的资料感觉不是很多。官方给出的示例是基于 ASCII 协议的,有固定结尾的协议,这里我以物联网设备常用的 BINARY 二进制自定义协议为例,讲解基于 Pipelines 的程序套路。

System.IO.Pipelines

与基于 Stream 的方式不同,pipelines 提供一个 pipe,用于存储数据,pipe 中间存储的数据有点链表的感觉,可以基于SequencePosition进行 slice 操作,这样就能得到一个ReadOnlySequence<T>对象。reader 可以进行自定义操作,并在操作完成之后告诉 pipe 已经处理了多少数据,整个过程是不需要进行内存复制操作的,因此性能得到了提升,还少了很多麻烦。可以简单理解作为服务器端,流程:

接受数据循环:接到数据->放 pipe 里面->告诉 pipe 放了多少数据
处理数据循环:在 pipe 里面找一条完整数据->交给处理流程->告诉 pipe 处理了多少数据

协议

有一款设备,binary 协议,数据包开头 0x75, 0xbd, 0x7e, 0x97 一共 4 个字节,随后跟数据包长度 2 个字节(固定 2400 字节,不固定长度也可以参照),随后是数据区。在设备连接成功之后,数据主动从设备发送到 PC。

关键代码

虽然是.NET Core 平台的,但是.NET FRAMEWORK 4.6.1 上面也可以 nuget 安装,直接

install-package system.io.pipelines

进行安装就可以了。Socket 相关处理的代码不再写了,只列关键的。

代码第一步是声明 pipe。

private async void InitPipe(Socket socket) {     Pipe pipe = new Pipe();     Task writing = FillPipeAsync(socket, pipe.Writer);     Task reading = ReadPipeAsync(socket, pipe.Reader);      await Task.WhenAll(reading, writing); }

pipe 有 reader 还有一个 writer,reader 负责读取 pipe 数据,主要用在数据处理循环,writer 负责将数据写入 pipe,主要用在数据接受循环。

//写入循环 private async Task FillPipeAsync(Socket socket, PipeWriter writer) {     //数据流量比较大,用 1M 字节作为 buffer     const int minimumBufferSize = 1024 * 1024;      while (running)     {         try         {             //从 writer 中,获得一段不少于指定大小的内存空间             Memory<byte> memory = writer.GetMemory(minimumBufferSize);              //将内存空间变成 ArraySegment,提供给 socket 使用             if (!MemoryMarshal.TryGetArray((ReadOnlyMemory<byte>)memory, out ArraySegment<byte> arraySegment))             {                 throw new InvalidOperationException("Buffer backed by array was expected");             }             //接受数据             int bytesRead = await SocketTaskExtensions.ReceiveAsync(socket, arraySegment, SocketFlags.None);             if (bytesRead == 0)             {                 break;             }              //一次接受完毕,数据已经在 pipe 中,告诉 pipe 已经给它写了多少数据。             writer.Advance(bytesRead);         }         catch         {             break;         }          // 提示 reader 可以进行读取数据,reader 可以继续执行 readAsync()方法         FlushResult result = await writer.FlushAsync();          if (result.IsCompleted)         {             break;         }     }      // 告诉 pipe 完事了     writer.Complete(); }  //读取循环 private async Task ReadPipeAsync(Socket socket, PipeReader reader) {     while (running)     {         //等待 writer 写数据         ReadResult result = await reader.ReadAsync();         //获得内存区域         ReadOnlySequence<byte> buffer = result.Buffer;         SequencePosition? position = null;          do         {             //寻找 head 的第一个字节所在的位置             position = buffer.PositionOf((byte)0x75);             if (position != null)             {                 //由于是连续四个字节作为 head,需要进行比对,我这里直接使用了 ToArray 方法,还是有了内存拷贝动作,不是很理想,但是写起来很方便。                 //对性能有更高要求的场景,可以进行 slice 操作后的单独比对,这样不需要内存拷贝动作                 var headtoCheck = buffer.Slice(position.Value, 4).ToArray();                 //SequenceEqual 需要引用 System.Linq                 if (headtoCheck.SequenceEqual(new byte[] { 0x75, 0xbd, 0x7e, 0x97 }))                 {                     //到这里,认为找到包开头了(从 position.value 开始),接下来需要从开头处截取整包的长度,需要先判断长度是否足够                     if (buffer.Slice(position.Value).Length >= 2400)                     {                         //长度足够,那么取出 ReadOnlySequence,进行操作                         var mes = buffer.Slice(position.Value, 2400);                         //这里是数据处理的函数,可以参考官方文档对 ReadOnlySequence 进行操作,文档里面使用了 span,那样性能会好一些。我这里简单实用 ToArray()操作,这样也有了内存拷贝的问题,但是处理的直接是 byte 数组了。                         await ProcessMessage(mes.ToArray());                         //这一段就算是完成了,从开头位置,一整个包的长度就算完成了                         var next = buffer.GetPosition(2400, position.Value);                         //将 buffer 处理过的舍弃,替换为剩余的 buffer 引用                         buffer = buffer.Slice(next);                     }                     else                     {                         //长度不够,说明数据包不完整,等下一波数据进来再拼接,跳出循环。                         break;                     }                 }                 else                 {                     //第一个是 0x75 但是后面不匹配,可能有数据传输问题,那么需要舍弃第一个,0x75 后面的字节开始再重新找 0x75                     var next = buffer.GetPosition(1, position.Value);                     buffer = buffer.Slice(next);                 }             }         }         while (position != null);          //数据处理完毕,告诉 pipe 还剩下多少数据没有处理(数据包不完整的数据,找不到 head)         reader.AdvanceTo(buffer.Start, buffer.End);          if (result.IsCompleted)         {             break;         }     }      reader.Complete(); } 

以上代码基本解决了以下问题:

  • 数据接收不完整,找不到开头结尾,导致数据大量丢弃,或者自己维护一个 queue 的代码复杂性
  • 数据接收与处理的同步问题
  • 一次性收到多条数据的情况

后记

本文只是解释了 pipeline 处理的模式,对于茫茫多的 ToArray 方法,可以使用基于 Span 的操作进行优化(有时间就来填坑)。另外,如果在await ProcessMessage(mes.ToArray());这里,直接使用Task.Run(()=>ProcessMessage(mes);代替的话,实测会出现莫名其妙的问题,很有可能是 pipe 运行快,在系统调度 Task 之前,已经将内存释放导致的,如果需要优化这一块的话,需要格外注意。

【注:本文源自网络文章资源,由站长整理发布】 更多精彩内容请看 web 前端中文站
http://www.lisa33xiaoq.net 可按 Ctrl + D 进行收藏


web 前端中文站 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:使用高性能 Pipelines 构建.NET 通讯程序
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址