场景
异步处理:用户注册后,发送注册邮件和注册短信。用户注册完成后,提交任务到 MQ,发送模块
并行获取 MQ 中的任务。
系统解耦:比如用注册完成,再加一个发送微信通知。只需要新增发送微信消息模块,从 MQ 中读取任务,发送消息即可。无需改动注册模块的代码,这样注册模块与发送模块通过 MQ 解耦。
流量削峰:秒杀和抢购等场景经常使用 MQ 进行流量削峰。活动开始时流量暴增,用户的请求写入MQ,超过 MQ 最大长度丢弃请求,业务系统接收 MQ 中的消息进行处理,达到流量削峰、保证系统可用性的目的。
日志处理:日志采集方收集日志写入 kafka 的消息队列中,处理方订阅并消费 kafka 队列中的日志数据。
消息通讯:点对点或者订阅发布模式,通过消息进行通讯。如微信的消息发送与接收、聊天室等。
特点
优点
见场景
缺点
复杂度提升,可用性降低,要考虑一致性(事务)问题
原理
Kafka:
有offset概念
高可用
ActiveMQ:
Master-Slave 部署方式主从热备,方式包括通过共享存储目录来实现(shared filesystem MasterSlave)、通过共享数据库来实现(shared database Master-Slave)、5.9版本后新特性使用ZooKeeper 协调选择 master(Replicated LevelDB Store)。
Broker-Cluster 部署方式进行负载均衡。
RabbitMQ:
单机模式与普通集群模式无法满足高可用,镜像集群模式指定多个节点复制 queue 中的消息做到高可用,但消息之间的同步网络性能开销较大。
RocketMQ:
有多 master 多 slave 异步复制模式和多 master 多 slave 同步双写模式支持集群部署模式。
Producer 随机选择 NameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Master 发送心跳,只能将消息发送到 Broker master。
Consumer 同时与提供 Topic 服务的 Master、Slave 建立长连接,从 Master、Slave 订阅消息都可以,订阅规则由 Broker 配置决定。
Kafka:
由多个 broker 组成,每个 broker 是一个节点;topic 可以划分为多个 partition,每个 partition 可以存在于不同的 broker 上,每个 partition 存放一部分数据,这样每个 topic 的数据就分散存放在多个机器上的。
replica 副本机制保证每个 partition 的数据同步到其他节点,形成多 replica 副本;所有 replica 副本会选举一个 leader 与 Producer、Consumer 交互,其他 replica 就是 follower;写入消息leader 会把数据同步到所有 follower,从 leader 读取消息。
每个 partition 的所有 replica 分布在不同的机器上。某个 broker 宕机,它上面的 partition 在其他节点有副本,如果有 partition 的 leader,会进行重新选举 leader。
幂等性
一次消费和多次消费影响一致。
解决:
消息体加入业务唯一标识key、或者全局messageId,消费方根据该key查询持久化文件看是否消费过。
可通过组合属性保证唯一:就比如商品退款的时候,通过子单号+退款来源类型+商品id+商品价格+商品退款金额等属性来保证唯一性。
更新:cas操作,传入condition和target。
插入:通过唯一索引保证只插一条,或写redis的set里。
可靠性(丢失问题)
RabbitMQ:
是默认保存再内存的,如果不设置持久化的话,一旦重启数据就没了;要开启持久化必须交换器、队列、message持久化发送
生产者->MQ
1.开启事务,但会变成同步阻塞操作,channel.txSelect() channel.txRollback()
2.开启confirm模式(异步),channel.confirm() ack(String messageId),nack(String messageId)
MQ
queue必须设置为持久化的,发消息的时候deliveryMode要设置为2,两个条件必须同时开启;
另外可以结合生产者的confir。
MQ->消费者
关闭自动ack,开启手动ack。
Kafka:
生产者->MQ
request.required.acks 有三个值 0 1 -1
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候
就会丢数据
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader 挂掉后他
不确保是否复制完成新 leader 也会导致数据丢失
-1:同样在 1 的基础上 服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出
的 ack,这样数据不会丢失
MQ
MQ->消费者
顺序性
延迟性
RabbitMQ:
通过死信队列发送延迟消息,有两种方式;
延迟时间设置在队列上:
普通exchange:无额外设置
普通queue(AB)设置:x-dead-letter-exchange=死信exchange,x-dead-letter-routingKey=死信A、B,x-message-ttl=6000 (ms)
死信exchange:无额外设置
死信queue(AB):绑定到死信exchange时分别设置routingKey为死信A、B
延迟时间设置在消息上:
由于queue里消息死亡是排队死的,若前面的消息设置了很长的ttl时间如60s,后面来的消息就算很短如2s,也要等到前面的消息死亡后才会跟着死亡,这个要用插件rabbitmq-delayed-message-exchange解决。
关于插件rabbitmq-delayed-message-exchange——
配置:
添加插件abbitmq_delayed_message_exchange,解压放到插件目录
rabbitmq-plugins enable rabbitmq_delayed_message_exchange 重启生效
使用:
声明exchange的时候不能用DirectExchange了,要用CustomExchange,设置属性x-delayed-type=direct,x-delayed-message,然后将一个queue绑定到该exchange上;
发消息时要设置setDelay(xxx);
坑:
对于RabbitMQ,如果消息在queue中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了,只能写代码找出来重新发mq。
异常情况
消息堆满
方案一:消费一个丢有一个
方案二:空闲时写程序重新查了补mq