RocketMQ简介
为什么使用
系统的耦合性越高,容错性就越低。以电商为例,用户创建完订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个系统出现故障或者因为升级等原因暂时不可用,都回造成下单的异常,影响用户的体验。
应用系统如果遇到系统请求流量瞬间猛增,有可能会将系统压垮。如果有消息队列,遇到此情况,可以将大量请求存储起来,将一瞬间的峰值请求分散到一段时间进行处理,这样可以大大提高系统的稳定性
用户调用一个接口的时候,可能该接口调用了别的方法。例如:用户注册的时候,后台可能需要调用:查询数据库,插入数据库,发送邮件等等…
但是用户可能并不需要后台将所有的任务执行完毕,那么此时在初入数据口后面加入MQ,用户就能很快得到注册成功的响应而去做一些别的事情。mq的机制又能保证最终的一致性,所以使用起来很安全很稳定。
RocketMQ作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。
订单操作:打车 司机+乘客
乘客下单(消息的提供者),司机接单(消息的消费者)–点对点
公众号:发布订阅模式
公众号发布一个消息,订阅该公众号的都可以收到消息
更新通知:广播模式
削峰填谷: 主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
异步处理(在注册的同时,异步去处理短信的发送与邮件的发送)
系统解耦: 解决不同重要程度、不同能力级别系统之间依赖导致一死全死
提升性能: 当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
蓄流压测: 线上有些链路不好压测,可以通过堆积一定量消息再放开来压测
RocketMQ特点
- RocketMQ是一款分布式、队列模型的消息中间件,支持严格的消息顺序,支持topic与Queue两种模式,有亿级消息堆积能力,同时支持pull和push方式的消费消息
优势
目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:
- 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
- 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
- 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
- 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
- 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
- 支持重复消费(RabbitMQ 不支持,Kafka 支持)
性能
- 吞吐量
- 实效性,多少时间内能处理的有效的请求数
- 消息的可靠性
简单使用
版本说明
RocketMQ安装包下载
进入到xshell中 cd /home ls 查看rocket压缩包 使用 unzip rocketmq-all-4.7.1-bin-release.zip 解压
解压文件的时候说没有zip的命令 安装命令: yum install zip #提示输入时,请输入y; 安装命令:yum install unzip #提示输入时,请输入y;
cd rocketmq-all-4.7.1-bin-release cd bin/ ls
|
nohup ./bin/mqnamesrv & 查看进程 netstat -ntlp 看到有9876端口就说明启动成功了
|
- 启动docker,需要编辑配置文件,修改JVM内存位置,默认的内存可能过大,超过JVM内存大小
cd bin/ vi runserver.sh
xmn 新生代的内存大小 xmx 堆区最大的内存大小 xms 堆区的初始值大小
--------- cd bin/ vi runbroker,sh
---------启动broker nohup ./mqbroker -n localhost:9876 &
|
----消息发送 cd bin/ export NAMESRV_ADDR=localhost:9876 ./tools.sh org.apache.rocketmq.example.quickstart.Producer
---消息接收 cd bin/ export NAMESRV_ADDR=localhost:9876 ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
---关闭RocketMQ ./bin/mqshutdown broker ./bin/mqshutdown namesrv
|
Java实现步骤
实现消息发送
---导入pom依赖 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
---生产消息 import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; public class MsgProvider { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); producer.setNamesrvAddr("192.168.106.130:9876"); producer.start(); Message message = new Message("myTopic","myTag",("Test MQ").getBytes()); SendResult result = producer.send(message, 6000); System.out.println(result); producer.shutdown(); } } ---启动
|
实现消息消费
-----pom.xml
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> ---- @Slf4j public class MsgConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group"); consumer.setNamesrvAddr("192.168.106.130:9876"); consumer.subscribe("myTopic","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { log.info("Message=>{}",new String(list.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
|
springboot整合rocketMQ
provider
- 创建springboot项目整合模块boot-mq-provider生产者
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
|
rocketmq: name-server: 192.168.248.129:9876 producer: group: myprovider server.port=8080
|
@Data @AllArgsConstructor @NoArgsConstructor public class Order { private Integer id; private String buyerName; private String buyerTel; private String address; private Date createDate; }
|
@RestController public class ProviderController {
@Autowired private RocketMQTemplate rocketMQTemplate;
@GetMapping("/sendMsg") public Order sendMsg(){ Order order = new Order( 1, "阿松大", "123123", "软件园", new Date() ); this.rocketMQTemplate.convertAndSend("orderTopic",order); return order; }
}
|
consumer
- 创建springboot项目模块boot-mq-consumer消费者
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
|
rocketmq: name-server: 192.168.248.129:9876 server.port=8081
|
- 创建service,把provider订单实体类放到consumer
@Slf4j @Service @RocketMQMessageListener(consumerGroup = "myConsumer",topic = "orderTopic") public class ConsumerService implements RocketMQListener<Order> { @Override public void onMessage(Order order) { log.info("新订单{},发短信",order);
System.out.println("需要执行减库存的操作....."); } } }
|