消息队列

场景

异步处理:用户注册后,发送注册邮件和注册短信。用户注册完成后,提交任务到 MQ,发送模块
并行获取 MQ 中的任务。

系统解耦:比如用注册完成,再加一个发送微信通知。只需要新增发送微信消息模块,从 MQ 中读取任务,发送消息即可。无需改动注册模块的代码,这样注册模块与发送模块通过 MQ 解耦。

流量削峰:秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。

日志处理:日志采集方收集日志写入 kafka 的消息队列中,处理方订阅并消费 kafka 队列中的日志数据。

消息通讯:点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。

特点

优点
见场景

缺点
复杂度提升,可用性降低,要考虑一致性(事务)问题

原理

Kafka
有offset概念

高可用

ActiveMQ
Master-Slave 部署方式主从热备,方式包括通过共享存储目录来实现(shared filesystem Master￾Slave)、通过共享数据库来实现(shared database Master-Slave)、5.9版本后新特性使用ZooKeeper 协调选择 master(Replicated LevelDB Store)。
Broker-Cluster 部署方式进行负载均衡。

RabbitMQ
单机模式与普通集群模式无法满足高可用,镜像集群模式指定多个节点复制 queue 中的消息做到高可用,但消息之间的同步网络性能开销较大。

RocketMQ
有多 master 多 slave 异步复制模式和多 master 多 slave 同步双写模式支持集群部署模式。
Producer 随机选择 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Master 发送心跳,只能将消息发送到 Broker master。
Consumer 同时与提供 Topic 服务的 Master、Slave 建立长连接,从 Master、Slave 订阅消息都可以,订阅规则由 Broker 配置决定。

Kafka
由多个 broker 组成,每个 broker 是一个节点;topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 存放一部分数据,这样每个 topic 的数据就分散存放在多个机器上的。
replica 副本机制保证每个 partition 的数据同步到其他节点,形成多 replica 副本;所有 replica 副本会选举一个 leader 与 Producer、Consumer 交互,其他 replica 就是 follower;写入消息leader 会把数据同步到所有 follower,从 leader 读取消息。
每个 partition 的所有 replica 分布在不同的机器上。某个 broker 宕机,它上面的 partition 在其他节点有副本,如果有 partition 的 leader,会进行重新选举 leader。

幂等性

一次消费和多次消费影响一致。

解决:
消息体加入业务唯一标识key、或者全局messageId,消费方根据该key查询持久化文件看是否消费过。
可通过组合属性保证唯一:就比如商品退款的时候,通过子单号+退款来源类型+商品id+商品价格+商品退款金额等属性来保证唯一性。
更新:cas操作,传入condition和target。
插入:通过唯一索引保证只插一条,或写redis的set里。

可靠性(丢失问题)

RabbitMQ
是默认保存再内存的,如果不设置持久化的话,一旦重启数据就没了;要开启持久化必须交换器、队列、message持久化发送
生产者->MQ
1.开启事务,但会变成同步阻塞操作,channel.txSelect() channel.txRollback()
2.开启confirm模式(异步),channel.confirm() ack(String messageId),nack(String messageId)
MQ
queue必须设置为持久化的,发消息的时候deliveryMode要设置为2,两个条件必须同时开启;
另外可以结合生产者的confir。
MQ->消费者
关闭自动ack,开启手动ack。

Kafka
生产者->MQ
request.required.acks 有三个值 0 1 -1
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候
就会丢数据
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他
不确保是否复制完成新 leader 也会导致数据丢失
-1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出
的 ack,这样数据不会丢失
MQ
MQ->消费者

顺序性

延迟性

RabbitMQ
通过死信队列发送延迟消息,有两种方式;

延迟时间设置在队列上
普通exchange:无额外设置
普通queue(AB)设置:x-dead-letter-exchange=死信exchange,x-dead-letter-routingKey=死信A、B,x-message-ttl=6000 (ms)
死信exchange:无额外设置
死信queue(AB):绑定到死信exchange时分别设置routingKey为死信A、B

延迟时间设置在消息上
由于queue里消息死亡是排队死的,若前面的消息设置了很长的ttl时间如60s,后面来的消息就算很短如2s,也要等到前面的消息死亡后才会跟着死亡,这个要用插件rabbitmq-delayed-message-exchange解决。

关于插件rabbitmq-delayed-message-exchange——
配置:
添加插件abbitmq_delayed_message_exchange,解压放到插件目录
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 重启生效
使用:
声明exchange的时候不能用DirectExchange了,要用CustomExchange,设置属性x-delayed-type=direct,x-delayed-message,然后将一个queue绑定到该exchange上;
发消息时要设置setDelay(xxx);


对于RabbitMQ,如果消息在queue中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了,只能写代码找出来重新发mq。

异常情况

消息堆满
方案一:消费一个丢有一个
方案二:空闲时写程序重新查了补mq

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注