• 欢迎访问web前端中文站,JavaScript,CSS3,HTML5,web前端demo
  • 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏web前端中文站吧

哎呀,我老大写Bug啦——记一次MessageQueue的优化

JavaScript web前端中文站 12个月前 (11-10) 396次浏览 已收录 0个评论

MessageQueue,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

更多精彩内容请看 web 前端中文站
http://www.lisa33xiaoq.net 可按 Ctrl + D 进行收藏

之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再 debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

都说祸不单行,古人是不会骗我们的,Bug 怎么会修得完呢?天真,要是 Bug 能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

what is it ??

?哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

 

来了,今天的主角登场了,我也要开始加班了。

RabbitMQ

这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

 

RabbitMQ内存使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才 1.3G,吃个午饭回来,就 2.3G 了,可怕不可怕?咋回事?

老板喊你回来加班啦

先不管了,线上优先解决,手动先 Reset 回收资源以释放空间,这个只是临时的办法,然后检查一下 rabbitMQ 的配置有没有问题,路径在

?C:/Users/Administrator/AppData/Roaming/RabbitMQ

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

完全是默认的配置,完全 ojbk 啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

好了,叫我老老大要这个组件的代码,他把 git 的地址就发给我,我把项目 down 下来,

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

主要的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using MessageLib.ClassBean; using EasyNetQ; using System.Threading;  namespace MessageLib {     public static class MessageQueue     {         public static IBus bus = MQBusBuilder.CreateMessageBus();         //消息队列         private static Queue<Item> NoticQueue = new Queue<Item>(5000);         //日志队列         private static Queue<Item> LogQueue = new Queue<Item>(5000);         //队列数目发布数量         private static int max_count_to_pulish = 1000;          /// <summary>         /// 可供外部使用的消息入列操作         /// </summary>         public static void push(Item item)         {             if (item.type == ItemType.notic)             {                 NoticQueue.Enqueue(item);             }              if (item.type == ItemType.log)             {                 LogQueue.Enqueue(item);             }         }          /// <summary>         /// 监听后需要调用的发布接口         /// </summary>         private static void Pulish(object source, System.Timers.ElapsedEventArgs e)         {             if (NoticQueue.Count > 0 || LogQueue.Count > 0)             {                 if (bus == null || !bus.IsConnected)                 {                     bus = MQBusBuilder.CreateMessageBus();                 }                  if (bus.IsConnected)                 {                     Send(ItemType.notic);                     Send(ItemType.log);                 }             }         }          /// <summary>         /// 程序自运行并开始监听         /// </summary>         public static void Run()         {             System.Timers.Timer timer = new System.Timers.Timer();             timer.Interval = 1000;             timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;                 timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);                 timer.Enabled = true;//是否执行 System.Timers.Timer.Elapsed 事件;             }          /// <summary>         /// 启动线程异步调用         /// </summary>         /// <param name="channelType"></param>         private static void Send(string channelType)         {             Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));             thread.IsBackground = true;             thread.Start(channelType);         }          /// <summary>         /// 调用发布日志及提醒两个接口         /// </summary>         /// <param name="channel"></param>         private static void PublishAction(object channel)         {             PublisLog();             PublisNotic();         }          /// <summary>         /// 日志消息发送至 RabbitMQ 指定 exchange、Queue         /// </summary>         private static void PublisLog()         {             string channelName = ItemType.log;             try             {                 var routingKey = channelName;                 var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                 var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");                 var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);                 while (LogQueue.Count > 0)                 {                     Item item = LogQueue.Dequeue();                     if (item != null)                     {                         var properties = new MessageProperties();                         var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));                         Message.Properties.AppId = item.appid;                         bus.Advanced.Publish(exchange, routingKey, false, Message);                     }                  }             }             catch (Exception ex)             {                 throw ex;             }         }          /// <summary>         /// 提醒消息发送至 RabbitMQ 指定 exchange、Queue         /// </summary>         private static void PublisNotic()         {             string channelName = ItemType.notic;             var routingKey = channelName;             var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));             var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");             var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);             while(NoticQueue.Count > 0)             {                 Item item = NoticQueue.Dequeue();                 if (item != null)                 {                     var properties = new MessageProperties();                     var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));                     Message.Properties.AppId = item.appid;                     bus.Advanced.Publish(exchange, routingKey, false, Message);                 }             }         }     } }

View Code

然后我就发现了这一段代码!

        /// <summary>         /// 程序自运行并开始监听         /// </summary>         public static void Run()         {             System.Timers.Timer timer = new System.Timers.Timer();             timer.Interval = 1000;             timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;                 timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);                 timer.Enabled = true;//是否执行 System.Timers.Timer.Elapsed 事件;             }
        /// <summary>         /// 启动线程异步调用         /// </summary>         /// <param name="channelType"></param>         private static void Send(string channelType)         {             Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));             thread.IsBackground = true;             thread.Start(channelType);         }

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

老老大写 Bug 了,当 Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息 Send(),每来一次推送 new 一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new 了 N 多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是 Run()调用多次,后果更加不堪设想。

加班改起来

开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类 QueueTask.cs:

    public class QueueTask     {         private Queue<Item> NoticQueue = new Queue<Item>(5000);         //队列数目发布数量         private int max_count_to_pulish = 1000;         public  bool isRunning = false;         private string itemType = ItemType.info;         private string MessageRouter = ItemType.info;          public QueueTask(string itemType,string MessageRouter)         {             this.itemType = itemType;             this.MessageRouter = MessageRouter;         }          /// <summary>         /// 可供外部使用的消息入列操作         /// </summary>         public void Push(Item item, IBus IBus)         {             NoticQueue.Enqueue(item);             if (!isRunning)                 Run(IBus);         }          public void Run(IBus IBus)         {             if (!isRunning)             {                 Timer timer = new Timer(PulishMsg, IBus, 1000, 1000);
          timer.AutoReset = timer.Enabled = true;                 isRunning = true;             }         }          private void PulishMsg(object state)         {             IBus IBus = state as IBus;             if (NoticQueue.Count > 0)             {                 PublisMsg(itemType, IBus);             }         }          private void PublisMsg(object channel, IBus BusInstance)         {             try             {                 string channelName = channel as string;                 if (NoticQueue.Count > 0)                 {                     var routingKey = MessageRouter;                     var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                     var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");                     var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey);                      while (NoticQueue.Count > 0)                     {                         Item item = NoticQueue.Dequeue();                         if (item != null)                         {                             var properties = new MessageProperties();                             var Message = new EasyNetQ.Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));                             Message.Properties.AppId = item.appid;                             BusInstance.Advanced.Publish(exchange, routingKey, false, Message);                         }                     }                 }             }             catch (Exception ex)             {                 Console.WriteLine("PublisMsg error:" + ex.Message);             }         }          public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item         {             try             {                 string channelName = itemType;                 var routingKey = MessageRouter;                 var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                 var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");                 var binding = BusInstance.Advanced.Bind(exchange, mqqueue, routingKey);                  var Consume = BusInstance.Advanced.Consume(mqqueue, registration =>                 {                     registration.Add<string>((message, info) =>                     {                         Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);                         dealAction(data);                     });                 });             }             catch (Exception ex)             {                 Console.WriteLine("Read error:" + ex.Message);             }         }     }

然后,在 MessageQueue.cs 修改为单例模式:

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

    public static class MessageQueue     {         /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/          private static IBus bus = null;         public static bool isRunning = false;          //消息队列         private static QueueTask NoticQueue = null;         //日志队列         private static QueueTask LogQueue = null;         //自定义         private static QueueTask InfoQueue = null;          #region 同步锁         private static readonly object obj = new object();         #endregion          public static void Init(string Connection, string routeKey)         {             if (NoticQueue == null)                 NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);             if (LogQueue == null)                 LogQueue = new QueueTask(ItemType.error, ItemType.error);             if (InfoQueue == null)                 InfoQueue = new QueueTask(ItemType.info, routeKey);             if (string.IsNullOrEmpty(MQBusBuilder.Connnection))                 MQBusBuilder.Connnection = Connection;         }          public static IBus BusInstance         {             get             {                 if (bus == null)                 {                     lock (obj)                     {                         if (bus == null|| !bus.IsConnected)                         {                             bus = MQBusBuilder.CreateMessageBus();                         }                     }                 }                 return bus;             }         }           /// <summary>         /// 可供外部使用的消息入列操作         /// </summary>         public static void PushAndRun(Item item)         {             if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)                 return;             if (item.type == ItemType.notic)             {                 NoticQueue.Push(item, BusInstance);             }             if (item.type == ItemType.error)             {                 LogQueue.Push(item, BusInstance);             }             if (item.type == ItemType.info)             {                 InfoQueue.Push(item, BusInstance);             }         }          public static void Read(string itemType, Action<Item> dealAction)         {             if (itemType == ItemType.notic)             {                 NoticQueue.Read<NoticItem>(BusInstance, dealAction);             }             if (itemType == ItemType.error)             {                 LogQueue.Read<ErrorItem>(BusInstance, dealAction);             }             if (itemType == ItemType.info)             {                 InfoQueue.Read<Message>(BusInstance, dealAction);             }         }     }

View Code

每次推送消息的时候,每个 QueueTask 就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布 nuget,再更新项目,然后发布。观察一段时间,恩,完美。

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

 

事件二

事情过后,B 端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程涨!因为订单是从 B 端推送过来的,B 端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的 Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。

https://github.com/EasyNetQ/EasyNetQ

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

翻来覆去,看到这个 Consume 方法,继承的是 IDisposable 接口,得勒,知道咋回事了。

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

Consume.Dispose(); 用完请记得主动释放啊。

这回真的可以浪了。

哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化

总结

遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢 Debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好 EasyNetQ 是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。

别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!

 

原文链接:https://www.cnblogs.com/EminemJK/p/9921800.html

【注:本文源自网络文章资源,由站长整理发布】


web 前端中文站 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:哎呀,我老大写 Bug 啦——记一次 MessageQueue 的优化
喜欢 (0)
发表我的评论
取消评论
表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址