0%

RocketMQ 发送消息偶尔会无法消费

RocketMQ 发送消息偶尔会无法消费

问题背景

公司全款支付业务上线在即,测试组同学反馈,测试环境偶现某些操作无效的问题。经排查是系统中使用的MQ出现了问题。
消息队列是在老的生产环境就已经接入的,之前消息的传递是没有问题的。
以上是主要背景

排查过程

  1. 经过初步排查是消息队列中的消息没有被消费导致的。也就是生产者已经将消息发送到队列中,但是消费者没有消费到。

  2. 从消息队列控制台查看这个topic和消费组的一些消息,有几个消费组的消息是积压状态(最可以从topic - consumer管理 - 弹出界面中consumer offset 后的差值,如果不为0 说明消息积压)。这里有个点,消费挤压的消费者的消费者终端都是空的,此处先记下。

  3. 接下来因为认识不足的问题,问题解决方向侧重点放在了检查消费端的堵塞上,此处问题没有得到很好的解决,但是发现不少比较不错的文档,记录在文末。

  4. 在控制台具体查看消息的一些生命情况,通过Topic 和 时间过滤 找到最近产生的几条消息数据,打开 message detail,最下面会有关于消息的 trackType 和一些操作。观察到消费失败的消息的trackType的属性是 NOT_ONLINE, 即消费者没有运行。
    这里展开说下 trackType 的类型和代表的含义。

    NOT_ONLINE 代表该Consumer没有运行
    CONSUMED 代表该消息已经被消费
    NOT_CONSUME_YET 还没被消费
    UNKNOW_EXCEPTION 报错
    CONSUMED_BUT_FILTERED 消费了,但是被过滤了,一般是被tag过滤了

  5. 在尝试联系运维同学重启了MQ之后,从消息队列的整个的配置上闲逛的时候,发现消费者的客户端的版本号存在多个,联系最近业务系统的测试和上线动作,新的测试环境部署了 流程引擎服务 的消费组名称和当前的同名,当时鉴于topic不一样,修改后未及时进行验证。

  6. 马上修改对应的服务的消费组名称, 问题解决。 不同的Topic的消费组名称也会冲突

结论

消息队列使用时命名规范问题一定要注意,不同的项目的Topic、消费组名称和tag关系需要梳理。

附录:

另附网上关于mq消息积压的案例有很多,但场景都不是很一样,从中找到排查步骤和工具不错的方案如下:

  1. RocketMQ消息积压问题
  2. 消息积压判断及解决

附录链接文字备份

Part1:

  1. 可通过 jps -m 或者 ps -ef | grep java 命令获取当前正在运行的 Java 程序,通过启动主类即可获得应用的进程 id,然后可以通过 jstack pid > j.log 命令获取线程的堆栈,在这里我建议大家连续运行 5 次该命令,分别获取 5 个线程堆栈文件,主要用于对比线程的状态是否在向前推进。
  2. 通过 jstack 获取堆栈信息后,重点搜索 线程状态。

Part2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
一. 定位问题
1. Console入口
主题-->Topic-->Consumer管理-->订阅组

2. 延迟数量(Delay)
消息积压数量,即当前Topic还剩下多少消息未处理,该值越大,表示积压的消息越多

3. 最后消费时间(LastConsumeTime)
当前Topic消息最后被消费的时间,该值表示消费端有多长时间未拉取消息进行消费

二. 分析问题
1. 查看rocketmq_client.log日志
grep "do flow control" rocketmq_client.log

如果出现 so do flow control 这样的日志,说明触发了消费限流,原因是:消费端积压了消息,即消费端无法消费已拉取的消息,消费端在没有将消息处理完成前,不会再向服务端拉取消息,并打印日志。

2. 消费端业务逻辑
1. 执行了长耗时的逻辑,导致消息处理很慢
2. 第三方接口调用很慢或超时

三. 解决问题
1. 消费端解决
1. 增加消费端的消费线程数或增加消费者数量,提升消费能力
2. 优化代码逻辑,降低执行时间
3. 调用第三方接口时,设置较短的超时时间,避免长时间等待,快速返回错误信息并告警

Part3:

出现场景
生产者系统会负责不停的把消息写入RocketMQ里去,然后消费者系统就是负责从RocketMQ里消费消息。

系统在生产环境是有高峰和低谷的,在晚上几个小时的高峰期内,大概就会有100多万条消息进入RocketMQ。然后消费者系统从RocketMQ里获取到消息之后,会依赖一些Redis去进行一些业务逻辑的实现。

有一天晚上就出现了一个问题,消费者系统依赖的Redis就挂掉了,导致消费者系统自己也没法运作了,此时就没法继续从RocketMQ里消费数据和处理了,消费者系统几乎就处于停滞不动的状态。然后生产者系统在晚上几个小时的高峰期内,就往MQ里写入了100多万的消息,此时都积压在MQ里了,根本没有系统消费和处理。

解决方案
一般来说有以下几种方案:

全部丢弃:如果这些消息允许丢失,那么此时可以紧急修改消费者系统的代码,在代码里对所有的消息都获取到就直接丢弃,不做任何的处理,这样可以迅速的让积压在MQ里的百万消息被处理掉,只不过处理方式就是全部丢弃而已。
等待Redis恢复:往往对很多系统而言,不能简单粗暴的丢弃这些消息,所以最常见的办法,还是先等待消费者系统底层依赖的Redis先恢复,恢复之后,就可以根据线上Topic的MessageQueue的数量来看看如何后续处理。
临时扩容消费者系统,增加机器来加快消费速度,但要考虑依赖的Redis也要能抗住压力;
临时扩容消费者系统
假如Topic有20个MessageQueue,然后只有4个消费者系统在消费,那么每个消费者系统会从5个MessageQueue里获取消息,所以此时如果你仅仅依靠4个消费者系统是肯定不够的,毕竟MQ里积压了百万消息了。

此时可以临时申请16台机器多部署16个消费者系统的实例,然后20个消费者系统同时消费,每个消费者消费一个MessageQueue的消息,此时会发现消费的速度提高了5倍,很快积压的百万消息都会被处理完毕。
但是这里同时要考虑到Redis必须要能抗住临时增加了5倍的读写压力,因为原来就4个消费者系统在读写Redis,现在临时变成了20个消费者系统了。

当你处理完百万积压的消息之后,就可以下线多余的16台机器了。

那么如果Topic总共就只有4个MessageQueue,只有4个消费者系统呢?
这个时候就没办法扩容消费者系统了,因为再多的消费者系统,还是只有4个MessageQueue,没法并行消费。

所以此时往往是临时修改那4个消费者系统的代码,让他们获取到消息然后不写入Redis,而是直接把消息写入一个新的Topic,这个速度是很快的,因为仅仅是读写MQ而已。

然后新的Topic有20个MessageQueue,然后再部署20台临时增加的消费者系统,去消费新的Topic后写入数据到NoSQL里去,这样子也可以迅速的增加消费者系统的并行处理能力,使用一个新的Topic来允许更多的消费者系统并行处理。