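- 1 Background
- 2 Verification
- 2.1 Starting the producer
- 2.2 Starting the consumer
- 2.3 Performing the rewind
- 2.4 Verifying the result
- 2.5 Verification summary
- 3 Analysis
- 3.1 Strategy pattern: parsing the command line
- 3.2 Creating the client to talk to the server
- 3.3 Getting the broker addresses for the topic and submitting the reset request
- 3.4 Getting broker addresses from the nameserver
- 3.5 Talking to the broker to perform the reset
- 3.6 The consumer client receives the request and processes it
- 4 Core flow
- 5 Summary
- 6 Extension
- 7 Comparison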
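1 Background
A while ago, the SMS service at Xiao A's company failed, and for a period of time no text messages were sent. Once the service was fixed, that batch of messages had to be re-sent.
Because the SMS service is triggered directly by RocketMQ messages, Xiao A was stuck on how to repair the data, which led to the following conversation.
Manager: Xiao A, that's a lot of data. How do you plan to fix it?
Xiao A: It's a headache, boss. Most of our services keep a local message table for idempotency, so I would only need to reset the status in the database table, fetch the rows, and loop over them again. But the SMS service has no local table!
Manager: So what's your idea?
Xiao A: The simple way is to ask upstream to resend, and we just consume everything again.
Manager: That would make things worse. Think about it: if upstream resends, every consumer group has to consume the messages again, and the colleagues on the other services will come looking for you.
Xiao A: Then I'm stuck...
Manager: RocketMQ actually has a dedicated message replay capability. Give it a try.
Xiao A: It can do that? Let me look into it...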
2 Verification
2.1 Starting the producer
Prepare a new topic and send 10,000 messages to it.
public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 10000; i++) {
        try {
            Message msg = new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}
2.2 Starting the consumer
Prepare a new consumer group, consume the messages in the topic, and record the total number consumed.
public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    // Running total of consumed messages
    final AtomicInteger count = new AtomicInteger();
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            // Count messages rather than callbacks, since one callback may carry several messages
            count.addAndGet(msgs.size());
            System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
            System.out.println(count.get());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
}
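[Figure: consumer message log]
2.3 Performing the rewind
Command-line execution
mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000
The contents of mqadmin.cmd (see the figure below) show that it launches MQAdminStartup, so the same operation can also be performed by calling the main method of MQAdminStartup directly.
[Figure: running MQAdminStartup manually]
Code execution
public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime", "-n", "127.0.0.1:9876", "-t", "TopicTest",
        "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}
2.4 Verifying the result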
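2.5 Verification summary
Judging from the result, the consumer offset was reset to the position that corresponds to the specified timestamp. Because that timestamp is earlier than the creation time of the earliest message, every message that had not yet been deleted was consumed again.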
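To make the "timestamp earlier than the earliest message" case concrete, here is a minimal sketch of mapping a timestamp to a queue offset. It is a simplified model and not RocketMQ's lookup code: the class name, the method name, and the idea of representing a queue as a sorted array of store times are all assumptions made for illustration.

```java
final class OffsetByTimeSketch {

    /**
     * Returns the index of the first message whose store time is >= timestamp.
     * storeTimes must be sorted ascending; the array index stands in for the queue offset.
     */
    static int offsetForTimestamp(long[] storeTimes, long timestamp) {
        int lo = 0;
        int hi = storeTimes.length;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (storeTimes[mid] < timestamp) {
                lo = mid + 1;   // message is too old, look to the right
            } else {
                hi = mid;       // candidate found, keep looking to the left
            }
        }
        return lo;
    }

    public static void main(String[] args) {
        // Hypothetical store times of three messages still present in one queue
        long[] storeTimes = {1722240100000L, 1722240101000L, 1722240102000L};
        // A timestamp older than every message maps to offset 0, so everything is replayed
        System.out.println(offsetForTimestamp(storeTimes, 1722240069000L)); // prints 0
        System.out.println(offsetForTimestamp(storeTimes, 1722240101000L)); // prints 1
    }
}
```

With a target older than every retained message the lookup lands on offset 0, which matches the observation that all undeleted messages were consumed again.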
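So what exactly does RocketMQ do?
2.5.1 Analyzing the parameters
Action identifier: resetOffsetByTime
Additional parameters:
-n address of the nameserver
-t topic name
-g consumer group name
-s time to rewind to, given as a millisecond timestamp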
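The value passed to -s is easy to get wrong by hand, so a small helper for producing it may be useful. This is only a sketch: the class and method names are hypothetical, and the Asia/Shanghai time zone is an assumption about where the incident clock was read.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

final class ResetTimestamp {

    /** Converts a wall-clock time in the given zone to epoch milliseconds. */
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumed zone; replace it with the zone in which the incident time was recorded
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        long ts = toEpochMillis(LocalDateTime.of(2024, 7, 29, 16, 1, 9), zone);
        System.out.println(ts); // prints 1722240069000, the value used in the command above
    }
}
```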
2.5.2 Reflection
3 Analysis
All source code below is taken from version 4.2.0; the listings have been trimmed for readability.
3.1 Strategy pattern: parsing the command line
org.apache.rocketmq.tools.command.MQAdminStartup#main
/* Resolve the handler class from the action identifier; for this request the strategy class is ResetOffsetByTimeCommand */
SubCommand cmd = findSubCommand(args[0]);
/* Parse the command line */
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
    cmd.buildCommandlineOptions(options), new PosixParser());
/* Submit the request for execution */
cmd.execute(commandLine, options, rpcHook);
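The dispatch step above can be pictured with a stripped-down, dependency-free sketch. Everything here is illustrative: the SubCommand interface is a hypothetical two-method stand-in for the real one, the registered commands only return strings, and no command-line parsing library is involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SubCommandDispatchSketch {

    /** Hypothetical, trimmed-down stand-in for a sub-command strategy. */
    interface SubCommand {
        String commandName();
        String execute(String[] subArgs);
    }

    private static final List<SubCommand> SUB_COMMANDS = new ArrayList<>();

    static {
        SUB_COMMANDS.add(new SubCommand() {
            public String commandName() { return "resetOffsetByTime"; }
            public String execute(String[] subArgs) {
                return "reset offsets with " + subArgs.length + " option tokens";
            }
        });
        SUB_COMMANDS.add(new SubCommand() {
            public String commandName() { return "topicList"; }
            public String execute(String[] subArgs) { return "list topics"; }
        });
    }

    /** Picks the strategy whose name matches the first command-line token. */
    static SubCommand findSubCommand(String name) {
        for (SubCommand cmd : SUB_COMMANDS) {
            if (cmd.commandName().equalsIgnoreCase(name)) {
                return cmd;
            }
        }
        return null;
    }

    static String run(String[] args) {
        SubCommand cmd = findSubCommand(args[0]);
        if (cmd == null) {
            return "unknown sub-command: " + args[0];
        }
        // Everything after the action identifier belongs to the chosen strategy
        return cmd.execute(Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] args) {
        System.out.println(run(new String[]{"resetOffsetByTime", "-t", "TopicTest"}));
    }
}
```

Adding a new admin action in this shape means registering one more strategy object; the dispatcher itself does not change.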
3.2 Creating the client to talk to the server
org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    String group = commandLine.getOptionValue("g").trim(); // consumer group
    String topic = commandLine.getOptionValue("t").trim(); // topic
    String timeStampStr = commandLine.getOptionValue("s").trim(); // reset timestamp as text
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr); // reset timestamp in milliseconds
    boolean isC = false; // whether the consumer is a C++ client
    // Whether to force the reset. Note in advance: the offset for the timestamp may be larger than
    // the current consume offset, in which case a forced reset skips some messages.
    boolean force = true;
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }
    /* Start the client that talks to the nameserver and the brokers */
    defaultMQAdminExt.start();
    /* Execute the command */
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}
3.3 Getting the broker addresses for the topic and submitting the reset request
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    /* Fetch the broker addresses from the nameserver */
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    /* Message data is partitioned, so the message queues of a topic may live on several brokers; hence a list */
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                /* Talk to each broker in turn and perform the reset */
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl()
                    .invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
3.4 Getting broker addresses from the nameserver
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo, which delegates to org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist)
    throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    /* Assemble the parameters as before; request code: 105 */
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    /* Send the request to the nameserver */
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
    // Handling of the remaining response codes is omitted in this trimmed listing
}
3.4.1 The nameserver receives the request, looks up the routing information, and returns it
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    /* The nameserver keeps the routing information of every topic in memory */
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
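Conceptually, answering a route request is a join between "which brokers host this topic" and "where do those brokers live". The sketch below models that join with plain maps. It is not the RouteInfoManager implementation: the class, its two maps, and the method names are hypothetical, and it returns master addresses only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RouteLookupSketch {
    // topic -> names of the brokers that host queues for it
    private final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is treated as the master
    private final Map<String, Map<Long, String>> brokerAddrs = new HashMap<>();

    void register(String brokerName, long brokerId, String addr, String... topics) {
        brokerAddrs.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        for (String topic : topics) {
            List<String> names = topicToBrokerNames.computeIfAbsent(topic, k -> new ArrayList<>());
            if (!names.contains(brokerName)) {
                names.add(brokerName);
            }
        }
    }

    /** Master address of every broker hosting the topic, keyed by broker name. */
    Map<String, String> masterAddrsFor(String topic) {
        Map<String, String> result = new TreeMap<>();
        List<String> names = topicToBrokerNames.getOrDefault(topic, Collections.<String>emptyList());
        for (String name : names) {
            String master = brokerAddrs.get(name).get(0L);
            if (master != null) {
                result.put(name, master);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        RouteLookupSketch routes = new RouteLookupSketch();
        // Addresses below are made up for the demo
        routes.register("broker-a", 0L, "10.0.0.1:10911", "TopicTest");
        routes.register("broker-a", 1L, "10.0.0.2:10911", "TopicTest");
        routes.register("broker-b", 0L, "10.0.0.3:10911", "TopicTest", "OtherTopic");
        System.out.println(routes.masterAddrsFor("TopicTest"));
    }
}
```

A topic spread over two brokers yields two addresses, which is why the admin client in section 3.3 loops over a list of brokers.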
3.4.2 Core fields of RouteInfoManager
// Topic routing information, used for load balancing; each QueueData records the brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Basic broker information: name, owning cluster, master and slave addresses; brokerId=0 is the master, >0 is a slave
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Cluster information: the names of all brokers in each cluster
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
// Live brokers and their channels
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// Filter server information for each broker
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
3.5 Talking to the broker to perform the reset
org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);
    /* Assemble the parameters as before; request code: 222 */
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
    /* Send the request to the broker; note that this call is synchronous (invokeSync) */
    RemotingCommand response = this.remotingClient.invokeSync(
        MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
    // Handling of error responses is omitted in this trimmed listing
}
The broker receives the request and starts processing
org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    /* Records, for each queue of the topic, the offset this consumer group should be reset to */
    Map<MessageQueue/* queue */, Long/* offset */> offsetTable = new HashMap<MessageQueue, Long>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        /* The broker can look up the offset of a given queue for a consumer group and topic */
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); // offset the group has consumed so far
        if (-1 == consumerOffset) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("THe consumer group <%s> not exist", group));
            return response;
        }
        long timeStampOffset;
        if (timeStamp == -1) {
            // No timestamp given: use the current maximum offset of the queue
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            // Look up the queue offset that corresponds to the timestamp
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }
        if (timeStampOffset < 0) {
            // < 0 means the message has already been deleted
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }
        /* With isForce=false the reset is applied only when the target offset is smaller than the current
           consume offset. Conversely, with isForce=true a reset can not only rewind but also move the offset
           forward when consumption is lagging, so the group catches up faster. */
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }
    /* With the target offsets determined, contact the clients and ask them to reset; request code 220 */
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        // c++ language
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        // other language
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }
    /* Get the clients of this consumer group that are connected to the current broker */
    ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        // Long-lived connection channels
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            /* Version check: only clients at version 3.0.7 (snapshot) or later support the reset */
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    /* invokeOneway sends without waiting for a reply, so this is effectively asynchronous */
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                        topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[] {topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. version={}",
                    RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo = String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
            requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}
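The per-queue decision in the loop above is the heart of the broker-side logic, so it is worth isolating. The sketch below extracts just that rule into a pure function; the class and method names are hypothetical, while the branching mirrors the listing.

```java
final class ResetDecisionSketch {

    /**
     * Chooses the offset a queue is reset to.
     *
     * @param consumerOffset  offset the consumer group has reached so far
     * @param timeStampOffset offset found for the requested timestamp; negative if it could not be found
     * @param force           when true, always use the timestamp offset, even if that skips messages
     */
    static long chooseResetOffset(long consumerOffset, long timeStampOffset, boolean force) {
        if (timeStampOffset < 0) {
            timeStampOffset = 0; // fall back to the start of the queue
        }
        if (force || timeStampOffset < consumerOffset) {
            return timeStampOffset;
        }
        return consumerOffset; // non-forced resets never move the offset forward
    }

    public static void main(String[] args) {
        System.out.println(chooseResetOffset(500, 100, false)); // rewind: prints 100
        System.out.println(chooseResetOffset(500, 800, false)); // not forced, stays put: prints 500
        System.out.println(chooseResetOffset(500, 800, true));  // forced skip ahead: prints 800
    }
}
```

Because the mqadmin command defaults to force=true, a timestamp later than the current progress skips messages unless -f false is passed.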
3.6 The consumer client receives the request and processes it
org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        /* Find the consumer implementation for the group, i.e. the familiar DefaultMQPushConsumerImpl or DefaultMQPullConsumerImpl */
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            // A pull consumer manages its own progress, so return immediately
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
        consumer.suspend(); // pause consumption
        /* Stop pulling messages and clear the cache of messages waiting to be processed */
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
        /* The wait is implemented simply. The admin-to-broker call is synchronous, but the broker-to-consumer
           call is asynchronous, so blocking for 10 seconds gives every consumer time to reach this point
           before the offsets are stored and a rebalance is triggered. */
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
        }
        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            // Offset this message queue should be reset to
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    /* Update the local offset. In clustering mode the local value is changed first and a scheduled
                       task reports it to the broker every five seconds; in broadcasting mode the offset is stored
                       locally, so changing the consumer's local offset is all that is needed. */
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            /* Trigger a rebalance again; the broker now holds the reset offset for this consumer group,
               so after reassignment the broker's value is authoritative */
            consumer.resume();
        }
    }
}
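The client-side sequence (suspend, drop cached messages, overwrite the local offset, release the queue, resume) can be replayed in miniature without any RocketMQ classes. The sketch below is a toy model under stated assumptions: queues are identified by hypothetical "topic#queueId" strings, the 10-second wait is left out, and the trace list exists only so the order of steps can be inspected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ClientResetSketch {
    // queue key -> cached messages waiting to be consumed
    final Map<String, List<String>> processQueues = new HashMap<>();
    // queue key -> locally stored consume offset
    final Map<String, Long> localOffsets = new HashMap<>();
    // ordered record of what happened, for inspection only
    final List<String> trace = new ArrayList<>();

    void resetOffset(String topic, Map<String, Long> offsetTable) {
        trace.add("suspend");
        try {
            Iterator<Map.Entry<String, List<String>>> it = processQueues.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, List<String>> entry = it.next();
                String queue = entry.getKey();
                Long offset = offsetTable.get(queue);
                // Only queues of the requested topic that appear in the reset table are touched
                if (queue.startsWith(topic + "#") && offset != null) {
                    entry.getValue().clear();        // discard cached, not yet consumed messages
                    localOffsets.put(queue, offset); // overwrite local progress
                    it.remove();                     // release the queue so a rebalance can re-add it
                    trace.add("reset " + queue + " -> " + offset);
                }
            }
        } finally {
            trace.add("resume");                     // always resume, even if the reset failed
        }
    }

    public static void main(String[] args) {
        ClientResetSketch client = new ClientResetSketch();
        client.processQueues.put("TopicTest#0", new ArrayList<>(List.of("m1", "m2")));
        client.localOffsets.put("TopicTest#0", 2500L);
        Map<String, Long> table = new HashMap<>();
        table.put("TopicTest#0", 0L);
        client.resetOffset("TopicTest", table);
        System.out.println(client.trace);
    }
}
```

Putting resume in a finally block mirrors the listing: consumption restarts even when updating an offset throws.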
4 Core flow
[Figure: end-to-end message replay flow]
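5 Summary
Message replay is the safety net RocketMQ gives its users: when a consumer-side failure cannot be repaired in place, the business can rewind and re-consume messages to restore service or correct data. In stream and batch computing in particular, re-running data is routine.
RocketMQ can offer message replay because its offset management is simple, so offsets are easy to reset with the mqadmin tool. Keep in mind, though, that a topic's messages are physically stored on the brokers and are subject to a deletion mechanism. Before rewinding, confirm that no broker node of the cluster has been taken offline and that the target time is not earlier than the point up to which data has already been deleted; otherwise some messages cannot be replayed.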
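A pre-flight check along these lines can catch a rewind target that is already outside the retained data. This is only a sketch under a strong assumption, namely that deletion is purely time-based with a known retention period; the class name, the method name, and the 48-hour value are hypothetical and not RocketMQ settings.

```java
import java.time.Duration;

final class ReplayWindowSketch {

    /** True if targetMillis is not older than the assumed retention window ending at nowMillis. */
    static boolean withinRetention(long targetMillis, long nowMillis, Duration retention) {
        return targetMillis >= nowMillis - retention.toMillis();
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Duration retention = Duration.ofHours(48); // hypothetical retention period
        // One hour back is still retained; one week back is not
        System.out.println(withinRetention(now - Duration.ofHours(1).toMillis(), now, retention)); // prints true
        System.out.println(withinRetention(now - Duration.ofDays(7).toMillis(), now, retention));  // prints false
    }
}
```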
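6 Extension
With message replay we can move the offset backward or forward at will. But what if we want to consume a specific range? For example, when consumption is lagging and we move the offset forward, a stretch of messages is left unconsumed. Those messages should be processed later during idle time, which requires consuming a specified range.
The code of org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset above shows that an offset reset has no effect on consumers of type DefaultMQPullConsumerImpl, because their consume progress is controlled entirely by the consumer itself. That means pull mode can be used to consume the range.
Sample code: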
public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<MessageQueue, Long>();
    private static final Map<MessageQueue, Pair<Long/* min offset */, Long/* max offset */>> QUEUE_OFFSET_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L; // start of the time range
    private static final Long MAX_TIMESTAMP = 1722240160000L; // end of the time range

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        /* Initialize the offsets to process */
        String topic = "TopicTest";
        init(consumer, topic);
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // check against the max offset and process the messages...
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            // Record the smallest and largest offsets of the range
            QUEUE_OFFSET_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            // Use the smallest offset as the starting offset of the next pull
            OFFSET_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}
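The sample leaves the "check against the max offset" step open. One possible shape for that check is sketched below as two pure functions. The names are hypothetical, and two assumptions are baked in: the upper bound is treated as exclusive, and the offsets in a pulled batch are treated as contiguous, which may not hold when a tag filter drops messages.

```java
final class RangeBoundSketch {

    /** Number of messages in a pulled batch that fall inside [minOffset, maxOffset). */
    static int countInRange(long batchBeginOffset, int batchSize, long minOffset, long maxOffset) {
        long from = Math.max(batchBeginOffset, minOffset);
        long to = Math.min(batchBeginOffset + batchSize, maxOffset);
        return (int) Math.max(0, to - from);
    }

    /** True once the next pull would start at or beyond the upper bound. */
    static boolean rangeFinished(long nextBeginOffset, long maxOffset) {
        return nextBeginOffset >= maxOffset;
    }

    public static void main(String[] args) {
        // A batch of 32 starting at offset 96 with an upper bound of 100: only 4 messages are in range
        System.out.println(countInRange(96, 32, 0, 100)); // prints 4
        System.out.println(rangeFinished(128, 100));      // prints true
    }
}
```

In the FOUND branch, a consumer could process only the first countInRange messages of the batch and leave the loop for that queue once rangeFinished returns true.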
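7 Comparison
Approach | Advantages | Disadvantages |
---|---|---|
Local message table on the consumer side | Fully under the control of the business | Extra storage overhead; re-consumption has to be developed separately |
Offset reset (message replay) | No changes to business code; supports broadcasting and clustering modes as well as ordered and unordered messages (where idempotency is enforced, the idempotency state must be reset first) | Not supported by clients older than version 3.0.7 |
Manual control with pull | Consume progress is fully controllable | Offsets must be maintained by the application, which adds complexity |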
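About the author
Li Zhihao (李志浩), Java development engineer at 采货侠
This article is for study purposes only. All rights belong to the original author, and it will be removed on request in case of infringement. Source: 转转技术, Li Zhihao: http://mp.weixin.qq.com/s/gI303eszTbTi5hin-dw4UQ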