Su的技术博客

  • 首页
  • 原创
  • 视频
  • Java
  • MySQL
  • DDD
  • 事故复盘
  • 架构方案
  • AI
  • Other
  • 工具
    • AI工具集
    • 工具清单
    • JSON在线格式化
    • JSON在线比较
    • SQL在线格式化
  • 打赏
  • 关于
路很长,又很短
  1. 首页
  2. Java
  3. 正文
                           

【消息回溯】RocketMQ消息回溯实践与解析

2024-10-01 1351点热度 0人点赞 0条评论
  • 1 问题背景
  • 2 验证
    • 2.1 生产者启动
    • 2.2 消费者启动
    • 2.3 执行回溯
    • 2.4 结果验证
    • 2.5 验证小结
  • 3 分析
    • 3.1 策略模式,解析命令行
    • 3.2 创建客户端,与服务端交互
    • 3.3 获取topic对应的broker地址,提交重置请求
    • 3.4 与 nameserver交互获取broker地址
    • 3.5 与broker交互,执行重置操作
    • 3.6 消费客户端收到请求,开始处理
  • 4 核心流程
  • 5 总结
  • 6 延申
  • 7 对比

1 问题背景

前段时间,小A公司的短信服务出现了问题,导致一段时间内的短信没有发出去,等服务修复后,需要重新补发这批数据。

由于短信服务是直接通过RocketMQ触发,因此在修复这些数据的时候,小A犯了难,于是就有了以下对话

领导:小A呀,这数据这么多,你准备怎么修呀?

小A:头大呀领导,一般业务我们都有一个本地消息表来做幂等,我只需要把数据库表的状态重置,然后把数据捞出来重新循环执行就可以啦,但是短信服务我们没有本地表呀!

领导:那你有什么想法吗?

小A:简单的话,那就让上游重发吧,我们再消费一遍就好了。

领导:这样问题就更严重了呀,你想,上游重发一遍,那是不是所有的消费者组都要重新消费一遍,到时候其他业务同学就要来找你了。

小A:那就不好办了。。。

领导:其实RocketMQ有专门的消息回溯的能力,你可以试试

小A:这么神奇?我研究研究。。。

2 验证

2.1 生产者启动

准备一个新的topic,并发送1W条消息

public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }

 

2.2 消费者启动

准备一个新的消费者组,消费topic下数据并记录总条数

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
final AtomicInteger count = new AtomicInteger();

consumer.registerMessageListener(new MessageListenerConcurrently() {

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List&lt;MessageExt&gt; msgs,
        ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        count.incrementAndGet();
        System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
        System.out.println(count.get());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

consumer.start();

}
消费者消息记录

2.3 执行回溯

命令行执行

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

以下为mqadmin.cmd的内容,因此也可以直接通过调用MQAdminStartup的main方法执行

RocketMQ消息回溯实践与解析

MQAdminStartup手动执行

代码执行

public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}

2.4 结果验证

RocketMQ消息回溯实践与解析
客户端重置成功记录
RocketMQ消息回溯实践与解析
消费者重新消费记录

2.5 验证小结

从结果上来看,消费者offset被重置到了指定的时间戳位置,由于指定时间戳早于最早消息的创建时间,因此重新消费了所有未被删除的消息。

那rocketmq究竟做了什么呢?

2.5.1 分析参数

动作标识:resetOffsetByTime

额外参数:

-n nameserver的地址

-t 指定topic名称

-g 指定消费者组名称

-s 指定回溯时间

2.5.2 思考

RocketMQ消息回溯实践与解析
消息回溯思考

3 分析

以下源码部分均出自4.2.0版本,展示代码有所精简。

3.1 策略模式,解析命令行

org.apache.rocketmq.tools.command.MQAdminStartup#main

/*根据动作标识解析除对应的处理类,我们本次请求实际处理策略类:ResetOffsetByTimeCommand*/
SubCommand cmd = findSubCommand(args[0]);
/*解析命令行*/
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());

/提交请求执行/ cmd.execute(commandLine, options, rpcHook);

3.2 创建客户端,与服务端交互

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute

public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
String group = commandLine.getOptionValue("g").trim();//消费者组
String topic = commandLine.getOptionValue("t").trim();//主题
String timeStampStr = commandLine.getOptionValue("s").trim();//重置时间戳
long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);//重置时间戳
boolean isC = false;//是否C客户端
boolean force = true;//是否强制重置,这里提前解释一下,有可能时间戳对应的offset比当前消费进度要大,强制的话会出现部分消息消费不到
if (commandLine.hasOption('f')) {
    force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
}

/*与nameserver以及broker交互的客户端启动*/
defaultMQAdminExt.start();
/*正式执行命令*/
Map&lt;MessageQueue, Long&gt; offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);

}
 

3.3 获取topic对应的broker地址,提交重置请求

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp

public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce,
    boolean isC)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    /*从nameserver处获取broker地址*/
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    /*由于消息数据分区分片,topic下的messagequeue可能存在多个broker上,因此这是个列表*/
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<MessageQueue, Long>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                /*循环与各个broker交互,执行重置操作*/
                Map<MessageQueue, Long> offsetTable =
                    this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce,
                        timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

3.4 与 nameserver交互获取broker地址

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#examineTopicRouteInfo

public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
    boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
 /*同样的组装参数,请求码:105*/
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
/*创建请求与nameserver交互*/
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
byte[] body = response.getBody();
if (body != null) {
    return TopicRouteData.decode(body, TopicRouteData.class);
}

}

3.4.1 nameserver收到请求,获取路由信息并返回

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
/*nameserver内部存储topic的路由信息*/
TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

byte[] content = topicRouteData.encode();
response.setBody(content);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
 

3.4.2 RouteInfoManager的核心属性

//topic路由信息,根据这个做负载均衡,QueueData里面记录brokerName
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
//broke基本信息 名称  所在集群信息   主备broke地址  brokerId=0表示master   >0表示slave
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
//集群信息,包含集群所有的broke信息
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
//存活的broke信息,以及对应的channel
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
//broke的过滤类信息
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

3.5 与broker交互,执行重置操作

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset

public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group,
    final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
    throws RemotingException, MQClientException, InterruptedException {
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timestamp);
requestHeader.setForce(isForce);

/*同样的组装参数,请求码:222*/
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
if (isC) {
    request.setLanguage(LanguageCode.CPP);
}

/创建请求与broker交互,注意这里是同步invokeSync/
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
if (response.getBody() != null) {
ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
return body.getOffsetTable();
}
}

broker收到请求,开始处理

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset

public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
    boolean isC) {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);

/*记录下该消费者组消费topic下的队列要重置到哪条offset*/
Map&lt;MessageQueue/*队列*/, Long/*offser*/&gt; offsetTable = new HashMap&lt;MessageQueue, Long&gt;();

for (int i = 0; i &lt; topicConfig.getWriteQueueNums(); i++) {
    MessageQueue mq = new MessageQueue();
    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    mq.setTopic(topic);
    mq.setQueueId(i);

    /*broker可以获取该topic下的consumergroup下的某个队列的offset*/
    long consumerOffset =
        this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);//消费者组当前已经消费的offset
    if (-1 == consumerOffset) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("THe consumer group &lt;%s&gt; not exist", group));
        return response;
    }

    long timeStampOffset;
    if (timeStamp == -1) {

//没有指定表示当前队列最大的offset
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
} else {
//根据时间戳查到队列下对应的offset
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
}

    if (timeStampOffset &lt; 0) {
        //&lt;0表示消息已经被删掉了
        log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
        timeStampOffset = 0;
    }

    /*如果isForce=false,则要重置的offset&lt;当前正在消费的offset才会重置。也过来,也就是说重置不仅会回溯,消费进度过慢也可以往后拨,加快消费进度*/
    if (isForce || timeStampOffset &lt; consumerOffset) {
        offsetTable.put(mq, timeStampOffset);
    } else {
        offsetTable.put(mq, consumerOffset);
    }
}

/*确定了要先重置的offset之后开始与客户端交互,准备客户端重置,请求码220*/
ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setGroup(group);
requestHeader.setTimestamp(timeStamp);
RemotingCommand request =
    RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
if (isC) {
    // c++ language
    ResetOffsetBodyForC body = new ResetOffsetBodyForC();
    List&lt;MessageQueueForC&gt; offsetList = convertOffsetTable2OffsetList(offsetTable);
    body.setOffsetTable(offsetList);
    request.setBody(body.encode());
} else {
    // other language
    ResetOffsetBody body = new ResetOffsetBody();
    body.setOffsetTable(offsetTable);
    request.setBody(body.encode());
}

/*拿到与当前broker建立连接的消费者组客户端信息*/
ConsumerGroupInfo consumerGroupInfo =
    this.brokerController.getConsumerManager().getConsumerGroupInfo(group);

if (consumerGroupInfo != null &amp;&amp; !consumerGroupInfo.getAllChannel().isEmpty()) {
    //获取长连接channel
    ConcurrentMap&lt;Channel, ClientChannelInfo&gt; channelInfoTable =
        consumerGroupInfo.getChannelInfoTable();
    for (Map.Entry&lt;Channel, ClientChannelInfo&gt; entry : channelInfoTable.entrySet()) {
        int version = entry.getValue().getVersion();
        /*这里版本可以判断,只有客户端版本&gt;3.0.7才支持重置*/
        if (version &gt;= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
            try {
                /*注意这里是只管发不管收,可以简单理解为异步了invokeOneway*/
                this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}",
                    topic, group, entry.getValue().getClientId());
            } catch (Exception e) {
                log.error("[reset-offset] reset offset exception. topic={}, group={}",
                    new Object[] {topic, group}, e);
            }
        } else {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("the client does not support this feature. version="
                + MQVersion.getVersionDesc(version));
            log.warn("[reset-offset] the client does not support this feature. version={}",
                RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
            return response;
        }
    }
} else {
    String errorInfo =
        String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d",
            requestHeader.getGroup(),
            requestHeader.getTopic(),
            requestHeader.getTimestamp());
    log.error(errorInfo);
    response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
    response.setRemark(errorInfo);
    return response;
}
response.setCode(ResponseCode.SUCCESS);
ResetOffsetBody resBody = new ResetOffsetBody();
resBody.setOffsetTable(offsetTable);
response.setBody(resBody.encode());
return response;

}
 

3.6 消费客户端收到请求,开始处理

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset

public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        /*根据消费者组找到对应的消费实现,即我们熟悉的DefaultMQPushConsumerImpl或者DefaultMQPullConsumerImpl*/
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            //由于PullConsumer消费进度自己控制,因此直接返回
            log.info("[reset-offset] consumer dose not exist. group={}", group);
            return;
        }
    consumer.suspend();//暂停消费

    /*暂停消息拉取,以及待处理的消息缓存都清掉*/
    ConcurrentMap&lt;MessageQueue, ProcessQueue&gt; processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
    for (Map.Entry&lt;MessageQueue, ProcessQueue&gt; entry : processQueueTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        if (topic.equals(mq.getTopic()) &amp;&amp; offsetTable.containsKey(mq)) {
            ProcessQueue pq = entry.getValue();
            pq.setDropped(true);
            pq.clear();
        }
    }

    /*这里的等待实现比较简单,与broker交互是同步,broker与consumer交互是异步,因此这里阻塞10秒是为了保证所有的consumer都在这里存储offset并触发reblance*/
    try {
        TimeUnit.SECONDS.sleep(10);
    } catch (InterruptedException e) {
    }

    Iterator&lt;MessageQueue&gt; iterator = processQueueTable.keySet().iterator();
    while (iterator.hasNext()) {
        MessageQueue mq = iterator.next();
        //获取messagequeue应该被重置的offset
        Long offset = offsetTable.get(mq);
        if (topic.equals(mq.getTopic()) &amp;&amp; offset != null) {
            try {
                /*更新更新本地offset,这里注意集群模式是先修改本地,然后定时任务每五秒上报broker,而广播模式offset在本地存储,因此只需要修改消费者本地的offset即可*/
                consumer.updateConsumeOffset(mq, offset);
                consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                iterator.remove();
            } catch (Exception e) {
                log.warn("reset offset failed. group={}, {}", group, mq, e);
            }
        }
    }
} finally {
    if (consumer != null) {
        /*重新触发reblance,由于broker已经重置的该消费者组的offset,因此重分配后以broker为准*/
        consumer.resume();
    }
}

}
 

4 核心流程

RocketMQ消息回溯实践与解析

消息回溯全流程

5 总结

消息回溯功能是 RocketMQ 提供给业务方的定心丸,业务在出现任何无法恢复的问题后,都可以及时通过消息回溯来恢复业务或者订正数据。特别是在流或者批计算的场景,重跑数据往往是常态。

RocketMQ 能实现消息回溯功能得益于其简单的位点管理机制,可以很容易通过 mqadmin 工具重置位点。但要注意,由于topic的消息实际都是存储在broker上,且有一定的删除机制,因此首先要确认需要消息回溯的集群broker不能下线节点或者回溯数据被删除之前的时间点,确保消息不会丢失。

6 延申

通过消息回溯的功能,我们可以任意向前或者向后拨动offset,那当我们想要指定一个区间进行消费,这个时候怎么办呢。比如当消费进度过慢,我们选择向后拨动offset,那就会有一部分未消费的消息出现,针对这部分消息,我们应该在空余时间把他消费完成,就需要指定区间来消费了。

其实通过上面代码org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset中我们可以看到,对于DefaultMQPullConsumerImpl类型的消费者,消息重置是不生效的,这是因为DefaultMQPullConsumerImpl的消费进度完全由消费者来控制,那我们就可以采用拉模式来进行消费。

示例代码:

public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
    private static final Map<MessageQueue, Pair<Long/*最小offset*/,Long/*最大offset*/>> QUEUE_OFFSE_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L;//最小时间戳
    private static final Long MAX_TIMESTAMP = 1722240160000L;//最大时间戳
public static void main(String[] args) throws MQClientException {
    DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.start();

    /*初始化待处理的offset*/
    String topic = "TopicTest";
    init(consumer, topic);

    Set&lt;MessageQueue&gt; mqs = consumer.fetchSubscribeMessageQueues(topic);
    for (MessageQueue mq : mqs) {
        System.out.printf("Consume from the queue: %s%n", mq);
        SINGLE_MQ:
        while (true) {
            try {
                PullResult pullResult =
                    consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                System.out.printf("%s%n", pullResult);
                putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                switch (pullResult.getPullStatus()) {
                    case FOUND:
                        //check max offset and dosomething...
                        break;
                    case NO_MATCHED_MSG:
                        break;
                    case NO_NEW_MSG:
                        break SINGLE_MQ;
                    case OFFSET_ILLEGAL:
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    consumer.shutdown();
}

private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
    Set&lt;MessageQueue&gt; mqs = consumer.fetchSubscribeMessageQueues(topic);
    for (MessageQueue mq : mqs) {
        long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
        long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
        //记录区间内范围内最小以及最大的offset
        QUEUE_OFFSE_SECTION_TABLE.put(mq, new Pair&lt;&gt;(minOffset, maxOffset));
        //将最小offset写为下次消费的初始offset
        OFFSE_TABLE.put(mq, minOffset);
    }
}

private static long getMessageQueueOffset(MessageQueue mq) {
    Long offset = OFFSE_TABLE.get(mq);
    if (offset != null)
        return offset;

    return 0;
}

private static void putMessageQueueOffset(MessageQueue mq, long offset) {
    OFFSE_TABLE.put(mq, offset);
}

}
 

7 对比

方式 优点 缺点
消费者本地消息表 业务完全可控 额外存储开销,重复消费需要单独开发
消息重置 无需业务修改,支持广播/集群,顺序/无序消息(有幂等操作的需要重置状态) 低版本3.0.7之前不支持
pull手动控制 消费进度完全可控 需要考虑offset维护,复杂度较高

 

关于作者

李志浩,采货侠JAVA开发工程师

 

本文仅供学习!所有权归属原作者。侵删!文章来源: 转转技术 -李志浩 :http://mp.weixin.qq.com/s/gI303eszTbTi5hin-dw4UQ

 

 

 

更多文章:

  1. GitHub Copilot Chat默认Prompt
  2. 设计模式在外卖营销业务中的实践
  3. 全链路压测之影子库及ShardingSphere实现影子库源码剖析
  4. 系统设计入门:成为高级软件工程师的指南
  5. 我们在顺序消息和事务消息方面的实践
  6. 【进阶玩法】策略+责任链+组合实现合同签章
  7. 用图讲解SOLID设计原则
  8. 浅析设计模式5 -- 责任链模式
  9. 殷浩详解DDD 第四讲:领域层设计规范
  10. Eureka源码剖析之三:服务拉取
标签: 消息回溯 架构 后端 转转 MQ RocketMQ 消息队列
最后更新:2024-10-01

coder

分享干货文章,学习先进经验。

打赏 点赞
< 上一篇
下一篇 >
广告
文章目录
  • 1 问题背景
  • 2 验证
    • 2.1 生产者启动
    • 2.2 消费者启动
    • 2.3 执行回溯
    • 2.4 结果验证
    • 2.5 验证小结
  • 3 分析
    • 3.1 策略模式,解析命令行
    • 3.2 创建客户端,与服务端交互
    • 3.3 获取topic对应的broker地址,提交重置请求
    • 3.4 与 nameserver交互获取broker地址
    • 3.5 与broker交互,执行重置操作
    • 3.6 消费客户端收到请求,开始处理
  • 4 核心流程
  • 5 总结
  • 6 延申
  • 7 对比
最新 热点 推荐
最新 热点 推荐
干货 | 论Elasticsearch数据建模的重要性 马蜂窝消息总线——面向业务的消息服务设计 基于 MySQL Binlog 实现可配置的异构数据同步 视频笔记:Google发布Agent2Agent协议 视频笔记:什么是微服务,为什么是微服务? 视频笔记:什么是AI 智能体? 视频笔记:什么是Flink? 如何秒级实现接口间“幂等”补偿:一款轻量级仿幂等数据校正处理辅助工具
Elasticsearch 使用误区之六——富文本内容写入前不清洗基于 MySQL Binlog 实现可配置的异构数据同步马蜂窝消息总线——面向业务的消息服务设计干货 | 论Elasticsearch数据建模的重要性你可以不用RxJava,但必须得领悟它的思想!如何秒级实现接口间“幂等”补偿:一款轻量级仿幂等数据校正处理辅助工具视频笔记:什么是Flink?视频笔记:什么是AI 智能体?
猪八戒十年DevOps演进之路 系统设计 | 企业应用数据交换 系统设计 | RESTful API 使用问题和建议 大型系统架构重构10步法 记一次网络请求连接超时的事故 Kafka为什么要去掉ZooKeeper?一文了解Kafka 中 ZooKeeper 的演变过程 仅使用set属性值就把数据库数据给改了 26 条有效的AI提示词技巧

CRUD (1) Event Sourcing (1) graphql (1) id (1) NoSQL (1) quarkus (1) rest (1) RocketMQ (2) Spring Boot (1) zk (1) zookeeper (1) 上下文 (1) 事务消息 (1) 二级缓存 (1) 值对象 (1) 关系数据库 (1) 分布式缓存 (1) 原子性 (1) 唯一ID (1) 商品 (1) 多对多 (1) 子域 (1) 字符集 (1) 客户端心跳 (1) 幂等 (2) 干货 (1) 并发 (1) 应用场景 (1) 应用架构图 (1) 康威定律 (2) 异步复制 (1) 微服务架构 (2) 总体方案 (1) 技术方案 (2) 技术架构 (2) 技术架构图 (1) 技能 (1) 持续集成 (1) 支撑域 (1) 故障恢复 (1) 数据架构图 (1) 方案选型 (1) 日记 (1) 服务发现 (1) 服务治理 (1) 服务注册 (2) 机房 (1) 核心域 (1) 泄漏 (1) 洋葱架构 (1) 消息队列 (5) 源码剖析 (1) 灰度发布 (1) 熔断 (1) 生态 (1) 画图工具 (1) 研发团队 (1) 线程 (2) 组织架构 (1) 缓存架构 (1) 编码 (1) 视频 (18) 读写分离 (1) 贵州 (1) 软件设计 (1) 迁移 (1) 通用域 (1) 集群化 (1) 雪花算法 (1) 顺序消息 (1)

推荐链接🔗
  • AI工具集
  • 工具箱🛠️

COPYRIGHT © 2014-2025 verysu.com . ALL RIGHTS RESERVED.

Theme Kratos Made By Seaton Jiang

粤ICP备15033072号-2

x