月度归档:2021年02月

BookKeeper 原理浅谈

接着之前的一篇文章 BookKeeper 集群搭建及使用,本文是 BookKeeper 系列的第二篇,短期来看应该也是最后一篇,本篇文章主要聚焦于 BookKeeper 内核的实现机制上,会从 BookKeeper 的基本概念、架构、读写一致性实现、读写分离实现、容错机制等方面来讲述,因为我并没有看过 BookKeeper 的源码,所以这里的讲述主要还是从原理、方案实现上来介绍,具体如何从解决方案落地到具体的代码实现,有兴趣的可以去看下 BookKeeper 的源码实现。

BookKeeper 基础

正如 Apache BookKeeper 官网介绍的一样:A scalable, fault-tolerant, and low-latency storage service optimized for real-time workloads。BookKeeper 的定位是一个可用于实时场景下的高扩展性、强容错、低延迟的存储服务。Pulsar-Cloud Native Messaging & Streaming - 示说网 中也做了一个简单总结:

  1. 低延迟多副本复制:Quorum Parallel Replication;
  2. 持久化:所有操作保证在刷盘后才 ack;
  3. 强一致性:可重复读的一致性(Repeatable Read Consistency);
  4. 读写高可用;
  5. 读写分离。

BookKeeper 基本概念

BookKeeper 简介 部分已经对 BookKeeper 的基本概念做了一些讲解,这里再重新回顾一下,只有明白这些概念之后才能对更好地理解后面的内容,如下图所示,一个 Log/Stream/Topic 可以由下面的部分组成(图片来自 Pulsar-Cloud Native Messaging & Streaming)。

技术图片BookKeeper 中的基本概念

其中:

  1. Ledger:它是 BK 的一个基本存储单元(本质上还是一种抽象),BK Client 的读写操作也都是以 Ledger 为粒度的;
  2. Fragment:BK 的最小分布单元(实际上也是物理上的最小存储单元),也是 Ledger 的组成单位,默认情况下一个 Ledger 会对应的一个 Fragment(一个 Ledger 也可能由多个 Fragment 组成);
  3. Entry:每条日志都是一个 Entry,它代表一个 record,每条 record 都会有一个对应的 entry id;

关于 Fragment,它是 Ledger 的物理组成单元,也是最小的物理存储单元,在以下两种情况下会创建新的 Fragment:

  1. 当创建新的 Ledger 时;
  2. 当前 Fragment 使用的 Bookies 发生写入错误或超时,系统会在剩下的 Bookie 中新建 Fragment,但这时并不会新建 Ledger,因为 Ledger 的创建和关闭是由 Client 控制的,这里只是新建了 Fragment(需要注意的是:这两个 Fragment 对应的 Ensemble Bookie 已经不一样了,但它们都属于一个 Ledger,这里并不一定是一个 Ensemble Change 操作)。

BookKeeper 架构设计

Apache BookKeeper 的架构如下图所示,它主要由三个组件构成:客户端 (client)、数据存储节点 (Bookie) 和元数据存储 Service Discovery(ZooKeeper),Bookies 在启动的时候向 ZooKeeper 注册节点,Client 通过 ZooKeeper 发现可用的 Bookie。

技术图片Apache BookKeeper 架构

这里,我们可以看到 BookKeeper 架构属于典型的 slave-slave 架构,zk 存储其集群的 meta 信息(zk 虽是单点,但 zk 目前的高可用还是很有保障的),这种模式的好处显而易见,server 端变得非常简单,所有节点都是一样的角色和处理逻辑,能够这样设计的主要原因是其副本没有 leader 和 follower 之分,这是它与一些常见 mq(如:kafka、RocketMQ)系统的典型区别,每种设计都有其 trade-off,BeekKeeper 从设计之初就是为了高可靠而设计。

BookKeeper 存储层实现

Apache BookKeeper 是一个高可靠的分布式存储系统,存储层的实现是其核心,对一个存储系统来说,关键的几点实现,无非是:一致性如何保证、IO 如何优化、高可用如何实现等,这小节就让我们揭开其神秘面纱。

新建 Ledger

Ledger 是 BookKeeper 的基本存储抽象单元,这里先看下一个 Ledger 是如何创建的,这里会介绍一些关于 Ledger 存储层的一些重要概念(图片来自 Pulsar-Cloud Native Messaging & Streaming)。

技术图片BookKeeper Ledger 的创建

Ledger 是一组追加有序的记录,它是由 Client 创建的,然后由其进行追加写操作。每个 Ledger 在创建时会被赋予全局唯一的 ID,其他的 Client 可以根据 Ledger ID,对其进行读取操作。创建 Ledger 及 Entry 写入的相关过程如下:

  1. Client 在创建 Ledger 的时候,从 Bookie Pool 里面按照指定的数据放置策略挑选出一定数量的 Bookie,构成一个 Ensemble;
  2. 每条 Entry 会被并行地发送给 Ensemble 里面的部分 Bookies(每条 Entry 发送多少个 Bookie 是由 Write Quorum size 设置、具体发送哪些 Bookie 是由 Round Robin 算法来计算),并且所有 Entry 的发送以流水线的方式进行,也就是意味着发送第 N + 1 条记录的写请求不需要等待发送第 N 条记录的写请求返回;
  3. 对于每条 Entry 的写操作而言,当它收到 Ensemble 里面大多数 Bookie 的确认后(这个由 Ack Quorum size 来设置),Client 认为这条记录已经持久化到这个 Ensemble 中,并且有大多数副本。

技术图片BookKeeper Ledger 多副本复制

这里引入了三个重要的概念,它们也是 BookKeeper 一致性的基础:

  1. Ensemble size(E):Set of Bookies across which a ledger is striped,一个 Ledger 所涉及的 Bookie 集合;
  2. Write Quorum Size(Qw):Number of replicas,副本数;
  3. Ack Quorum Size(Qa):Number of responses needed before client’s write is satisfied。

从上面 Ensemble、Qw、Qa 的概念可以得到以下这些推论:

  1. Ensemble:可以控制一个 Ledger 的读写带宽;
  2. Write Quorum:控制一条记录的复本数;
  3. Ack Quorum:写每条记录需要等待的 Ack 数 ,控制时延;
  4. 增加 Ensemble,可以增加读写带宽(增加了可写的机器数);
  5. 减少 Ack Quorum,可以减长尾时延。

一致性

对于分布式存储系统,为了高可用,多副本是其通用的解决方案,但也带来了一致性的问题,这里就看下 Apache BookKeeper 是如何解决其带来的一致性问题的。

一致性模型

在介绍其读写一致性之前,先看下 BK 的一致性模型(图片来自 Twitter高性能分布式日志系统架构解析)。

技术图片BookKeeper 一致性模型-开始时的状态

对于 Write 操作而言,writer 不断添加记录,每条记录会被 writer 赋予一个严格递增的 id,所有的追加操作都是异步的,也就是说:第二条记录不用等待第一条记录返回结果。所有写成功的操作都会按照 id 递增顺序返回 ack 给 writer。(图片来自 Twitter高性能分布式日志系统架构解析)。

技术图片BookKeeper 一致性模型-追加数据时的中间状态

伴随着写成功的 ack,writer 不断地更新一个指针叫做 Last-Add-Confirm(LAC),所有 Entry id 小于等于 LAC 的记录保证持久化并复制到大多数副本上,而 LAC 与 LAP(Last-Add-Pushed)之间的记录就是已经发送到 Bookie 上但还未被 ack 的数据。

读的一致性

所有的 Reader 都可以安全读取 Entry ID 小于或者等于 LAC 的记录,从而保证 reader 不会读取未确认的数据,从而保证了 reader 之间的一致性(图片来自 Twitter高性能分布式日志系统架构解析)。

技术图片BookKeeper 一致性模型-读的一致性

写的一致性

从上面的介绍中,也可以看出,对于 BK 的多个副本,其并没有 leader 和 follower 之分,因此,BK 并不会进行相应的选主(leader election)操作,并且限制每个 Ledger 只能被一个 Writer 写,BK 通过 Fencing 机制来防止出现多个 Writer 的状态,从而保证写的一致性。

读写分离

下面来看下 BK 存储层一个很重要的设计,那就是读写分离机制。在论文 Durability with BookKeeper 中,关于读写分离机制的介绍如下所示(图片来自 Durability with BookKeeper):

技术图片BookKeeper 读写分离

A bookie uses two devices, ideally in separate physical disks:

  1. The journal device is a write-ahead log and stores synchronously and sequentially all updates the bookie executes.
  2. The ledger device contains an indexed copy of a ledger fragment, which a bookie uses to respond to read requests.

上面是论文中关于 BK 读写分离机制实现的介绍,我当时在看完上面的记录之后,脑海中有以下疑问:

  1. 一个写请求是怎么处理?什么时候数据被认为是 ack 了;
  2. 数据肯定先写到 Journal Device 中的,那么数据是如何到 Ledger Device 中的?
  3. Ledger Device 中的顺序写跟随机读是什么意思?难道跟 RocketMQ 的存储结构一样?
  4. Ledger Device 底层是怎么切分实际的物理文件的?
  5. 数据在什么时候才能可见?
  6. 在从 Ledger Device 读数据时,它是通过什么机制提高查询速度的?

带着这些疑问,接下来来分析其实现(图片来自 Pulsar-Cloud Native Messaging & Streaming):

技术图片BookKeeper 读写分离

Journal Device 分析:

  • 处理写入请求时,如果 Journal 是在专用的磁盘上,由于是顺序写入刷盘,性能会很高;

Ledger Device 的实现:

  • Bookie 最初的设计方案是每个 Ledger 对应一个物理文件,但这样会极大消耗写性能,所以 Bookie 当前的设计方案是所有 Ledger 都写一个单独的文件中,这个文件又叫 entry log;
  • 写入时,不但会写入到 Journal 中还会写入到缓存(memtable)中,定期会做刷盘(刷盘前会做排序,通过 聚合+排序 优化读取性能);
  • 优化查找:Ledger Device 中会维护一个索引结构,存储在 RocksDB 中,它会将 (LedgerId,EntryId) 映射到(EntryLogId,文件中的偏移量)。

读写流程

了解完 BK 的一致性模型和读写分离机制之后,这里来看下 BK 的读写流程。

Entry 写入流程

这里以一个例子来说明,假设 E 是3,Qw 和 Qa 是2,那么 Entry 写入如下图(图片来自 Durability with BookKeeper):

技术图片BookKeeper Entry 写入流程

  1. Writer 会先分配对应的 id,然后按照 round-robin 算法从3个 Bookie 中选取2个 Bookie;
  2. Writer 会向两个 Bookie 发送写入请求,因为 Qa 设置为2,只有收到两个 ack 响应后,才会认为这条 Entry 写入成功;

如果写入过程中有一台 Bookie 挂了怎么办?

  1. 那么只能向另外2台 Bookie 写入数据;
  2. 这时候这个 Ledger 会新建一个 Fragment,假设挂的是A,之前 Ensemble 是 A、B、C,现在的是 B、C;
  3. 这个变化会更新到 zk 中这个 Ledger 的 meta 中。

如果写入过程中有两个 Bookie 挂了怎么办?

  1. Ensemble 里面的存活的 Bookies 不能满足 Qw 的要求;
  2. Client 会进行一个 Ensemble Change 操作;
  3. Ensemble Change 将从 Bookie Pool 中根据数据放置策略挑选出额外的 Bookie 用来取代那些不存活的 Bookie 。

Entry 读取流程

这里依然以一个例子做说明,例子是紧接着上面的示例,如下图所示(图片来自 Durability with BookKeeper):

技术图片BookKeeper Entry 读取流程

如何想要读取 id 为1的那条 Entry 应该怎么做?

  • 在读取会选择最优的 Bookie,有了 Entry 的 id 和 Ledger 的 Ensemble 就可以根据 round-robin 计算出其所在 Bookie 信息,会选择向其中一个 Bookie 发送读请求。

这种机制会导致,读取数据时可能需要从多个 Bookie 获取数据,需要并发访问多个 Bookie,性能会变差,极端情况会有这个问题。

  • BK 有一个优化策略:读取时一般是选择读一段数据,如果 entries 在同一台机器上,会从同一个 Bookie 把这批 Entry 全部读取。

BK 怎么处理长尾效应的问题(长尾效应指的是某台机器上某段或者某条数据读取得比较慢,进而影响了整体的效率)?

  • Client 可以向任意一个副本读取相应的 Entry,但为了保证低延时,这里使用了一个叫 Speculative Read 的机制。读请求首先发送给第一个副本后,如果在指定的时间内没有收到 reponse,则发送读请求给第二个副本,然后同时等待第一个和第二个副本。谁第一个返回,即读取成功。通过有效的 Speculative read,可以很大程度减少长尾效应。

BookKeeper 容错机制

这里来简单来看下 BookKeeper 容错机制的实现。

Fencing 机制

Fencing 机制在前面已经简单介绍过了,它目的主要是为了保证写的一致性,严格保证一个 Ledger 只能被一个 Writer 来写。

Fencing 怎么触发呢?

  • 如果一个 Writer 打开一个 Ledger,发现这个 Ledger 存在,并且没有 close,这种情况下,就会触发 Fencing 策略,并且触发 Ledger Recovery。

Log Recovery 机制

一个 Ledger 正常关闭后,会在其 Metadata 中存储 the last entry 的信息,所以正常关闭一个 Ledger 是非常重要的(Ledger 一旦关闭,其就是不可变的,读取的时候可以从任意一个 Bookie 上读取,而不需要再取 care 这个 Ledger 的 LAC 信息),否则可能会出现这样一种情况:

由于 Writer 挂了(Ledger 未正常关闭),导致部分数据写入成功,实际上这个条消息并不满足 Qw(可能满足了 Qa),会导致不同 Reader 读取的结果不一致!如下图所示:

技术图片不同 Reader 读取不一致的情况

解决方案就是: Log Recovery,正常关闭这个 Ledger,并将 The Last Entry 及状态更新到 metadata 中。

Log Recovery 怎么实现呢?通常有两种方案:

  1. 遍历这个 Ledger 所有 Entry 进行恢复;
  2. 利用 LAC 机制可以加速 recovery:恢复前,先获取每个 Ledger 的 LAC 信息,然后从 LAC 开始恢复;

很明显,第二种方案是比较合理的恢复速度更快。

Bookie 容错

当一个 Bookie 故障时:

  • 所有在这个 Bookie 上的 Ledgers 都处于 under-replica 状态,恢复就是复制 Fragment (Ledger 的组成单位)的过程,以确保每个 Ledger 维护的副本数打到 Qw。

Bk 提供自动和手动两种方式:两种方式的复制协议是一样的;自动恢复是 BK 内部自动触发,手动过程需要手动干预,这里重点介绍自动过程:

  1. 自动恢复是在 Bookie 上运行 AutoRecoveryMain 线程来实现,它会首先通过 zk 选举一个 Auditor;
  2. Auditor 的作用是检查不可用的 Bookie,然后做下面的操作:读取 zk 上完整的 Ledgers 信息,找到失败的 Ledgers(副本不满足条件的);然后在 zk 的 /underreplicated znode 节点创建重新复制任务;
  3. AutoRecoveryMain 还有 Replicator Worker 线程会复制相应的 Fragment 到自己的 Ledger 上,如果复制后满足 Fully Replicated,那么就从 zk 的节点中删除这个任务;

技术图片Bookie 容错机制

每个 Bookie 在发现任务时会尝试锁定,如果无法锁定就会执行后面的任务。如果获得锁,那么:

  1. 扫描 Ledgers,查找不属于当前 Bookie 的 Fragment;
  2. 对于每个匹配的 Fragment,它将另一个 Bookie 的数据复制到它自己的 Bookie,用新的集合更新 Zookeeper 并将 Fragment 标识为 Fully Replicated。

如果 Ledgers 仍然存在副本数不足的 Fragment,则释放锁。如果所有 Fragment 都已经Fully Replicated,则从 /underreplicated 删除重复复制任务。

BookKeeper介绍

 

BookKeeper带有多个读写日志的server,称为 bookies。每一个bookie是一个BookKeeper的存储服务,存储了写到BookKeeper上的write-ahead日志,及其数据内容。写入的log流(称它为流是因为BookKeeper记录的是byte[])称为 ledgers,一个ledger是一个日志文件,每个日志单元叫 ledger entry,也就是bookies是存ledgers的。ledger只支持append操作,而且同时只能有一个单线程来写。ZK充当BookKeeper的元数据存储服务,在zk中会存储ledger相关的元数据,包括当前可用的bookies,ledger分布的位置等。

 

BookKeeper通过读写多个存储节点达到高可用性,同时为了恢复由于异常造成的多节点数据不一致性,引入了数据一致性算法。BookKeeper的可用性还体现在只要有足够多的bookies可用,整个服务就可用。实际上,一份entry的写入需要确保N份日志冗余在N个bookie上写成功,而我们需要>N个bookie提供服务。在启动BookKeeper的时候,需要指定一个ensemble值,即bookie可用的最小节点数量,还需要指定一个quorums值,即日志写入BookKeeper服务端的冗余份数。BookKeeper的可扩展性体现在可以增加bookie数目,增加bookies可以提升读写吞吐量。

技术图片

下面这张图,展示了序列化日志怎样写入到Bookie上。

Ledger记录首先写入到Journal,然后再写入到Indexes和Entry Log。写入到Journal是同步落盘持久化的。写入到Entry Log的是先缓存在Page Cache中,异步刷盘。一般建议Journal与日志实体(Entry Log/Ledger Indexes)分开存储,避免写入IO竞争。另外,为了写入的高性能,Journal选择SSD保存;日志实体可以存储在通用的硬盘设备上,比如JBOD。
由于不同Ledgers的记录都是汇聚到一起写入Entry Log的,即Bookies是顺序写,随机读的。为了提升读取性能,Bookies给每个Ledger维护了一个Ledger Indexes。这个索引映射日志实体(entries)位置与Ledger的关系(即entry log上哪个位置开始到哪个位置结束的数据属于哪个Ledger)。

最后

到这里,关于 BK 内核实现的主要部分已经介绍完毕,这篇文章的主要内容来自之前在团队的一次分享,一直想整理成博客的,但一直拖到了现在(因为并没有去看代码实现,主要是跟 bk 的论文及相关资料来整理的,有问题的地方欢迎指正)。


参考:

    • Durability with BookKeeper;
    • Pulsar-Cloud Native Messaging & Streaming;
    • Apache BookKeeper Documentation;
    • Introduction to Apache BookKeeper;
    • Why Apache BookKeeper? Part 1: consistency, durability, availability;
    • Why Apache Bookkeeper? Part 2;
    • Understanding How Apache Pulsar Works;
    • How to Lose Messages on a RabbitMQ Cluster;
    • Pulsar-Cloud Native Messaging & Streaming - 示说网;
    • Twitter高性能分布式日志系统架构解析;

    RabbitMQ的应用场景以及基本原理介绍

    1.背景

    RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。

    2.应用场景

    2.1异步处理

    场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式
    (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
    这里写图片描述
    (2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
    这里写图片描述
    假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,英爱是写入数据库后就返回.
    (3)消息队列
    引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
    这里写图片描述
    由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

    2.2 应用解耦

    场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
    这里写图片描述
    这种做法有一个缺点:

    • 当库存系统出现故障时,订单就会失败。(这样马云将少赚好多好多钱^ ^)
    • 订单系统和库存系统高耦合.
      引入消息队列
      这里写图片描述
    • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
    • 库存系统:订阅下单的消息,获取下单消息,进行库操作。
      就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失(马云这下高兴了).

    流量削峰

    流量削峰一般在秒杀活动中应用广泛
    场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
    作用:
    1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
    2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
    这里写图片描述
    1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
    2.秒杀业务根据消息队列中的请求信息,再做后续处理.

    3.系统架构

    这里写图片描述
    几个概念说明:
    Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
    Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
    Queue:消息的载体,每个消息都会被投到一个或多个队列。
    Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
    Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
    vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
    Producer:消息生产者,就是投递消息的程序.
    Consumer:消息消费者,就是接受消息的程序.
    Channel:消息通道,在客户端的每个连接里,可建立多个channel.

    4.任务分发机制

    4.1Round-robin dispathching循环分发

    RabbbitMQ的分发机制非常适合扩展,而且它是专门为并发程序设计的,如果现在load加重,那么只需要创建更多的Consumer来进行任务处理。

    4.2Message acknowledgment消息确认

    为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack,而应该是在处理完数据之后发送ack.
    在处理完数据之后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以安全的删除它了.
    如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出情况下数据也不会丢失.
    RabbitMQ它没有用到超时机制.RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有正确处理,也就是说RabbitMQ给了Consumer足够长的时间做数据处理。
    如果忘记ack,那么当Consumer退出时,Mesage会重新分发,然后RabbitMQ会占用越来越多的内存.

    5.Message durability消息持久化

    要持久化队列queue的持久化需要在声明时指定durable=True;
    这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
    队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复
    消息持久化包括3部分
    1. exchange持久化,在声明时指定durable => true

    hannel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
    

     

    • 1

    2.queue持久化,在声明时指定durable => true

    channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);//声明消息队列,且为可持久化的
    

     

    • 1

    3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化).

    channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());  
    

     

    • 1

    如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.
    注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。

    6.Fair dispath 公平分发

    你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它不管Consumer是否还有unacked Message,只是按照这个默认的机制进行分发.
    那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢?
    这里写图片描述
    通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它

    channel.basic_qos(prefetch_count=1) 
    

     

    • 1

    注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。

    7.分发到多个Consumer

    7.1Exchange

    先来温习以下交换机路由的几种类型:
    Direct Exchange:直接匹配,通过Exchange名称+RountingKey来发送与接收消息.
    Fanout Exchange:广播订阅,向所有的消费者发布消息,但是只有消费者将队列绑定到该路由器才能收到消息,忽略Routing Key.
    Topic Exchange:主题匹配订阅,这里的主题指的是RoutingKey,RoutingKey可以采用通配符,如:*或#,RoutingKey命名采用.来分隔多个词,只有消息这将队列绑定到该路由器且指定RoutingKey符合匹配规则时才能收到消息;
    Headers Exchange:消息头订阅,消息发布前,为消息定义一个或多个键值对的消息头,然后消费者接收消息同时需要定义类似的键值对请求头:(如:x-mactch=all或者x_match=any),只有请求头与消息头匹配,才能接收消息,忽略RoutingKey.
    默认的exchange:如果用空字符串去声明一个exchange,那么系统就会使用”amq.direct”这个exchange,我们创建一个queue时,默认的都会有一个和新建queue同名的routingKey绑定到这个默认的exchange上去

    channel.BasicPublish("", "TaskQueue", properties, bytes);
    

     

    • 1

    因为在第一个参数选择了默认的exchange,而我们申明的队列叫TaskQueue,所以默认的,它在新建一个也叫TaskQueue的routingKey,并绑定在默认的exchange上,导致了我们可以在第二个参数routingKey中写TaskQueue,这样它就会找到定义的同名的queue,并把消息放进去。
    如果有两个接收程序都是用了同一个的queue和相同的routingKey去绑定direct exchange的话,分发的行为是负载均衡的,也就是说第一个是程序1收到,第二个是程序2收到,以此类推。
    如果有两个接收程序用了各自的queue,但使用相同的routingKey去绑定direct exchange的话,分发的行为是复制的,也就是说每个程序都会收到这个消息的副本。行为相当于fanout类型的exchange。
    下面详细来说:

    7.2 Bindings 绑定

    绑定其实就是关联了exchange和queue,或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver到queue。

    7.3Direct exchange

    Driect exchange的路由算法非常简单:通过bindingkey的完全匹配,可以用下图来说明.
    这里写图片描述
    Exchange和两个队列绑定在一起,Q1的bindingkey是orange,Q2的binding key是black和green.
    当Producer publish key是orange时,exchange会把它放到Q1上,如果是black或green就会到Q2上,其余的Message被丢弃.

    7.4 Multiple bindings

    多个queue绑定同一个key也是可以的,对于下图的例子,Q1和Q2都绑定了black,对于routing key是black的Message,会被deliver到Q1和Q2,其余的Message都会被丢弃.
    这里写图片描述

    7.5 Topic exchange

    对于Message的routing_key是有限制的,不能使任意的。格式是以点号“.”分割的字符表。比如:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。你可以放任意的key在routing_key中,当然最长不能超过255 bytes。
    对于routing_key,有两个特殊字符

    • *(星号)代表任意一个单词
    • #(hash)0个或多个单词
      这里写图片描述
      Producer发送消息时需要设置routing_key,routing_key包含三个单词和连个点号o,第一个key描述了celerity(灵巧),第二个是color(色彩),第三个是物种:
      在这里我们创建了两个绑定: Q1 的binding key 是”.orange.“; Q2 是 “..rabbit” 和 “lazy.#”:

      • Q1感兴趣所有orange颜色的动物
      • Q2感兴趣所有rabbits和所有的lazy的.
        例子:rounting_key 为 “quick.orange.rabbit”将会发送到Q1和Q2中
        rounting_key 为”lazy.orange.rabbit.hujj.ddd”会被投递到Q2中,#匹配0个或多个单词。

    8.消息序列化

    RabbitMQ使用ProtoBuf序列化消息,它可作为RabbitMQ的Message的数据格式进行传输,由于是结构化的数据,这样就极大的方便了Consumer的数据高效处理,当然也可以使用XML,与XML相比,ProtoBuf有以下优势:
    1.简单
    2.size小了3-10倍
    3.速度快了20-100倍
    4.易于编程
    6.减少了语义的歧义.
    ,ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛

    RabbitMQ 作用及模式

    RabbitMQ 作用

    你好! 你知道RabbitMQ是什么吗,是干什么用的呢,就让我们来学习一下吧。

    1.什么是RabbitMQ

    RabbitMQ采用了AMQP高级信息消息队列协议的一种消息对列技术,特点就是消费并不需要确保提供方的存在,大大的实现了对服务之间的高度解耦

    2.为什么要用RabbitMQ

    1.在分布式系统下具备异步,削峰,负载均衡等一系列的功能。
    2.拥有持久化的机制,进程信息,队列中的信息也可以保存下来。
    3.实现消费者和生产者之间的解耦。
    4.可以使用消息队列达到异步下单的效果,排队中,后台进行逻辑下单。
    
    • 3.使用场景
    1. 服务见异步通信
    2. 顺序消费
    3. 定时任务
    4. 请求削峰
    
    • 1
    • 2
    • 3
    • 4

    4.如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息

    发送方:
    将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一的ID。
    
    • 1
    • 2

    一旦消息被投递到目的队列后,或者消息被写入磁盘后(可持久化的消息),信道会发送一个确认给生产者(包含消息唯一ID)。
    如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack(not acknowledged,未确认)消息。
    发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。当确认消息到达生产者应用程序,生产者应用程序的回调方法就会被触发来处理确认消息。
    接受方:
    接收方消息确认机制:消费者接收每一条消息后都必须进行确认(消息接收和消息确认是两个不同操作)。只有消费者确认了消息,RabbitMQ才能安全地把消息从队列中删除。
    这里并没有用到超时机制,RabbitMQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证数据的最终一致性;
    下面罗列几种特殊情况:
    如果消费者接收到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消息重复消费的隐患,需要去重)
    如果消费者接收到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙,将不会给该消费者分发更多的消息。

    5.消息基于什么来传输

    RabbitMQ使用的是信道的方式来传输数据,信道建立在真实的TCP连接内的虚拟连接中,而且每一条TCP 连接的信道数量都没有限制

    6.工作模式

    • 简单的模式 HELLO WORLD
      在这里插入图片描述
      功能:一个生产者A给消息队列Q发信息,一个消费者B接受
      生产者的实现思路
      先创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口1103,设置用户名,密码,Virtual host, 从连接工厂中获取连接Connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,然后关闭通道和连接。
      在这里插入图片描述

    消费者实现思路
    创建连接工厂ConnectionFactory, 设置服务地址127.0.0.1,端口号1103,设置用户名,密码,Virtual host, 从连接工厂中获取连接connection , 使用连接创建通道channel,使用通道channel创建队列queue,创建消费者并监听队列,从队列中读取信息,
    在这里插入图片描述
    2.工作队列模式Work Queue
    在这里插入图片描述
    功能:一个生产者,多个消费者,每个消费者获取的消息唯一,多个消费者只有一个队列
    生产者实现思路:
    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号1103,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,2条消息之间间隔一定时间,关闭通道和连接。
    在这里插入图片描述
    消费者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号1103,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,创建消费者C1并监听队列,获取消息并暂停10ms,另外一个消费者C2暂停1000ms,由于消费者C1消费速度快,所以C1可以执行更多的任务。
    在这里插入图片描述
    3.发布/订阅模式
    在这里插入图片描述
    功能:一个生产者发送的消息会被多个消费者获取。一个生产者,一个交换机,多个队列,多个消费者
    生产者:可以把消息发送给队列或者是交换机
    消费者:只能从队列中获取消息
    如果消息没有发送到队列绑定的交换机上,那么消息将会丢失,交换机上是不会存储消息的,消息存储在消息队列中。
    生产者思路:
    创建链接工厂ConnectionFactory , 设置服务地址127.0.0.1,端口号1103,设置用户名,密码,Virtual host ,从连接工厂中获取连接connection,使用连接创建通道channel,只用通道channel创建队列queue,使用通道channel创建交换机并指向交换机行为为fanout,使用通道向交换机发送信息,并关闭通道的连接
    在这里插入图片描述
    消费着实现思路
    创建连接工厂ConectionFactory,设置服务地址127.0.0.1,端口号: 1103,设置用户名,密钥,Virtual host 从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,多个消费者进行监听。
    在这里插入图片描述
    4.路由模式Routing
    在这里插入图片描述

    生产者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为direct,使用通道向交换机发送消息并指定key=b,关闭通道和连接。

    在这里插入图片描述
    消费者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,但只要绑定key=b的队列key接收到消息,多个消费者进行监听。在这里插入图片描述
    5.通配符模式Topics
    在这里插入图片描述
    生产者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel创建交换机并指定交换机类型为topic,使用通道向交换机发送消息并指定key=key.1,关闭通道和连接。
    在这里插入图片描述
    消费者实现思路:

    创建连接工厂ConnectionFactory,设置服务地址127.0.0.1,端口号5672,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,绑定队列到交换机,设置Qos=1,创建消费者并监听队列,使用手动方式返回完成。可以有多个队列绑定到交换机,凡是绑定规则符合通配符规则的队列均可以接收到消息,比如key.*,key.#,多个消费者进行监听。
    在这里插入图片描述

    RabbitMQ快速入门(详细)

    在介绍RabbitMQ之前,我们先来看下面一个电商项目的场景

    • 商品的原始数据保存在数据库中,增删改查都在数据库中完成。
    • 搜索服务数据来源是索引库(Elasticsearch),如果数据库商品发生变化,索引库数据不能及时更新。
    • 商品详情做了页面静态化处理,静态页面数据也不会随着数据库商品更新而变化。

    如果我们在后台修改了商品的价格,搜索页面和商品详情页显示的依然是旧的价格,这样显然不对。该如何解决?  

    我们可能会想到这么做:

    • 方案1:每当后台对商品做增删改操作,同时修改索引库数据及更新静态页面。
    • 方案2:搜索服务和商品页面静态化服务对外提供操作接口,后台在商品增删改后,调用接口。

    这两种方案都有个严重的问题:就是代码耦合,后台服务中需要嵌入搜索和商品页面服务,违背了微服务的独立原则。

    这时,我们就会采用另外一种解决办法,那就是消息队列

    商品服务对商品增删改以后,无需去操作索引库和静态页面,只需向MQ发送一条消息(比如包含商品id的消息),也不关心消息被谁接收。 搜索服务和静态页面服务监听MQ,接收消息,然后分别去处理索引库和静态页面(根据商品id去更新索引库和商品详情静态页面)。

    什么是消息队列

    MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

     

    开发中消息队列通常有如下应用场景:

    1、任务异步处理:

    高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。

    2、应用程序解耦合

    MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

     

    AMQP和JMS

    MQ是消息通信的模型,并发具体实现。现在实现MQ的有两种主流方式:AMQP、JMS。

    两者间的区别和联系:

    • JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式
    • JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
    • JMS规定了两种消息模型;而AMQP的消息模型更加丰富

    常见MQ产品

    • ActiveMQ:基于JMS
    • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
    • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
    • Kafka:分布式消息系统,高吞吐量

    RabbitMQ快速入门

    RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com

    下载与安装

    RabbitMQ由Erlang语言开发,需要安装与RabbitMQ版本对应的Erlang语言环境,具体的就不解释了,自行搜索教程。RabbitMQ官网下载地址:http://www.rabbitmq.com/download.html

    RabbitMQ的工作原理

    下图是RabbitMQ的基本结构:

    组成部分说明:

    • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
    • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
    • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

    生产者发送消息流程:

    1、生产者和Broker建立TCP连接。

    2、生产者和Broker建立通道。

    3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

    4、Exchange将消息转发到指定的Queue(队列)

    消费者接收消息流程:

    1、消费者和Broker建立TCP连接

    2、消费者和Broker建立通道

    3、消费者监听指定的Queue(队列)

    4、当有消息到达Queue时Broker默认将消息推送给消费者。

    5、消费者接收到消息。

    6、ack回复

    六种消息模型

    ①基本消息模型:

    在上图的模型中,有以下概念:

    • P:生产者,也就是要发送消息的程序
    • C:消费者:消息的接受者,会一直等待消息到来。
    • queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

     生产者

    新建一个maven工程,添加amqp-client依赖

    1. <dependency>
    2. <groupId>com.rabbitmq</groupId>
    3. <artifactId>amqp-client</artifactId>
    4. <version>5.7.1</version>
    5. </dependency>

    连接工具类:

    1. public class ConnectionUtil {
    2. /**
    3. * 建立与RabbitMQ的连接
    4. * @return
    5. * @throws Exception
    6. */
    7. public static Connection getConnection() throws Exception {
    8. //定义连接工厂
    9. ConnectionFactory factory = new ConnectionFactory();
    10. //设置服务地址
    11. factory.setHost("192.168.1.103");
    12. //端口
    13. factory.setPort(5672);
    14. //设置账号信息,用户名、密码、vhost
    15. factory.setVirtualHost("/kavito");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
    16. factory.setUsername("kavito");
    17. factory.setPassword("123456");
    18. // 通过工厂获取连接
    19. Connection connection = factory.newConnection();
    20. return connection;
    21. }
    22. }

    生产者发送消息:

    1. public class Send {
    2. private final static String QUEUE_NAME = "simple_queue";
    3. public static void main(String[] argv) throws Exception {
    4. // 1、获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 2、从连接中创建通道,使用通道才能完成消息相关的操作
    7. Channel channel = connection.createChannel();
    8. // 3、声明(创建)队列
    9. //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
    10. /**
    11. * 参数明细
    12. * 1、queue 队列名称
    13. * 2、durable 是否持久化,如果持久化,mq重启后队列还在
    14. * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
    15. * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
    16. * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
    17. */
    18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    19. // 4、消息内容
    20. String message = "Hello World!";
    21. // 向指定的队列中发送消息
    22. //参数:String exchange, String routingKey, BasicProperties props, byte[] body
    23. /**
    24. * 参数明细:
    25. * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
    26. * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
    27. * 3、props,消息的属性
    28. * 4、body,消息内容
    29. */
    30. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    31. System.out.println(" [x] Sent '" + message + "'");
    32. //关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
    33. channel.close();
    34. connection.close();
    35. }
    36. }

    控制台:

    web管理页面:服务器地址/端口号 (本地:127.0.0.1:15672,默认用户及密码:guest guest)

    点击队列名称,进入详情页,可以查看消息:

    消费者接收消息

    1. public class Recv {
    2. private final static String QUEUE_NAME = "simple_queue";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
    7. Channel channel = connection.createChannel();
    8. // 声明队列
    9. //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
    10. /**
    11. * 参数明细
    12. * 1、queue 队列名称
    13. * 2、durable 是否持久化,如果持久化,mq重启后队列还在
    14. * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
    15. * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
    16. * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
    17. */
    18. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    19. //实现消费方法
    20. DefaultConsumer consumer = new DefaultConsumer(channel){
    21. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    22. /**
    23. * 当接收到消息后此方法将被调用
    24. * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
    25. * @param envelope 信封,通过envelope
    26. * @param properties 消息属性
    27. * @param body 消息内容
    28. * @throws IOException
    29. */
    30. @Override
    31. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    32. //交换机
    33. String exchange = envelope.getExchange();
    34. //消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
    35. long deliveryTag = envelope.getDeliveryTag();
    36. // body 即消息体
    37. String msg = new String(body,"utf-8");
    38. System.out.println(" [x] received : " + msg + "!");
    39. }
    40. };
    41. // 监听队列,第二个参数:是否自动进行消息确认。
    42. //参数:String queue, boolean autoAck, Consumer callback
    43. /**
    44. * 参数明细:
    45. * 1、queue 队列名称
    46. * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
    47. * 3、callback,消费方法,当消费者接收到消息要执行的方法
    48. */
    49. channel.basicConsume(QUEUE_NAME, true, consumer);
    50. }
    51. }

    控制台打印:

    再看看队列的消息,已经被消费了

    我们发现,消费者已经获取了消息,但是程序没有停止,一直在监听队列中是否有新的消息。一旦有新的消息进入队列,就会立即打印.

    消息确认机制(ACK)

    通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

    那么问题来了:RabbitMQ怎么知道消息被接收了呢?

    如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

    因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

    • 自动ACK:消息一旦被接收,消费者自动发送ACK
    • 手动ACK:消息接收后,不会发送ACK,需要手动调用

    大家觉得哪种更好呢?

    这需要看消息的重要性:

    • 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便
    • 如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

    我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:

    1. public class Recv2 {
    2. private final static String QUEUE_NAME = "simple_queue";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 创建通道
    7. final Channel channel = connection.createChannel();
    8. // 声明队列
    9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    10. // 定义队列的消费者
    11. DefaultConsumer consumer = new DefaultConsumer(channel) {
    12. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    13. @Override
    14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    15. // body 即消息体
    16. String msg = new String(body);
    17. System.out.println(" [x] received : " + msg + "!");
    18. // 手动进行ACK
    19. /*
    20. * void basicAck(long deliveryTag, boolean multiple) throws IOException;
    21. * deliveryTag:用来标识消息的id
    22. * multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    23. */
    24. channel.basicAck(envelope.getDeliveryTag(), false);
    25. }
    26. };
    27. // 监听队列,第二个参数false,手动进行ACK
    28. channel.basicConsume(QUEUE_NAME, false, consumer);
    29. }
    30. }

    最后一行代码设置第二个参数为false

    channel.basicConsume(QUEUE_NAME, false, consumer);

    自动ACK存在的问题

    修改消费者,添加异常,如下:

    生产者不做任何修改,直接运行,消息发送成功:

    运行消费者,程序抛出异常:

    管理界面:

    消费者抛出异常,但是消息依然被消费,实际上我们还没获取到消息。

    演示手动ACK

    重新运行生产者发送消息:

    同样,在手动进行ack前抛出异常,运行Recv2

    再看看管理界面:

    消息没有被消费掉!

    还有另外一种情况:修改消费者Recv2,把监听队列第二个参数自动改成手动,(去掉之前制造的异常) ,并且消费方法中没手动进行ACK

    生产者代码不变,再次运行:

    运行消费者 :

    但是,查看管理界面,发现:

    停掉消费者的程序,发现:

    这是因为虽然我们设置了手动ACK,但是代码中并没有进行消息确认!所以消息并未被真正消费掉。当我们关掉这个消费者,消息的状态再次变为Ready。

    正确的做法是:

    我们要在监听队列时设置第二个参数为false,代码中手动进行ACK

    再次运行消费者,查看web管理页面:

    消费者消费成功!

    生产者避免数据丢失:https://www.cnblogs.com/vipstone/p/9350075.html

     

    ②work消息模型

    工作队列或者竞争消费者模式

    work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。

    这个消息模型在Web应用程序中特别有用,可以处理短的HTTP请求窗口中无法处理复杂的任务。

    接下来我们来模拟这个流程:

    P:生产者:任务的发布者

    C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)

    C2:消费者2:领取任务并且完成任务,假设完成速度较快

     

    生产者

    生产者循环发送50条消息

    1. public class Send {
    2. private final static String QUEUE_NAME = "test_work_queue";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 获取通道
    7. Channel channel = connection.createChannel();
    8. // 声明队列
    9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    10. // 循环发布任务
    11. for (int i = 0; i < 50; i++) {
    12. // 消息内容
    13. String message = "task .. " + i;
    14. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    15. System.out.println(" [x] Sent '" + message + "'");
    16. Thread.sleep(i * 2);
    17. }
    18. // 关闭通道和连接
    19. channel.close();
    20. connection.close();
    21. }
    22. }

     

    消费者1

    1. public class Recv {
    2. private final static String QUEUE_NAME = "test_work_queue";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
    7. Channel channel = connection.createChannel();
    8. // 声明队列
    9. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    10. //实现消费方法
    11. DefaultConsumer consumer = new DefaultConsumer(channel){
    12. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    13. @Override
    14. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    15. // body 即消息体
    16. String msg = new String(body,"utf-8");
    17. System.out.println(" [消费者1] received : " + msg + "!");
    18. //模拟任务耗时1s
    19. try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
    20. }
    21. };
    22. // 监听队列,第二个参数:是否自动进行消息确认。
    23. channel.basicConsume(QUEUE_NAME, true, consumer);
    24. }
    25. }

    消费者2

    代码不贴了,与消费者1基本类似,只是消费者2没有设置消费耗时时间。

    接下来,两个消费者一同启动,然后发送50条消息:

    可以发现,两个消费者各自消费了不同25条消息,这就实现了任务的分发。

    能者多劳

    刚才的实现有问题吗?

    • 消费者1比消费者2的效率要低,一次任务的耗时较长
    • 然而两人最终消费的消息数量是一样的
    • 消费者2大量时间处于空闲状态,消费者1一直忙碌

    现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。

    怎么实现呢?

    通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

    值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。

    再次测试:

     

    订阅模型分类

    说明下:

    1、一个生产者多个消费者
    2、每个消费者都有一个自己的队列
    3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
    4、每个队列都需要绑定到交换机上
    5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
    例子:注册->发邮件、发短信

    X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

    Exchange类型有以下几种:

    Fanout:广播,将消息交给所有绑定到交换机的队列

    Direct:定向,把消息交给符合指定routing key 的队列

    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

    Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    Publish/subscribe(交换机类型:Fanout,也称为广播 

    Publish/subscribe模型示意图 :

     

    生产者

    和前面两种模式不同:

    • 1) 声明Exchange,不再声明Queue
    • 2) 发送消息到Exchange,不再发送到Queue
    1. public class Send {
    2. private final static String EXCHANGE_NAME = "test_fanout_exchange";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 获取通道
    7. Channel channel = connection.createChannel();
    8. // 声明exchange,指定类型为fanout
    9. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    10. // 消息内容
    11. String message = "注册成功!!";
    12. // 发布消息到Exchange
    13. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
    14. System.out.println(" [生产者] Sent '" + message + "'");
    15. channel.close();
    16. connection.close();
    17. }
    18. }

    消费者1 (注册成功发给短信服务)

    1. public class Recv {
    2. private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列
    3. private final static String EXCHANGE_NAME = "test_fanout_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    13. // 定义队列的消费者
    14. DefaultConsumer consumer = new DefaultConsumer(channel) {
    15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    16. @Override
    17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    18. byte[] body) throws IOException {
    19. // body 即消息体
    20. String msg = new String(body);
    21. System.out.println(" [短信服务] received : " + msg + "!");
    22. }
    23. };
    24. // 监听队列,自动返回完成
    25. channel.basicConsume(QUEUE_NAME, true, consumer);
    26. }
    27. }

    消费者2(注册成功发给邮件服务)

    1. public class Recv2 {
    2. private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列
    3. private final static String EXCHANGE_NAME = "test_fanout_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
    13. // 定义队列的消费者
    14. DefaultConsumer consumer = new DefaultConsumer(channel) {
    15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    16. @Override
    17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    18. byte[] body) throws IOException {
    19. // body 即消息体
    20. String msg = new String(body);
    21. System.out.println(" [邮件服务] received : " + msg + "!");
    22. }
    23. };
    24. // 监听队列,自动返回完成
    25. channel.basicConsume(QUEUE_NAME, true, consumer);
    26. }
    27. }

    我们运行两个消费者,然后发送1条消息:

    思考:

    1、publish/subscribe与work queues有什么区别。

    区别:

    1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

    2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

    3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。

    相同点:

    所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。

    2、实际工作用 publish/subscribe还是work queues。

    建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。

    ④Routing 路由模型(交换机类型:direct)

    Routing模型示意图:

    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    接下来看代码:

    生产者

    1. public class Send {
    2. private final static String EXCHANGE_NAME = "test_direct_exchange";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 获取通道
    7. Channel channel = connection.createChannel();
    8. // 声明exchange,指定类型为direct
    9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    10. // 消息内容,
    11. String message = "注册成功!请短信回复[T]退订";
    12. // 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
    13. channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
    14. System.out.println(" [x] Sent '" + message + "'");
    15. channel.close();
    16. connection.close();
    17. }
    18. }

    消费者1

    1. public class Recv {
    2. private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
    3. private final static String EXCHANGE_NAME = "test_direct_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
    13. //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
    14. // 定义队列的消费者
    15. DefaultConsumer consumer = new DefaultConsumer(channel) {
    16. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    19. byte[] body) throws IOException {
    20. // body 即消息体
    21. String msg = new String(body);
    22. System.out.println(" [短信服务] received : " + msg + "!");
    23. }
    24. };
    25. // 监听队列,自动ACK
    26. channel.basicConsume(QUEUE_NAME, true, consumer);
    27. }
    28. }

    消费者2

    1. public class Recv2 {
    2. private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列
    3. private final static String EXCHANGE_NAME = "test_direct_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息
    13. // 定义队列的消费者
    14. DefaultConsumer consumer = new DefaultConsumer(channel) {
    15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    16. @Override
    17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    18. byte[] body) throws IOException {
    19. // body 即消息体
    20. String msg = new String(body);
    21. System.out.println(" [邮件服务] received : " + msg + "!");
    22. }
    23. };
    24. // 监听队列,自动ACK
    25. channel.basicConsume(QUEUE_NAME, true, consumer);
    26. }
    27. }

    我们发送sms的RoutingKey,发现结果:只有指定短信的消费者1收到消息了

     

    ⑤Topics 通配符模式(交换机类型:topics)

    Topics模型示意图:

    每个消费者监听自己的队列,并且设置带统配符的routingkey,生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列。

    Routingkey一般都是有一个或者多个单词组成,多个单词之间以“.”分割,例如:inform.sms

    通配符规则:

    #:匹配一个或多个词

    *:匹配不多不少恰好1个词

    举例:

    audit.#:能够匹配audit.irs.corporate 或者 audit.irs

    audit.*:只能匹配audit.irs

    从示意图可知,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的Routing key发送。路由关键字中的第一个单词将描述速度,第二个颜色和第三个种类:“<speed>.<color>.<species>”。

    我们创建了三个绑定:Q1绑定了“*.orange.*”,Q2绑定了“.*.*.rabbit”和“lazy.#”。

    Q1匹配所有的橙色动物。

    Q2匹配关于兔子以及懒惰动物的消息。

    下面做个小练习,假如生产者发送如下消息,会进入哪个队列:

    quick.orange.rabbit       Q1 Q2   routingKey="quick.orange.rabbit"的消息会同时路由到Q1与Q2

    lazy.orange.elephant    Q1 Q2

    quick.orange.fox           Q1

    lazy.pink.rabbit              Q2  (值得注意的是,虽然这个routingKey与Q2的两个bindingKey都匹配,但是只会投递Q2一次)

    quick.brown.fox            不匹配任意队列,被丢弃

    quick.orange.male.rabbit   不匹配任意队列,被丢弃

    orange         不匹配任意队列,被丢弃

    下面我们以指定Routing key="quick.orange.rabbit"为例,验证上面的答案

    生产者

    1. public class Send {
    2. private final static String EXCHANGE_NAME = "test_topic_exchange";
    3. public static void main(String[] argv) throws Exception {
    4. // 获取到连接
    5. Connection connection = ConnectionUtil.getConnection();
    6. // 获取通道
    7. Channel channel = connection.createChannel();
    8. // 声明exchange,指定类型为topic
    9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    10. // 消息内容
    11. String message = "这是一只行动迅速的橙色的兔子";
    12. // 发送消息,并且指定routing key为:quick.orange.rabbit
    13. channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
    14. System.out.println(" [动物描述:] Sent '" + message + "'");
    15. channel.close();
    16. connection.close();
    17. }
    18. }

    消费者1

    1. public class Recv {
    2. private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
    3. private final static String EXCHANGE_NAME = "test_topic_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
    13. // 定义队列的消费者
    14. DefaultConsumer consumer = new DefaultConsumer(channel) {
    15. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    16. @Override
    17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    18. byte[] body) throws IOException {
    19. // body 即消息体
    20. String msg = new String(body);
    21. System.out.println(" [消费者1] received : " + msg + "!");
    22. }
    23. };
    24. // 监听队列,自动ACK
    25. channel.basicConsume(QUEUE_NAME, true, consumer);
    26. }
    27. }

     消费者2

    1. public class Recv2 {
    2. private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
    3. private final static String EXCHANGE_NAME = "test_topic_exchange";
    4. public static void main(String[] argv) throws Exception {
    5. // 获取到连接
    6. Connection connection = ConnectionUtil.getConnection();
    7. // 获取通道
    8. Channel channel = connection.createChannel();
    9. // 声明队列
    10. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    11. // 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息
    12. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
    13. channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
    14. // 定义队列的消费者
    15. DefaultConsumer consumer = new DefaultConsumer(channel) {
    16. // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
    19. byte[] body) throws IOException {
    20. // body 即消息体
    21. String msg = new String(body);
    22. System.out.println(" [消费者2] received : " + msg + "!");
    23. }
    24. };
    25. // 监听队列,自动ACK
    26. channel.basicConsume(QUEUE_NAME, true, consumer);
    27. }
    28. }

    结果C1、C2是都接收到消息了:

     ⑥RPC

    RPC模型示意图:

    基本概念:

    Callback queue 回调队列客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。

    Correlation id 关联标识客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

    流程说明

    • 当客户端启动的时候,它创建一个匿名独享的回调队列。
    • 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
    • 将请求发送到一个 rpc_queue 队列中。
    • 服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
    • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用

    分享两道面试题:

    面试题:

    避免消息堆积?

    1) 采用workqueue,多个消费者监听同一队列。

    2)接收到消息以后,而是通过线程池,异步消费。

    如何避免消息丢失?

    1) 消费者的ACK机制。可以防止消费者丢失消息。

    但是,如果在消费者消费之前,MQ就宕机了,消息就没了?

    2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化

    交换机持久化

    队列持久化

    消息持久化

     

     Spring整合RibbitMQ

    下面还是模拟注册服务当用户注册成功后,向短信和邮件服务推送消息的场景

    搭建SpringBoot环境

    创建两个工程 mq-rabbitmq-producer和mq-rabbitmq-consumer,分别配置1、2、3(第三步本例消费者用注解形式,可以不用配)

    1、添加AMQP的启动器:

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
    5. <dependency>
    6. <groupId>org.springframework.boot</groupId>
    7. <artifactId>spring‐boot‐starter‐test</artifactId>
    8. </dependency>

    2、在application.yml中添加RabbitMQ的配置:  

    1. server:
    2. port: 10086
    3. spring:
    4. application:
    5. name: mq-rabbitmq-producer
    6. rabbitmq:
    7. host: 192.168.1.103
    8. port: 5672
    9. username: kavito
    10. password: 123456
    11. virtualHost: /kavito
    12. template:
    13. retry:
    14. enabled: true
    15. initial-interval: 10000ms
    16. max-interval: 300000ms
    17. multiplier: 2
    18. exchange: topic.exchange
    19. publisher-confirms: true

    属性说明:

    • template:有关AmqpTemplate的配置
      • retry:失败重试
        • enabled:开启失败重试
        • initial-interval:第一次重试的间隔时长
        • max-interval:最长重试间隔,超过这个间隔将不再重试
        • multiplier:下次重试间隔的倍数,此处是2即下次重试间隔是上次的2倍
      • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个
    • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

    当然如果consumer只是接收消息而不发送,就不用配置template相关内容。

     3、定义RabbitConfig配置类,配置Exchange、Queue、及绑定交换机。

    1. @Configuration
    2. public class RabbitmqConfig {
    3. public static final String QUEUE_EMAIL = "queue_email";//email队列
    4. public static final String QUEUE_SMS = "queue_sms";//sms队列
    5. public static final String EXCHANGE_NAME="topic.exchange";//topics类型交换机
    6. public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
    7. public static final String ROUTINGKEY_SMS="topic.#.sms.#";
    8. //声明交换机
    9. @Bean(EXCHANGE_NAME)
    10. public Exchange exchange(){
    11. //durable(true) 持久化,mq重启之后交换机还在
    12. return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
    13. }
    14. //声明email队列
    15. /*
    16. * new Queue(QUEUE_EMAIL,true,false,false)
    17. * durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
    18. * auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
    19. * exclusive 表示该消息队列是否只在当前connection生效,默认是false
    20. */
    21. @Bean(QUEUE_EMAIL)
    22. public Queue emailQueue(){
    23. return new Queue(QUEUE_EMAIL);
    24. }
    25. //声明sms队列
    26. @Bean(QUEUE_SMS)
    27. public Queue smsQueue(){
    28. return new Queue(QUEUE_SMS);
    29. }
    30. //ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
    31. @Bean
    32. public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
    33. @Qualifier(EXCHANGE_NAME) Exchange exchange){
    34. return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    35. }
    36. //ROUTINGKEY_SMS队列绑定交换机,指定routingKey
    37. @Bean
    38. public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
    39. @Qualifier(EXCHANGE_NAME) Exchange exchange){
    40. return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    41. }
    42. }

    生产者(mq-rabbitmq-producer)

    为了方便测试,我直接把生产者代码放工程测试类:发送routing key是"topic.sms.email"的消息,那么mq-rabbitmq-consumer下那些监听的(与交换机(topic.exchange)绑定,并且订阅的routingkey中匹配了"topic.sms.email"规则的) 队列就会收到消息。

    1. @SpringBootTest
    2. @RunWith(SpringRunner.class)
    3. public class Send {
    4. @Autowired
    5. RabbitTemplate rabbitTemplate;
    6. @Test
    7. public void sendMsgByTopics(){
    8. /**
    9. * 参数:
    10. * 1、交换机名称
    11. * 2、routingKey
    12. * 3、消息内容
    13. */
    14. for (int i=0;i<5;i++){
    15. String message = "恭喜您,注册成功!userid="+i;
    16. rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_NAME,"topic.sms.email",message);
    17. System.out.println(" [x] Sent '" + message + "'");
    18. }
    19. }
    20. }

    运行测试类发送5条消息:

    web管理界面: 可以看到已经创建了交换机以及queue_email、queue_sms 2个队列,并且向这两个队列分别发送了5条消息

     

    消费者(mq-rabbitmq-consumer)

    编写一个监听器组件,通过注解配置消费者队列,以及队列与交换机之间绑定关系。(也可以像生产者那样通过配置类配置)

    在SpringAmqp中,对消息的消费者进行了封装和抽象。一个JavaBean的方法,只要添加@RabbitListener注解,就可以成为了一个消费者。

    1. @Component
    2. public class ReceiveHandler {
    3. //监听邮件队列
    4. @RabbitListener(bindings = @QueueBinding(
    5. value = @Queue(value = "queue_email", durable = "true"),
    6. exchange = @Exchange(
    7. value = "topic.exchange",
    8. ignoreDeclarationExceptions = "true",
    9. type = ExchangeTypes.TOPIC
    10. ),
    11. key = {"topic.#.email.#","email.*"}))
    12. public void rece_email(String msg){
    13. System.out.println(" [邮件服务] received : " + msg + "!");
    14. }
    15. //监听短信队列
    16. @RabbitListener(bindings = @QueueBinding(
    17. value = @Queue(value = "queue_sms", durable = "true"),
    18. exchange = @Exchange(
    19. value = "topic.exchange",
    20. ignoreDeclarationExceptions = "true",
    21. type = ExchangeTypes.TOPIC
    22. ),
    23. key = {"topic.#.sms.#"}))
    24. public void rece_sms(String msg){
    25. System.out.println(" [短信服务] received : " + msg + "!");
    26. }
    27. }

    属性说明:

    • @Componet:类上的注解,注册到Spring容器
    • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:
      • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:
        • value:这个消费者关联的队列。值是@Queue,代表一个队列
        • exchange:队列所绑定的交换机,值是@Exchange类型
        • key:队列和交换机绑定的RoutingKey,可指定多个

    启动mq-rabbitmq-comsumer项目

    ok,邮件服务和短息服务接收到消息后,就可以各自开展自己的业务了。

    Pulsar初入门(一)

    Pulsar初入门(一)


    参考--Apache-Pulsar官网---http://pulsar.apache.org/

    -选择pulsar而不是Kafka的7个原因---https://kafkaesque.io/7-reasons-we-choose-apache-pulsar-over-apache-kafka/

    -选择pulsar而不是Kafka的7个原因--infoQ中文版--https://baijiahao.baidu.com/s?id=1634132982881230076&wfr=spider&for=pc

    -推荐阅读-----CSDN网友/Pulsar官网文档翻译计划参与者--稀有气体--Kafka的时代已经过去了,未来是Pulsar的吗?等系列文章

    简介:

    Apache Pulsar是一个开源的分布式的pub-sub消息系统,最初是雅虎创建的,现在是Apache Software Foundation的一部分。

    关于pulsar:

    1.pulsar函数,使用开发人员友好的API部署轻量级计算逻辑,无需运行自己的流处理引擎

    2.低延迟,耐用-设计用于大规模的低延迟发布(<5ms),具有强大的耐用性保障

    3.持久存储,基于Apache Bookkeeper的持久消息存储。提供写和读操作之间的IO隔离。

    4.生产中证明,Pulsar在雅虎生产超过3年,每秒百万条消息设计百万个主题

    5.地域复制,跨地理位置,异地数据中心复制支持

    6.客户端库。灵活的消息传递模型,支持java,c++,py,Go

    7.水平扩展,水平无缝扩展到数百万个节点

    8多租户,原生支持多租户,支持隔离,验证,授权和配额

    9可操作性,REST Admin API ,用于配置管理,工具和监视。可以部署在本地和k8s上。

    tips:

    >多种订阅模式(独占,共享和failover)--默认独占

    >broker无状态

    >数据老化时,分层存储可以将数据从hot  /   warm存储卸载到cold  /  longterm存储(s3,GCS)

    架构:

    èå²æç»æå¾

    一、Messaging Concepts(消息概念)

    pulsar基于pub-sub模式,类似于Kafka(支持点对点和pub-sub)和其它的消息系统。

    Messages是Pulsar的基本”单位“,它们是生产者向主题发布的内容以及消费者随后从主题中消费多的内容(并在消息处理时确认)。Messages类似于邮政系统中的信件。

    Pulsar的消息构成
    ComponentPurpose
    value  /  data payload消息携带的数据。所有Pulsar消息都携带原始字节,尽管消息数据也可以符合数据的Schemas
    key可以选择使用key标记message,这对 主题压缩 等操作非常有用
    properties用户定义属性的可选 key/value映射
    Producer Name生产者名称(生产者自动给出默认名称,但是你可以给出自己的名称)
    Sequence ID每个Pulsar消息属于其主题上的有序队列。消息的序列是该序列中的排序
    Publish timemessages的发布时间戳(由生产者自动应用)
    event time应用程序可以附加到表示发生事件的消息的可选时间戳,例如处理消息时。如果没有明确设置,则event time为0

    Producer

    模式:

    生产者可以 同步 异步 向broker发送messages

    同步:失败重试

    异步:阻塞队列(可配置)已满,则生产者调用时被阻止或立即失败,具体取决与传递给生产者的参数。

    压缩:

    可以在messages运输过程中压缩,以节省带宽。目前支持:
    LZ4

    ZLIB

    ZSTD

    SNAPPY

    Batching

    如果启用了批处理,则producer在单个请求中累计并发送一批消息。批处理大小由最大消息数和发布延迟定义。

    Consumer

    订阅后才能接收到消息

    模式:

    可以 同步  或 异步 从broker接收消息

    同步:在消息可用之前,将阻止同步接收

    异步:异步接收将立即返回一个未来值 - -例如CompleetableFuture,在java中,一旦新消息可用,他就会完成。

    client:

    客户端为消费者提供了监听器实现,java client提供的是MessageListener接口,在该接口中,只要received收到新消息,就会调用该方法。

    ack

    1.消费者成功接收到消息时,向broker发送确认请求,以便broker丢弃该消息,否则它将存储消息。

    可以逐个累积地确认消息,通过逐个累加确认,消费者只需要确认收到的最后一条消息。直到(包括)提供的消息的流中的所有的消息都不会被重新传递该消费者。

    累计确认不能与 --共享订阅模式--  一起使用,因为共享订阅模式多个消费者可以访问相同的订阅。

    在共享订阅模式下,可以单独确认消息。

    2.消费者不成功消费时,向代理发送否定确认,broker将重发messages。

    消息可以逐个或者累积的被确认否认,这取决有消费模式。

    在独占和failover模式下,消费者只会对它们收到的最后一条消息进行否定确认。

    在共享和key_Shared订阅模式中,您可以单独地否定确认消息。

    3.确认超时

    如果您希望broker自动重发消息,可以采用未确认消息自动传递机制,客户端在acktime内跟踪未确认地消息,redeliver unacknowledged messages在指定确认超时时自动向broker发送请求。

    tips:
    在确认消息超时之前使用否定确认。否定确认控制跟精确地重新传递单个消息,并在消息处理时间超过确认超时时避免无效重传。

     死信主题:

    即消费者无法消费地某些消息,将存储多的地方,其它消息系统,如RabbitMQ支持--通过broker存入,而Kafka没有相关概念,可以自行封装以支持。

    1. // java API Demo
    2. // Apache pulsar
    3. // http://pulsar.apache.org/docs/en/concepts-messaging/
    4. Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
    5. .topic(topic)
    6. .subscriptionName("my-subscription")
    7. .subscriptionType(SubscriptionType.Shared)
    8. .deadLetterPolicy(DeadLetterPolicy.builder()
    9. .maxRedeliverCount(maxRedeliveryCount)
    10. .build())
    11. .subscribe();

    死信主题取决于messagess是否重传。你需要确认消息重新传递方法:否定确认或确认超时。

    再确认超时之前使用否定确认。

    死信主题仅在共享订阅模式下支持

    topics:

    {persistent|non-persistent}://tenant/namespace/topic
    Pulsar topic构成
    Topic name componentDescription
    persistent  /  non-persistent标识了topic的类型。Pulsar支持两种主题:持久性和非持久性(默认持久性主题),持久性主题存在磁盘,而非持久性topic则不会存储到磁盘
    tenant实例中的topic的租户。租户是pulsar对多租户支持的重要组成,可以分散在集群中
    namespacetopic的管理单元,作为相关主题的分组机制。大多数主题配置都在命名空间级别执行,每个租户可以有多个namespace
    topicyopic的最后部分。topic的名字是自由格式的,在Pulsar实例中没有特殊含义

    producer写入不存在的主题时 了会在提供的命名空间下自动创建该主题

    namespace

    namespace是租户中多的逻辑命名法。租户可以通过管理API创建多个namespace,例如,具有不同应用程序的租户可以未每个应用程序创建单独的namespace。namespace允许应用程序创建和管理主题层次结构。主题my-tenant/app1是应用程序namespace app1的my-tenant。可以在命名空间下创建任意数量的主题。

    订阅模式:

    订阅模式是一种命名规则配置工具,用于确定如何将消息传递给使用者。Pulsar有三种可用的订阅模式:独占,共享和failover。这些模式如下图所示。

    è®¢éæ¨¡å¼

    Apache pulsar官网示例图片--http://pulsar.apache.org/docs/en/concepts-messaging/

    独占:

    在独占模式下,只允许一个消费者附加到订阅。如果多个消费尝试使用相同的订阅订阅主题,则消费者会收到错误。

    上面的示例图片中---只允许Consumer-A使用消息。

    独占模式是默认订阅模式

    故障转移:

    在failover模式下,多个使用者可以附加到同一个订阅。消费者按照消费者姓名进行词汇排序,第一个消费者最初将是唯一接收到消息的消费者。此消费者称为主消费者。

    当主消费者断开连接时,所有(非确认的和后续的)消息将被传递给下一个消费者。

    在上图中,Consumer-C-1是主消费者,而如果Consumer-C-1断开连接,Consumer-C-2将成为接收消息的下一个消费者。

    æé转移订é

    共享:

    在共享或round robin模式下,多个消费者可以附加到同一订阅。消息以消费者的循环分发形式传递,并且任何给定的消息仅传递给一个消费者。当消费者断开连接时,发送给它且未被确认的所有消息将被重新安排以便发送给剩余的消费者。

    局限性:
    不保证消息有序

    不能再此模式下使用累加确认

    Key_shared

    在Key_shared模式下,多个消费者可以附加到同一个订阅。消息在消费者的分发中传递,并且具有相同key或者相同排序key的

    消息仅被传递给一个消费者。无论重新传递多少次消息,当链接断开导致消费者更改某些消息的密钥,他都会传递给同一个消费者。

    局限性:

    测试功能(2.4.0)--可在broker.config中禁用它

    需要为消息指定密钥或orderingKey

    不能再此模式下使用累计确认

    Key_Shared订é

    多topic订阅

    消费者订阅Pulsar主题时,默认情况下它订阅一个特定主题,从pulsar1.23.0开始,可以支持多个topic订阅。可以通过两种方式来定义主题列表。

    1,正则  例如  persistent://public/default/finance-.*                       ---主题必须位于同一个namespace下

    2,明确给出topic list

    缺陷:
    没有订购保证  --如果生产条件要求严苛,则不要使用多topic订阅模式

    1. // java api Demo
    2. // Apache Pulsar
    3. // http://pulsar.apache.org/docs/en/concepts-messaging/
    4. import java.util.regex.Pattern;
    5. import org.apache.pulsar.client.api.Consumer;
    6. import org.apache.pulsar.client.api.PulsarClient;
    7. PulsarClient pulsarClient = // Instantiate Pulsar client object
    8. // Subscribe to all topics in a namespace
    9. Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
    10. Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
    11. .topicsPattern(allTopicsInNamespace)
    12. .subscriptionName("subscription-1")
    13. .subscribe();
    14. // Subscribe to a subsets of topics in a namespace, based on regex
    15. Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
    16. Consumer<byte[]> someTopicsConsumer = pulsarClient.newConsumer()
    17. .topicsPattern(someTopicsInNamespace)
    18. .subscriptionName("subscription-1")
    19. .subscribe();

    分区主题:

    普通主题只能由单个broker提供,限制了最大吞吐量。分区主题由多个broker处理的特殊主题模型,允许更高的吞吐量。

    --分区主题实际上实现为N个内部主题,其中N时分区数。将消息发布到分区主题时,每条消息会路由到多个代理,Pulsar自动会处理各个broker中的分区分配。

    路由模式:

    发布到分布主题,必须指定路由模式。默认三个路由模式,默认轮询-和Kafka类似。

    模式描述
    RoundRobinPartition如果未提供key,则以轮询方式往各分区上面发布消息,以实现最大吞吐量 --默认模式
    SinglePartition如果未提供key,则生产者随机选择一个分区并将所有消息发布到此分区。如果指定了key,将对key进行散列(默认javaStringHash=多客户端推荐Murmur3_32Hash)并将消息分配给特定的分区
    CustomPartition使用将调用的自定义消息路由器实现来特定消息的分区。用户在在java clent端实现MessageRouter接口来实现自定义路由模式。

    订购保证:

    消息的排序于消息的路由模式和消息的key有关。

    如果有key,messages被路由到指定的分区,在使用SinglePartition或RoundRobinPartition模式。

    订购保障描述路由模式和key
    Per-key-Partition具有相同message key的所有消息将按顺序排列并放置在同一个分区中。使用SinglePartition或RoundRobinPartition模式,每个消息提供key
    Pre-producer来自同一Producer的所有消息都将按顺序排列使用SinglePartition,并且不为每条消息提供key

    messages保留和到期:

    默认情况下,Pulsar消息代理:
    1.立即删除已经确认的消息

    2.持久性的将所有未确认的消息存储在消息积压中

    但是pulsar,提供了其他的功能可以覆盖此默认行为。

    1,消息保留,存储已经确认的消息

    2,消息到期,可以设置messages的TTL

    messages去重:

    pulsar支持消息去重,可防止不必要的消息的重复。

    Pulsaræ¶æ¯é夿°æ®å é¤

    该消息去重逻辑是由broker实现的。-见下面

    生产者幂等性:

    消息重复数据的删除的另一种可用方法是确保每每条消息只生成一次。这种方法通常称为生产者幂等性。这种方法的缺点是它将消息多的重复数据删除的工作推到了应用程序。在Pulsar中,这是在代理级别处理的。

    重复数据删除和有效一次的语义:

    重复消息的删除使得Pulsar的成为理想的消息传递系统,可与流处理引擎(SPE)和其它系统结合使用,以寻求有效一次处理语义。不提供自动重复消息删除的消息系统需要借助SPE或其它系统来保证重复数据删除,这意味着严格的排序是以重复数据删除的应用程序的负担为代价的。使用pulsar,严格的订购保证不会产生应用级别的成本。