跳到主内容
版本:5.0

事务消息

事务消息是 Apache RocketMQ 中一种高级消息类型。本文介绍事务消息的应用场景、工作机制、限制、用法和使用注意事项。

场景

分布式事务

当核心业务逻辑在分布式系统中执行时,会同时调用多个下游业务来处理该逻辑。因此,确保核心业务和下游业务之间执行结果的一致性是分布式事务需要解决的最大挑战。 事务消息诉求

在电商场景中,当用户下单时,会触发下游系统进行相应的变更。例如,物流系统必须启动发货,积分系统必须更新用户的积分,购物车系统必须清空用户的购物车。处理分支包括

  • 订单系统将订单状态从未支付变更为已支付。

  • 物流系统增加待发货记录并创建订单物流记录。

  • 积分系统更新用户的积分。

  • 购物车系统清空购物车并更新用户的购物车记录。

传统基于 XA 的方案:性能差

确保各分支结果一致性的典型方法是使用基于扩展架构(XA)协议的分布式事务系统。该系统将四个调用分支封装到一个包含四个独立事务分支的大事务中。虽然此解决方案可以确保结果一致性,但需要锁定大量资源才能实现。此数量随分支数量的增加而增加,导致系统并发性较低。随着下游分支的增加,系统性能会下降。

普通消息方案:结果一致性差

一种更简单的基于 XA 的解决方案是将订单系统的变更视为本地事务,将下游系统的变更视为下游任务。事务分支被视为添加了订单表事务的普通消息。此解决方案异步处理消息,以缩短处理生命周期并提高系统并发性。 普通消息方案

但是,此解决方案容易导致核心事务与事务分支之间结果不一致,例如

  • 消息已发送,但订单未执行。因此,整个事务需要回滚。

  • 订单已执行,但消息未发送。在这种情况下,消息必须重新发送以供消费。

  • 无法可靠地检测超时错误,这使得很难确定订单是否需要回滚或订单变更是否需要提交。

Apache RocketMQ 基于分布式事务消息的方案:彻底一致性

前述解决方案无法保证一致性的原因在于普通消息不具备独立数据库事务的提交、回滚和统一协调能力。

Apache RocketMQ 的事务消息功能在普通消息方案的基础上支持两阶段提交。该功能结合了两阶段提交和本地事务,以实现提交结果的全局一致性。 事务消息

Apache RocketMQ 的事务消息解决方案功能强大、可扩展且易于开发。有关事务消息的工作机制和流程的更多信息,请参阅工作机制。

工作机制

定义

事务消息是 Apache RocketMQ 提供的一种高级消息类型,旨在确保消息生产与本地事务之间的最终一致性。处理流程

下图展示了事务消息的交互流程。事务消息

  1. 生产者向 Apache RocketMQ Broker 发送消息。

  2. Apache RocketMQ Broker 保存消息并将其标记为不可投递。处于此状态的消息称为半消息。之后,Broker 向生产者发送确认消息(ACK)。

  3. 生产者执行本地事务。

  4. 生产者向 Broker 发送第二次 ACK,以提交本地事务的执行结果。执行结果可能是 Commit(提交)或 Rollback(回滚)。

    • 如果 Broker 收到的消息状态是 Commit(提交),则 Broker 将半消息标记为可投递,并将消息投递给消费者。

    • 如果 Broker 收到的消息状态是 Rollback(回滚),则 Broker 回滚事务,并且不将半消息投递给消费者。

  5. 如果网络断开或生产者应用程序重新启动,并且 Broker 未收到第二次 ACK 或半消息的状态为 Unknown(未知),则 Broker 会等待一段时间,然后向生产者集群中的某个生产者发送请求,查询半消息的状态。注意有关该时间段的长度和最大查询次数的更多信息,请参阅参数限制

  1. 生产者收到请求后,会检查与半消息对应的本地事务的执行结果。

  2. 生产者根据本地事务的执行结果向 Apache RocketMQ Broker 发送另一个 ACK。然后,Broker 按照步骤 4 处理半消息。

事务消息的生命周期 事务消息

  • 初始化:消息由生产者构建和初始化,并准备好发送到 Broker。

  • 事务挂起:半消息被发送到 Broker。但是,它不会立即写入磁盘进行永久存储。相反,它存储在事务存储系统中。直到系统验证本地事务的第二阶段成功后,消息才会被提交。在此期间,消息对下游消费者不可见。

  • 回滚:在第二阶段,如果事务的执行结果是回滚,Broker 会回滚半消息并终止工作流程。

  • 就绪:消息已发送到 Broker,并对消费者可见且可供消费。

  • 传输中:消息由消费者获取并根据消费者的本地业务逻辑进行处理。

    在此过程中,Broker 等待消费者完成消费并提交消费结果。如果在一定时间内未收到消费者的响应,Apache RocketMQ 会重试该消息。有关更多信息,请参阅消费重试

  • 已确认:消费者完成消费并将消费结果提交给 Broker。Broker 标记当前消息是否成功消费。

    默认情况下,Apache RocketMQ 保留所有消息。当提交消费结果时,消息数据被逻辑地标记为已消费,而不是立即删除。因此,在消息因保留期过期或存储空间不足而被删除之前,消费者可以回溯消息以进行重新消费。

  • 已删除:当消息的保留期过期或存储空间不足时,Apache RocketMQ 会以滚动方式从物理文件中删除最早保存的消息。有关更多信息,请参阅消息存储与清理

使用限制

消息类型一致性

事务消息只能用于 `MessageType` 为 `Transaction` 的 Topic。

以事务为中心的消费

Apache RocketMQ 的事务消息功能保证了本地核心事务与下游分支之间可以处理相同的事务。但是,它不保证消息消费结果与上游执行结果之间的一致性。因此,下游业务必须确保消息得到正确处理。我们建议消费者正确进行消费重试,以确保在发生故障时消息得到正确处理。

中间状态可见性

Apache RocketMQ 的事务消息功能只保证最终一致性,这意味着在消息投递给消费者之前,上游事务和下游分支之间的状态一致性无法保证。因此,事务消息仅适用于接受异步执行的事务场景。

事务超时机制

Apache RocketMQ 为事务消息实现了超时机制。Broker 收到半消息后,如果在一定时间内无法确定是提交还是回滚事务,则 Broker 默认回滚该消息。有关超时期的更多信息,请参阅参数限制

示例

创建 Topic

在 Apache RocketMQ 5.0 中创建 Topic 建议使用 mqadmin 工具。但是,值得注意的是,消息类型需要作为属性参数添加。这是一个示例

sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=Transaction

发送消息

发送事务消息与发送普通消息在以下方面有所不同

  • 在发送事务消息之前,必须启用事务检查器并将其与本地事务执行关联。

  • 创建生产者时,必须设置事务检查器并绑定要发送的消息的 Topic 列表。这些操作使客户端的内置事务检查器能够在发生异常时恢复 Topic。

创建 TRANSACTION Topic

NORMAL Topic 不支持投递 TRANSACTION 消息,如果您向 NORMAL Topic 发送 TRANSACTION 消息,将会收到错误。

./bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster -a +message.type=TRANSACTION
  • -c 集群名称
  • -t Topic 名称
  • -n NameServer 地址
  • -a 额外属性,我们添加一个 message.type 属性,值为 TRANSACTION 以支持投递 TRANSACTION 消息。

以下示例使用 Java 作为示例,向您展示如何发送事务消息

    // The demo is used to simulate the order table query service to check whether the order transaction is submitted. 
private static boolean checkOrderById(String orderId) {
return true;
}
// The demo is used to simulate the execution result of a local transaction.
private static boolean doLocalTransaction() {
return true;
}
public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
// Build a transaction producer: The transactional message requires the producer to build a transaction checker to check the intermediate status of an exceptional half message.
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* The transaction checker checks whether the local transaction is correctly committed or rolled back based on the business ID, for example, an order ID.
* If this order is found in the order table, the order insertion action is committed correctly by the local transaction. If no order is found in the order table, the local transaction has been rolled back.
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// Message error. Rollback is returned.
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
// Create a transaction branch.
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
// If the transaction branch fails to be created, the transaction is terminated.
return;
}
Message message = messageBuilder.setTopic("topic")
// Specify the message index key so that the system can use a keyword to accurately locate the message.
.setKeys("messageKey")
// Specify the message tag so that consumers can use the tag to filter the message.
.setTag("messageTag")
// For transactional messages, a unique ID associated with the local transaction is created to verify the query of the local transaction status.
.addProperty("OrderId", "xxx")
// Message body.
.setBody("messageBody".getBytes())
.build();
// Send a half message.
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
// If the half message fails to be sent, the transaction can be terminated and the message is rolled back.
return;
}
/**
* Execute the local transaction and check the execution result.
* 1. If the result is Commit, deliver the message.
* 2. If the result is Rollback, roll back the message.
* 3. If an unknown exception occurs, no action is performed until a response is obtained from a half message status query.
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// You can determine whether to retry the message based on your business requirements. If you do not want to retry the message, you can use the half message status query to submit the transaction status.
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// We recommend that you record the exception information. This enables you to submit the transaction status based on the half message status query in the event of a rollback exception. Otherwise, you have to retry the message.
e.printStackTrace();
}
}
}

使用注意事项

避免大量半消息超时。

Apache RocketMQ 允许您在事务提交期间发生异常时检查事务,以确保事务一致性。但是,生产者应尽量避免本地事务返回未知结果。大量的事务检查会导致系统性能下降并延迟事务处理。

正确处理进行中的事务。

在半消息状态查询期间,请勿对进行中的事务返回 Rollback 或 Commit。相反,请保持事务的 Unknown 状态。

通常,事务正在进行的原因是事务执行缓慢且查询频繁。建议采用两种解决方案

  • 将第一次查询的间隔设置为较大的值。但是,这可能会导致依赖查询结果的消息出现较大的延迟。

  • 使程序正确识别进行中的事务。