导航菜单
路很长,又很短
博主信息
昵   称:Cocodroid ->关于我
Q     Q:2531075716
博文数:311
阅读量:875770
访问量:86607
至今:
×
云标签 标签球>>
云标签 - Su的技术博客
博文->>首页 博主的更多博文>>
RocketMQ最佳实践
Tags : RocketMQ,消息队列,MQ发表时间: 2019-08-17 22:19:16
原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 、作者信息和本声明。否则将追究法律责任。
比如: 转自:Su的技术博客  原文地址:



RocketMQ是阿里开源的消息队列框架,如今也已成为Apache顶级项目,RockerMQ是一个非常优秀的框架,现在大部分互联网公司使用的消息队列也是RocketMQ,在我们使用的过程中,如果能一开始就给你最佳实践,可以避免走一些弯路,甚至你看完之后可以自身检查下你们是不是这样使用,没有的话可以进行适当的调整,这篇文章应该能够帮助你更好的使用RockerMQ。


1

〓Producer最佳实践


1、Topic
一个应用尽可能用一个 Topic,消息子类型用 tags 来标识,tags 可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用 tags 在 broker 做消息过滤:message.setTags("TagA")。

2、Key
每个消息在业务层面的唯一标识码,要设置到 keys 字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。

每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。

由于是哈希索引,请务必保证 key 尽可能唯一,这样可以避免潜在的哈希冲突。
// 订单Id String orderId = "20034568923546"; message.setKeys(orderId);

3、日志
消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

send消息方法只要不抛异常,就代表发送成功。

发送成功会有多个状态,在sendResult里定义。

4、重发
对于消息不可丢失应用,务必要有消息重发机制。

例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。

5、sendOneWay
某些应用如果不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

对可靠性要求并不高,例如日志收集类应用,此类应用可以采用oneway形式调用。


1


Consumer最佳实践



1、幂等

消费过程要做到幂等(即消费端去重)。

RocketMQ目前无法避免消息重复,所以如果业务对消费重复非常敏感,务必要在业务层面去重,有以下几种去重方式:

a).将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或Tair(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。

b). 用业务层面的状态机去重。

2、批量消费
尽量使用批量方式消费方式,可以很大程度上提高消费吞吐量。

某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。

3、 跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。

例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。

示例代码如下:

public ConsumeConcurrentlyStatus consumeMessage(
 List<MessageExt> msgs,
 ConsumeConcurrentlyContext context) {
 long offset = msgs.get(0).getQueueOffset();
 String maxOffset =
 msgs.get(0).getProperty(Message.PROPERTY_MAX_OFFSET);
 long diff = Long.parseLong(maxOffset) - offset;
 if (diff > 100000) {
 // TODO 消息堆积情况的特殊处理
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }
 // TODO 正常消费过程
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 }

4、提高消费并行度
a). 同一个ConsumerGroup下,通过增加Consumer实例数量来提高并行度,超过订阅队列数的Consumer实例无效。

可以通过加机器,或者在已有机器启动多个进程的方式。

b). 提高单个Consumer的消费并行线程,通过修改以下参数: 
consumeThreadMin consumeThreadMax

5、优化每条消息消费过程
举例如下,某条消息的消费过程如下:
  • 根据消息从 DB 查询【数据 1】
  • 根据消息从 DB 查询【数据 2】
  • 复杂的业务计算
  • 向 DB 插入【数据 3】
  • 向 DB 插入【数据 4】

这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。

所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCSI磁盘,前者的RT会小很多。

6、日志
消费时记录日志,以便后续定位问题。

如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。

 public ConsumeConcurrentlyStatus consumeMessage(
 List<MessageExt> msgs,
 ConsumeConcurrentlyContext context) {
 log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
 // TODO 正常消费过程
 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
 } 

如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。

〓其他配置


1、线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。如果是新的消息,由于MQServer上面还没有创建对应的Topic,这个时候,如果上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面创建名为TBW102的TOPIC)路由信息,然后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic还没有创建,就会自动创建topic。

后果就是:以后所有该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

所以基于目前RocketMQ的设计,建议关闭自动创建TOPIC的功能,然后根据消息量的大小,手动创建TOPIC。

2、JVM选项
推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。

简单的JVM配置如下所示:
-server -Xms8g -Xmx8g -Xmn4g
如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。

那些不关心启动时间的人可以启用它: -XX:+AlwaysPreTouch
禁用偏置锁定可能会减少JVM暂停,  -XX:-UseBiasedLocking
至于垃圾回收,建议使用带JDK 1.8的G1收集器。
-XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30

这些GC选项看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件:
-XX:+UseGCLogFileRotation  -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m

如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统:
-Xloggc:/dev/shm/mq_gc_%p.log123

3、Linux内核参数
os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。

下面的参数需要注意,更多细节请参考/proc/sys/vm/*的文档
  • vm.extra_free_kbytes,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关)
  • vm.min_free_kbytes,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。
  • vm.max_map_count,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness)
  • vm.swappiness,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。
  • File descriptor limits,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。
  • Disk scheduler,RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。

    如果您有更好的实践经验,欢迎留言。



▽参考资料:
http://jm.taobao.org/2017/03/09/20170309/
https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md




推荐阅读


-关注搬运工来架构,与优秀的你一同进步-


打赏
打赏
关注公众号
公众号
类别:消息队列| 阅读(267)| 赞 (0)
评论
暂无评论!
发表评论
昵  称:

验证码:

内  容:

    同时赞一个 赞