跳至主要内容
版本: 5.0

事务消息

事务消息是 Apache RocketMQ 中一种高级消息类型。本主题介绍事务消息的应用场景、工作机制、限制、使用方式和使用注意事项。

场景

分布式事务

当核心业务逻辑在分布式系统中执行时,会同时调用多个下游业务来处理逻辑。因此,确保核心业务与下游业务之间执行结果的一致性,是分布式事务需要解决的最大挑战。 事务消息诉求

在电商场景中,当用户下单时,会触发下游系统进行相应的变更。例如,物流系统需要发起发货,积分系统需要更新用户的积分,购物车系统需要清空用户的购物车。处理分支包括

  • 订单系统将订单状态从未支付变更为已支付。

  • 物流系统添加待发货记录,并创建订单物流记录。

  • 积分系统更新用户的积分。

  • 购物车系统清空购物车,并更新用户的购物车记录。

传统 XA 解决方案:性能低下

通常使用基于 XA 协议的分布式事务系统来确保分支之间结果的一致性。该系统将四个调用分支封装成一个包含四个独立事务分支的大事务。虽然这种方案可以确保结果一致性,但需要锁定大量资源才能实现。随着分支数量的增加,这个数字也会增加,导致系统并发性降低。随着更多下游分支的添加,系统性能会下降。

基于普通消息的解决方案:结果一致性差

基于 XA 解决方案的更简单方案将订单系统的变更视为本地事务,将下游系统的变更视为下游任务。事务分支被视为添加了订单表事务的普通消息。这种方案异步处理消息,缩短了处理生命周期,提高了系统并发性。 普通消息方案

但是,这种方案容易导致核心事务与事务分支之间结果不一致,例如

  • 消息发送成功,但订单未执行。导致整个事务需要回滚。

  • 订单执行成功,但消息未发送。在这种情况下,需要重新发送消息以供消费。

  • 无法可靠地检测超时错误,难以判断是否需要回滚订单或提交订单变更。

Apache RocketMQ 基于分布式事务消息的解决方案:彻底一致性

前面方案无法保证一致性的原因是,普通消息不具备独立数据库事务的提交、回滚和统一协调能力。

Apache RocketMQ 的事务消息功能在基于普通消息的解决方案基础上,支持两阶段提交。该功能将两阶段提交与本地事务相结合,实现提交结果的全局一致性。 事务消息

Apache RocketMQ 的事务消息解决方案功能强大、可扩展性高且易于开发。有关事务消息工作机制和流程的更多信息,请参见工作机制。

工作机制

定义

事务消息是 Apache RocketMQ 提供的一种高级消息类型,用于确保消息生产与本地事务之间的最终一致性。处理流程

下图显示了事务消息的交互过程。 事务消息

  1. 生产者将消息发送到 Apache RocketMQ 代理。

  2. Apache RocketMQ 代理保存消息,并将其标记为不可交付。处于这种状态的消息称为半消息。之后,代理向生产者发送确认消息 (ACK)。

  3. 生产者执行本地事务。

  4. 生产者向代理发送第二个 ACK,以提交本地事务的执行结果。执行结果可能是 Commit 或 Rollback。

    • 如果代理接收到的消息状态为 Commit,代理将半消息标记为可交付,并将消息传递给消费者。

    • 如果代理接收到的消息状态为 Rollback,代理将回滚事务,不会将半消息传递给消费者。

  5. 如果网络断开连接或生产者应用程序重新启动,并且代理未收到第二个 ACK 或半消息状态为 Unknown,代理将等待一段时间,并向生产者集群中的生产者发送请求,以查询半消息的状态。注意 有关时间段长度和最大查询次数的更多信息,请参见参数限制

  1. 生产者收到请求后,会检查与半消息对应的本地事务的执行结果。

  2. 生产者根据本地事务的执行结果,向 Apache RocketMQ 代理发送另一个 ACK。然后,代理按照步骤 4 处理半消息。

事务消息的生命周期 事务消息

  • 初始化:消息由生产者构建和初始化,并准备发送到代理。

  • 事务挂起:半消息发送到代理。但是,它不会立即写入磁盘以进行永久存储。相反,它存储在事务存储系统中。在系统验证本地事务的第二阶段成功之前,不会提交消息。在此期间,消息对下游消费者不可见。

  • 回滚:在第二阶段,如果事务的执行结果为回滚,代理将回滚半消息并终止工作流程。

  • 准备就绪:消息发送到代理,对消费者可见,并可供消费。

  • 飞行中:消息由消费者获取,并根据消费者的本地业务逻辑进行处理。

    在此过程中,代理等待消费者完成消费并提交消费结果。如果在一定时间段内未收到消费者的响应,Apache RocketMQ 将重试消息。有关更多信息,请参见 消费重试

  • 已确认:消费者完成消费并向代理提交消费结果。代理标记当前消息是否已成功消费。

    默认情况下,Apache RocketMQ 会保留所有消息。当提交消费结果时,消息数据在逻辑上被标记为已消费,而不是立即删除。因此,消费者可以在消息因保留期到期或存储空间不足而被删除之前,回溯消息以重新消费。

  • 已删除:当消息的保留期到期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除最早保存的消息。有关更多信息,请参见 消息存储和清理

使用限制

消息类型一致性

事务消息只能用于 MessageType 为 Transaction 的主题。

以事务为中心的消费

Apache RocketMQ 的事务消息功能保证本地核心事务与下游分支之间可以处理相同的事务。但是,它不保证消息消费结果与上游执行结果之间的一致性。因此,下游业务必须确保消息被正确处理。我们建议消费者 消费重试 适当,以确保消息在发生故障时被正确处理。

中间状态可见性

Apache RocketMQ 的事务消息功能仅确保最终一致性,这意味着在消息传递给消费者之前,上游事务与下游分支之间不保证状态一致性。因此,事务消息仅适用于接受异步执行的事务场景。

事务超时机制

Apache RocketMQ 为事务消息实现了超时机制。在收到半消息后,如果代理在一定时间段内无法确定是提交还是回滚事务,代理默认会回滚消息。有关超时时间段的更多信息,请参见参数限制

示例

创建主题

对于在 Apache RocketMQ 5.0 中创建主题,建议使用 mqadmin 工具。但是,需要注意的是,消息类型需要作为属性参数添加。以下是一个示例

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction

发送消息

发送事务消息与发送普通消息的不同之处在于以下几个方面

  • 在发送事务消息之前,必须启用事务检查器并将其与本地事务执行相关联。

  • 创建生产者时,必须设置事务检查器并绑定要发送的消息的主题列表。这些操作使客户端的内置事务检查器能够在发生异常时恢复主题。

创建 TRANSACTION 主题

NORMAL 主题不支持发送 TRANSACTION 消息,如果将 TRANSACTION 消息发送到 NORMAL 主题,则会收到错误。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
  • -c 集群名称
  • -t 主题名称
  • -n 命名服务器的地址
  • -a 额外属性,我们添加了一个值为 TRANSACTIONmessage.type 属性来支持发送 TRANSACTION 消息。

以下示例使用 Java 作为示例,向您展示如何发送事务性消息

    // The demo is used to simulate the order table query service to check whether the order transaction is submitted. 
private static boolean checkOrderById(String orderId) {
return true;
}
// The demo is used to simulate the execution result of a local transaction.
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
// Build a transaction producer: The transactional message requires the producer to build a transaction checker to check the intermediate status of an exceptional half message.
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID, for example, an order ID.
* If this order is found in the order table, the order insertion action is committed correctly by the local transaction. If no order is found in the order table, the local transaction has been rolled back.
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// Message error. Rollback is returned.
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
// Create a transaction branch.
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
// If the transaction branch fails to be created, the transaction is terminated.
return;
}
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
.setTag("messageTag")
// For transactional messages, a unique ID associated with the local transaction is created to verify the query of the local transaction status.
.addProperty("OrderId", "xxx")
// Message body.
.setBody("messageBody".getBytes())
.build();
// Send a half message.
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
// If the half message fails to be sent, the transaction can be terminated and the message is rolled back.
return;
}
/**
* Execute the local transaction and check the execution result.
* 1. If the result is Commit, deliver the message.
* 2. If the result is Rollback, roll back the message.
* 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// You can determine whether to retry the message based on your business requirements. If you do not want to retry the message, you can use the half message status query to submit the transaction status.
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// We recommend that you record the exception information. This enables you to submit the transaction status based on the half message status query in the event of a rollback exception. Otherwise, you have to retry the message.
e.printStackTrace();
}
}
}

使用说明

避免大量半消息超时。

Apache RocketMQ 允许您在事务提交期间发生异常时检查事务,以确保事务一致性。但是,生产者应尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能下降并延迟事务处理。

正确处理正在进行的事务。

在半消息状态查询期间,不要为正在进行的事务返回 Rollback 或 Commit。相反,请将事务的 Unknown 状态保留下来。

通常,事务正在进行的原因是事务执行速度慢且查询频繁。建议两种解决方案

  • 将第一次查询的间隔设置为较大的值。但是,这可能会导致依赖查询结果的消息出现较大延迟。

  • 使程序能够正确识别正在进行的事务。