RocketMQ教程
介绍
RocketMQ 是阿里巴巴开源的分布式消息中间件。目前已经成为Apache中的开源基金会项目。它的同类产品有RabbitMQ、Kafka、ActiveMQ。
RocketMQ 支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue
等。系统组成则由Producer、Consumer、Broker、NameServer
等。
为什么选择RocketMQ?
- 是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式等特点
- Producer、Consumer、队列都可以分布式
- Producer 向一些队列轮流发送消息,队列集合称为 Topic,Consumer 如果做广播消费,则一个 Consumer 实例消费这个 Topic 对应的所有队列,如果做集群消费,则多个 Consumer 实例平均消费这个 Topic 对应的队列集合
- 能够保证严格的消息顺序
- 支持拉(pull)和推(push)两种消息模式
- 高效的订阅者水平扩展能力
- 实时的消息订阅机制
- 亿级消息堆积能力
- 支持多种消息协议,如 JMS、OpenMessaging 等
- 较少的依赖
关于消息队列优点
在项目中为什么要使用消息队列?
目前大部分项目中都有三种应用处理:
解耦(耦合性指的就是个模块之间的关联程度。耦合性越低模块的独立性越强)、耗时方面、削峰方面(通过技术手段减少峰值访问的压力)
当然还有其他的,这里就不做过多说明了。
解耦方面
传统模式设计:
很明显,传统模式设计下,系统间耦合性太强(耦合性指的就是个模块之间的关联程度。耦合性越低模块的独立性越强),如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!当然不可否认的是,目前绝大部分中小项目都是这种设计,因为它设计简单易懂省事。
中间件模式:
使用中间件模式,将系统BCD的要求通过消息形式写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改,这种设计显然更加先进。
耗时方面
传统模式设计:
使用传统模式设计,一些非必要的业务逻辑以同步的方式运行,太耗费时间。
中间件模式:
使用中间件模式,将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度。
削峰方面
传统模式设计:
使用传统模式设计,并发量大的时候,所有的请求直接一个一个怼到数据库,造成数据库连接异常。
中间件模式:
使用中间件模式,系统A慢慢的按照数据库能处理的并发量,通过用户传来的消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
关于消息队列缺点
上面讲述了使用消息队列的优点好处。当然作为好东西,不可能完全完美,那自然也会存在缺点。
一个使用了消息队列 的项目,如果连具体问题都没有考虑过,就把把消息队列组件引进去了,那就给自己的项目带来了风险。我们引入一个技术,要对这个技术的弊端有充分的认识,才能做好预防。要记住,不要给公司挖坑,当然面对要随时要跑路的黑心公司除外!
除了这个以外,还要考虑很多系统处理方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,引入信息队列会让项目系统复杂性直线增加。
RocketMQ核心以及概念
讲了这么多消息队列的方面内容,那么回归正题RocketMQ,说说RocketMQ的核心概率。
RocketMQ 在任何一个环境都是可扩展的,生产者(Producer)必须是一个集群,消息服务器(Name Server)必须是一个集群,消费者(Consumer)也同样。
集群级别的高可用,是消息队列 RocketMQ 跟其他的消息服务器的主要区别。
消息生产者发送一条消息到消息服务器,消息服务器会随机的选择一个消费者,只要这个消费者消费成功就认为是成功了。
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。
Producer :负责生产消息,一般由业务系统负责生产消息,简单形象点来说就是 买票时创建买票的订单。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,而单向发送不需要。
Broker :消息中转角色,负责存储消息、转发消息,简单形象点来说就是 存储用户创建买票的订单的仓库。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Consumer :负责消费消息,一般是后台系统负责异步消费,简单形象点来说就是 从买票的订单仓库后台处理订单进行买票。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。
Topic:表示一类消息的集合,简单形象点来说就是将同类买票的订单归为一个集合。每个Topic(主题)包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。
除了这些以外,还有Name Server,即代理服务器。
Name Server:名称服务充当路由消息的提供者,简单形象点来说 就是类似于服务注册服务器。生产者或消费者能够通过名字服务查找定位各Topic(主题)相应的Broker IP列表。多个Name server实例组成集群,但相互独立,没有信息交换。
上面差不多就是其RocketMQ全部的关键部分组成简绍了。
总体来说 整个使用RocketMQ的流程就是:
Consumer生成消息 -》 Consumer通过NameServer归类到对应Topic -》Topic存储在Broker中 -》Consumer 通过NameServer调取到对应Topic 所在的Broker -》Consumer 依次消费消息
RocketMQ搭建使用
要记住RocketMQ是由集群模式下进行的,在实际生成环境下是分布式模式搭建的,而不是单机搭建。
在安装RocketMQ之前我们先来理解Group概念,在RocketMQ中Group是很重要的。通过Group机制,让RocketMQ天然的支持消息负载均衡!比如某个Topic有9条消息,其中一个Consumer Group有3个实例(3个进程 OR 3台机器),那么每个实例将均摊3条消息!(注意RocketMQ只有一种模式,即发布订阅模式。)如下图所示:
与众多集群一样它的模式类似于Mysql的主从机制。
- 单Master模式:无需多言,一旦单个broker重启或宕机,一切都结束了!很显然,线上不可以使用。
- 多Master模式:全是Master,没有Slave。当然,一个broker宕机了,应用是无影响的,缺点在于宕机的Master上未被消费的消息在Master没有恢复之前不可以订阅。
- 多Master多Slave模式(异步复制):多对Master-Slave,高可用!采用异步复制的方式,主备之间短暂延迟,MS级别。Master宕机,消费者可以从Slave上进行消费,不受影响,但是Master的宕机,会导致丢失掉极少量的消息。
- 多Master多Slave模式(同步双写):和上面的区别点在于采用的是同步方式,也就是在Master/Slave都写成功的前提下,向应用返回成功,可见不论是数据,还是服务都没有单点,都非常可靠!缺点在于同步的性能比异步稍低。
搭建操作
我们将要搭建一个简易的RocketMQ集群,它包含1个nameserver,两个master(Broker服务器)。至于Producer和Consumer服务器集群则普通的集群服务,这里不做简绍。
下载RocketMQ
首先我们下载RocketMQ官方版本:
下载官方二进制二进制压缩包:https://dlcdn.apache.org/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip ,然后解压。
部署NameServer服务器
由于RocketMQ由JAVA语言编辑而来,务必确保安装JAVA,并且正确配置JAVA环境。
–Windows操作:–
由于其默认JAVA配置大小为默认2G,对于我们来说明显大了,所以我们需要先修改内存使用大小,用编辑器修改一下runserver.cmd
脚本:
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
使用Windows进行部署前,需要将其RocketMQ文件夹配置到系统变量下。
变量名:ROCKETMQ_HOME
变量值:MQ解压路径\MQ文件夹名
然后启动Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqnamesrv.cmd’,启动NAMESERVER。成功后会弹出提示框,此框勿关闭。
– Linux操作:–
先使用Vim进行编辑MQ文件夹下的/bin/runserver.sh 文件,因为Name Server 默认配置Xmx=4G ,对于小型Linux服务器来说应该进行修改。这里我们修改为1GB。
vi /bin/runserver.sh
JAVA_OPT="${JAVA_OPT}-server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
修改后:
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
后台启动NameServer,并动态查看启动状态:
nohup sh bin/mqnamesrv -n "150.205.110.91:9876" > /dev/null 2>&1 &
tail -f ~/logs/rocketmqlogs/namesrv.log
这里的的 启动时 -n 后为启动IP即端口,对于本地的话,可以使用127.0.0.1。
部署Broker服务器
由于RocketMQ由JAVA语言编辑而来,务必确保安装JAVA,并且正确配置JAVA环境。
配置RocketMQ的Broker服务器需要设置其Broker配置文件,用来进行确定其Broker配置。我们需要2台Broker服务器,所以我们需要准备2个Broker配置文件。
好在我们下载的二进制RocketMQ中的Conf文件夹中有一系列的测试配置,我们直接使用其 2m-noslave
文件夹下的broker-a.properties
和broker-b.properties
配置即可。
–Windos操作:–
与NameServer一样,我们需要修改它的默认内存使用大小,使用编辑器修改runbroker.cmd文件:
rem set "JAVA_OPT=%JAVA_OPT% -server -Xms2g -Xmx2g"
set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx512m"
然后 Cmd命令框执行进入至‘MQ文件夹\bin’下,然后执行‘start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/2m-noslave/broker-a.properties’,启动BROKER-A。
要在同一台电脑上进行学习的话,我们需要修改下另一个Broker服务器配置,新增额外内容:
#存储路径
storePathRootDir=~/logs/rocketmqstore
#commitLog 存储路径
storePathCommitLog=~/logs/rocketmqstore/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=~/logs/rocketmqstore/consumequeue
#消息索引存储路径
storePathIndex=~/logs/rocketmqstore/index
#checkpoint 文件存储路径
storeCheckpoint=~/logs/rocketmqstore/checkpoint
#abort 文件存储路径
abortFile=~/logs/rocketmqstore/abort
#端口设置
listenPort=10950
因为RocketMQ中自带的配置文件时用于多台主机的集群搭建的,所以自然不会将其存储路径固定,端口也都是默认的10911。在单机下进行搭建必须要进行修改,否则启动会报出RocketMq Lock failed,MQ already started
错误,表明端口 以及路径被占用。
然后再执行‘start mqbroker.cmd -n 127.0.0.1:9876 -c ../conf/2m-noslave/broker-b.properties’,启动BROKER-B。
–Linux操作:–
与NameServer一样,它的默认JAVA配置Xmx= 8G,我们需要进行修改:
调整java opt:
vi /bin/runbroker.sh
JAVA_OPT="${JAVA_OPT}-server -Xms8g -Xmx8g -Xmn4g"
调整后:
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
后台启动:
nohup sh bin/mqbroker -n 10.200.110.91:9876 -c ./conf/2m-noslave/broker-a.properties &
在另外一台服务器上 再启动一个Broker服务器:
nohup sh bin/mqbroker -n 10.200.110.91:9876 -c ./conf/2m-noslave/broker-b.properties & tail -f ~/logs/rocketmqlogs/broker.log-n ip:9876
注意其中两个配置文件中brokerName不同。所以在自定义配置Broker配置时,brokerName不能相同。
关闭RocketMQ服务
对于Windos来说关闭服务很简单,ctrl+C即可,当然也可以使用mqshutdown.cmd + broker/namesrv 方式来进行关闭。
对于Linux的话,使用其bin下内置的mqshutdown来进行关闭。
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
控制台搭建
RocketMQ和其他分布式队列组件一样,也有自己的控制台产品,那就是Rocket-console,它是用RocketMQ社区来进行在Github维护的。
目前Rocket-console已经被重命名为**rocketmq-dashboard**。
我们需要克隆一个Rocketmq-dashboard项目到本地来进行部署:
git clone https://github.com/apache/rocketmq-dashboard.git
克隆后,我们进入项目文件下的src\main\resources’文件夹,打开‘application.properties’进行相关配置。
...
# 绑定控制台的地址以及端口
server.address=127.0.0.1
server.port=8083
...
# 配置NameServer地址
rocketmq.config.namesrvAddr=127.0.0.1:9876
...
不管是在Windos下还是Linux下,使用Maven进行打包然后运行:
mvn clean package -Dmaven.test.skip=true
java -jar rocketmq-console-ng-1.0.0.jar &
当然如果在Windos界面下也可以直接在一些IDE直接启动即可。
启动成功后在浏览器中输入配置中的访问地址,即可加入RocketMQ控制台。
点击 集群 ,我们也可以轻松看见当前的Broker服务器状态:
RocketMQ操作
下面介绍RocketMQ关键操作
验证功能
RocketMQ自带了恬送与接收消息的脚本tools.cmd
,它用来验证RocketMQ的功能是否正常。
tool.cmd
脚本需要带参数执行,无法用简单的双击方式启动。因此,我们打开一个cmd窗口,并跳转到bin目录下。
启动消费者
与mqbroker.cmd
脚本类似,启动tool.cmd
命令之前我们要指定NameServer地址。
这里我们采用命令方式指定,并启动消费者。我们需要将其sh或者cmd进入其RocketMQ下的bin文件夹中,依次执行如下命令:
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Consumer
# Linux下使用tools.sh
tools.sh org.apache.rocketmq.example.quickstart.Consumer
其中org.apache.rocketmq.example.quickstart.Consumer
是RocketMQ下的一个测试 消费者类,就是用来进行测试学习使用的。
启动生产者
操作同上类似。
set NAMESRV_ADDR=localhost:9876
tools.cmd org.apache.rocketmq.example.quickstart.Producer
# Linux下使用tools.sh
tools.sh org.apache.rocketmq.example.quickstart.Producer
执行成功后,生产者会发送1000个消息,然后自动退出。
此时,在消费者界面按下Ctrl + C
,就会收到刚刚生产者发出的消息。
我们也看通过RocketMQ控制台看见其消费情况。
到这儿,RocketMQ最小应用己经可以正常工作,能满足我们开发环境下调试代码的需求。
项目中使用
上面验证功能只是在学习和测试中进行使用,作为开发者,需要在其项目中集成RocketMQ组件功能,实现其信息队列功能的话,需要在项目中进行实现消费者、生产者。
导入Maven依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
生产者发送端
作为生产者端 ,需要配置以下内容在SpringBoot配置中:
spring
rocketmq
# 绑定name Server服务器地址
name-server: localhost:9876
# 定义生产者发布 组名
producer
group: my-group1
# 定义要发送的信息的主题集合
topic: string-topic
发送信息操作
@Autowired
private RocketMQTemplate rocketMQTemplate;
// 注入定义的主题-topci
@Value("spring.rocketmq.topic")
private String topic;
/**
* 发送简单的MQ消息
* @param msg
* @return
*/
@GetMapping("/send")
public Results send(String msg) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
...
// 如下两种方式等价
rocketMQTemplate.convertAndSend(topic, "Hello, World!");
rocketMQTemplate.send(topic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
// 第三个参数为key
rocketMQTemplate.syncSend(topic, "Hello, World! I'm from simple message", "18122811143034568830");
// topic: ORDER,tag: paid, cacel
rocketMQTemplate.convertAndSend(topic, "Hello, World!");
rocketMQTemplate.convertAndSend(topic, "Hello, World!");
// 消息体为自定义对象
rocketMQTemplate.convertAndSend(topic, new OrderPaidEvent("T_001", new BigDecimal("88.00")));
// 发送延迟消息
rocketMQTemplate.sendDelayed(topic, "I'm delayed message", MessageDelayLevel.TIME_1M);
// 发送即发即失消息(不关心发送结果)
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload("I'm one way message").build());
// 发送顺序消息
rocketMQTemplate.syncSendOrderly(topic, "I'm order message", "1234");
...
}
RocketMQTemplate是rocketMQ-SpringBoot-starter这个依赖直接封装好了的工具,直接使用它来进行发送、接受等操作。
消费者端
作为消费者端 ,需要配置以下内容在SpringBoot配置中:
spring:
rocketmq:
name-server: 127.0.0.1:9876
接受消费消息操作
不同于生产者,使用消费者时,需要随时接受到对应的消息载体,所以需要使用监听器类。
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer implements RocketMQListener<String> {
public void onMessage(String message) {
log.info("received message: " + message);
}
}
RocketMQ消费者监听器实现RocketMQListener类,它接受一个泛型,取决于接受的消息载体类型。通过复写onMessage方法来实现其接受消息的功能。
其中@RocketMQMessageListener中主要注解,它定义了对应的topic集合名,用于寻找对应的Broker,来依次获取里面的消息。而consumerGroup则为其消费者的组定义。
除了这样使用,还可以专门定位到某个RocketMQ集群上:
@RocketMQMessageListener(nameServer = "127.0.0.1:9877", instanceName = "tradeCluster", topic = "test-topic-3", consumerGroup = "my-consumer_test-topic-3")
以及定义其对应的消费模式和重试次数:
// ConsumeMode为消费模式,ConsumeMode.ORDERLY为按顺序消费
// 配置重试次数 reconsumeTimes = -1 代表一直重试
@RocketMQMessageListener(topic = "test-topic-4", consumerGroup = "my-consumer_test-topic-6",
consumeMode = ConsumeMode.ORDERLY, reconsumeTimes = -1)
它的所有说明属性如下:
/**
* Consumer 所属消费者分组
*
* Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
* load balance. It's required and needs to be globally unique.
*
* See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
*/
String consumerGroup();
/**
* 消费的 Topic
*
* Topic name.
*/
String topic();
/**
* 选择器类型。默认基于 Message 的 Tag 选择。
*
* Control how to selector message.
*
* @see SelectorType
*/
SelectorType selectorType() default SelectorType.TAG;
/**
* 选择器的表达式。
* 设置为 * 时,表示全部。
*
* 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。
* 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档
*
* Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
*/
String selectorExpression() default "*";
/**
* 消费模式。可选择并发消费,还是顺序消费。
*
* Control consume mode, you can choice receive message concurrently or orderly.
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
/**
* 消息模型。可选择是集群消费,还是广播消费。
*
* Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
/**
* 消费的线程池的最大线程数
*
* Max consumer thread number.
*/
int consumeThreadMax() default 64;
/**
* 消费单条消息的超时时间
*
* Max consumer timeout, default 30s.
*/
long consumeTimeout() default 30000L;
事务消费
关于RocketMQ-SpringBoot-Starter默认的消费模式是 按顺序消费,意思就是 “通过消息的发送顺序来进行依次消费处理”,这一种默认的消费模式满足大部分设计要求,但是它对于 一些重要业务来说则显得不足。
一款买票软件,它的买票服务通常逻辑是这样的:订票 -》选择票类型-》生成订单-》支付-》完成购票。
这种涉及到钱的操作一般设计就要十分严谨,如果不使用事务处理的话,当用户订票未付款就会发生订单一直保留,这样不满足原子性(要么同时成功要么全部失败)。
所以我们来使用RocketMQ进行事务消费操作:
生产者:
@Component
public class SpringTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送消息
* @param topic
* @param msg
*/
public void sendMsg(String topic, String msg) {
/**
* 这里的Message不是rocketmq.commen的
* 是springframework的接口
* /
Message message = MessageBuilder.withPayload(msg).build();
/**
* myTransactionGroup要和@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")定义的Group一致
* 消息会通过TransactionGroup找到事务消费者、通过topic普通消费者 只有事务消费者commit 普通消费者的结果才会执行
* /
this.rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", topic, message, null);
System.out.println("发送消息成功");
}
}
这里使用了RocketMQTemplate的sendMessageInTransaction方法进行发送事务消息。
这里它第一个参数需要一个Transaction组参数,用于存放在其业务组下,在消费者中根据TransactionGroup进行额外操作。
我们还要在**生产者(这里是生产者!)**中创建一个RocketMQ业务监听器:
@RocketMQTransactionListener(txProducerGroup = "myTransactionGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
/**
* 可以定义一个static final的Map 用来保存返回unknown要回查消息的一些属性 那么所有对象都可以获取该消息回滚前的一些信息
* private static Map<String, Object> INFO_MAP = new HashMap<>();
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
/**
* 该Message是springframework包下的 其获取事务消息的唯一id的方法
* String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
* /
try {
System.out.println("执行操作1");
Thread.sleep(500);
System.out.println("执行操作2");
Thread.sleep(800);
if(...) return LocalTransactionState.COMMIT_MESSAGE
if(...) return LocalTransactionState.ROLLBACK_MESSAGE
//如果在检查事务时数据库出现宕机可以让broker过一段时间回查 和return null 效果相同
if(...) return LocalTransactionState.UNKNOW
} catch (Exception e) {
e.printStackTrace();
/**
* 回滚
*
* 可以在该处给 INFO_MAP放一些信息 以便会查时调用
* INFO_MAP.put(transId,...);
* /
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
/**
* 只去返回commit或者rollback
*
* 可以用INFO_MAP取得一些回滚前的信息
* String transId = (String)message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
* INFO_MAP.get(transId);
* /
}
}
其中RocketMQ业务监听器需要实现RocketMQLocalTransactionListener,其中包含两个重要方法:
executeLocalTransaction:用来执行本地事务,用来执行业务逻辑的地方,返回事务运行状态。
checkLocalTransaction:只是检测事务运行状态,不做处理,处理事务回查的代码部分。
其中LocalTransactionState.COMMIT_MESSAGE
为本地事务提交,代表事务过程中没有错误,消费者可以消费到该值。
LocalTransactionState.ROLLBACK_MESSAGE
为本地事务回滚,代表事务过程中出错,该消息将被删除,消费者无法消费该消息。
LocalTransactionState.Unknown
: 中间状态,它代表需要检查消息队列来确定状态,最终决定这个消息的是checkLocalTransaction这个方法。
消费者的内容基本不变。
RocketMQ它的具体事务逻辑就是 生产者发送消息成功接受后,生产者执行RocketMQ事务监听器中的executeLocalTransaction方法,根据返回的状态,来决定消息是否被消费者消费。
如果状态为Unknown则为待定,那么RocketMQ会每隔一段时间调用一次checkLocalTransaction方法,来决定这个消息的最终归宿。至于这个时间取决于Broker配置文件中的transactionCheckInterval
,它的默认值是60*1000,也就是1分钟。其中还有transactionCheckMax
,它规定了最大次数,超过这个次数,如果还返回UNKNOW,这个消息将被删除。
RocketMQ的事务简单来说就是,发送后检查是否将其发送前的业务完成,最终确认的操作。
这而举个例子:
CREATE TABLE `s_term` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`term_year` year(4) NOT NULL ,
`type` int(1) NOT NULL DEFAULT '1' ,
PRIMARY KEY (`id`)
)
这是一张订单表(假如)
那么我们的生产者在创建订单后将其发送消息,提醒消费者去进行核对表,然后购买…等等操作。
@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
Term term = new Term();
term.setTermYear(2020);
term.setType(1);
int insert = termMapper.insert(term);
Message message = MessageBuilder.withPayload(term).build();
rocketMQTemplate.sendMessageInTransaction("myTransactionGroup", topic, message, null);
}
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
Integer termId = (Integer)arg;
Term term = termMapper.selectById(termId);
System.out.println("executeLocalTransaction termId="+termId+" term:"+term);
if (term != null) return COMMIT_MESSAGE;
return LocalTransactionState.UNKNOW;
}
如果前置订单创建成功这儿就会返回COMMIT_MESSAGE,该消息即可被消费者进行消费处理;
事务消费的优化
上面的事务消费通常是正常的写法,但是它存在一个问题,那就是消息消费不对等。
当我们在生产者中发送消息后报出操作,因为Spring的@Transactional存在,那么就会导致所有的该方法内所有内容滚回。
但是生产者在发送消息后就自动会执行事务监听器的executeLocalTransaction方法,从而导致消息依旧可能被发送出去,并且被消费者消费…而该业务方法后抛错,导致@Transactional滚回。从而就出现了消息发送了并被消费,而订单却没被提交的情况。。。
对此我有个方法修护这个问题。
那就是将其executeLocalTransaction中的内容不要,直接返回LocalTransactionState.UNKNOW
;
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
return LocalTransactionState.UNKNOW;
}
由于是UNKNOW自然 就会等待周期性的触发checkLocalTransaction方法,然后进行核查。这样当业务方法发送操作后,其抛出错误,依旧可以根据周期性的触发checkLocalTransaction方法来进行 消息滚回,放置消息被发送出去。
但是这也有个缺点,那就是会造成消息业务处理的速度降低,因为它需要等待周期性的触发checkLocalTransaction方法。
关于消费者组
我们在设置消费者服务器监听器时,在其@RocketMQMessageListener
中往往需要加上一个自定义的consumerGroup
这个参数。
它的作用其实是分流处理,相对于管道,3个不同的consumerGroup就有3个管道,1条消息就会同时被3个consumerGroup的消费者消费。
当多个同业务消费者 定义为同一个 consumerGroup下,那么消息会被平均分摊处理,比如发送4条消息,2个消费者会分别处理2个消息。
一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。,比如做订单处理的,消费者就在订单消费者组中。这样做会有两个好处:
- 每个消费者分组职责单一,只消费一个 Topic 类型,不干其他活。
- 每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。