Hello, 大家好,我是一名在互联网捡破烂的程序员
在上一期呢,我们讲到了工作队列的使用,还没有打怪升级的小伙伴先去修炼哦
今天呢,我们要继续打怪升级哦。
今天我们来讲一讲比较高级的消息方式,嗯,我想一下,是什么呢?
哦,我想起来了,今天我们来讲一讲发布/订阅模式,那么开始我们的表演。。。
一、开篇前提
在上一篇中我们讲到了 工作队列 ,生产者生产一堆消息,发布到MQ中,创建的多个消费者自动消费消息。
缺点是什么呢?在此过程中,我们的消费者都是做的同一个工作,如果工作有很多不同的种类,那我们该咋办呢?如果我们想多个消费者消费同一个消息呢?
哈哈哈,没有什么可以难到我们的,我们要有坚持的精神,不管在什么时候,我们都要有求知欲的信念。我们一定要质问到底。
好了,这里我们就扯远了,我们回归正题
那么我们怎么解决?
这就要使用到我们的另外一种模式:发布/订阅模式
什么是发布/订阅模式呢?
概念:
概念其实很简单:我们需要做不同的事情—我们将向多个消费者发送消息,可以发布同一条消息也可以发布不同的消息。这种模式就叫做 发布/订阅模式
场景假设:
我们在注册过程中,一般注册成功后,会发送消息通知用户注册成功(失败)。如果在一个系统中,用户注册有邮箱和手机号码,如果手机号码用来接收验证码,邮箱用来通知用户注册的结果。利用MQ实现业务异步处理,如果使用工作队列的话,就会注册信息队列。注册完成之后生产者会向队列提交一条注册数据,消费者取出数据同时向邮箱和手机号码发送信息。但是实际上邮箱和手机号信息发送是两个不同的业务逻辑,不应该放在一起处理。
解决方案
这时候我们就需要利用发布/订阅模式将信息发送到转换机(EXCHANGE),声明两个不同的队列(邮箱,手机),并绑定在交换机上。这样生产者只需要向交换机(EXCHANGE)发送消息,两个队列都会接收到消息发送给消费者。
二、图例展示
上面的图是来自官方的发布/订阅模式图例
- P:消息的生产者
- X:交换机
- 红色:队列
- C1,C2:消息消费者
三、生产者
package com.codeworld.rabbitmqdemo.ps; |
代码解释:
1.在发布/订阅模式中,我们又引入了一个新的概念
EXCHANGE(交换机)
其实,交换机理解起来很简单,它的一端是接收生产者发送过来的消息,一端将消息发送给队列。所以,当交换机接收到一个消息时,它必须做出相应的处理,是发送给一个特定的队列?还是发给多个队列?还是丢弃?
在前两期中,我们都是往队列中发送消息和获取消息。
回忆一下我们前两张学习的内容
- 一个生产者用来发送消息
- 一个队列缓存消息
- 一个或多个消费者消费队列中的消息
然而,RabbitMQ的消息模式核心思想并不是那样的。。。
核心思想是:
生产者从不将任何消息直接发送到队列上。实际上,生产者根本不知道它发送的消息将被转发到那些队列
实际上,生产者只能把消息发送给交换机(exchange),exchange只做一件简单的事情:一方面接收生产者发送过来的消息,另一方面将接收的消息发送给队列。一个exchange必须清楚的知道如何处理一条消息
类型
- direct
- topic
- headers
- fanout
我们使用是最后一种,fanout广播模式
// 声明交换机(分发:发布/订阅模式) |
广播模式很简单。正如你可能从名称中猜到的那样,它只是将它接收的所有消息广播到它知道的队列。这正需要我们的模式
2.发布/订阅模式有几点和工作队列不一致
在生产者中我们没有声明队列名称,而是声明了一个交换机名称
在工作队列的生产者中我们是这样做的
/** |
在发布/订阅模式中我们只定义了一个交换机名称
/** |
这样的好处是我们只需要将我们的消息发送到交换机中,交换机再将消息发送到绑定的队列上
// 发送消息 |
这一步就很好的证明了。。。
四、消费者
先直接上代码
- 消费者1
package com.codeworld.rabbitmqdemo.ps; |
- 消费者2
package com.codeworld.rabbitmqdemo.ps; |
代码解释:
1.在消费者中我们只需要定义交换机名称和对应的队列名称
消费者1:
/** |
我们这里定义的交换机和生产者中的交换机名称EXCHANGE_NAME = "ps_exchange"
保持一致,也就是说我们要把队列QUEUE_NAME = "ps_email"
绑定到交换机上面
消费者2:和消费者1一样,只是队列名称不一样,这样我们就可以监听不同的队列
/** |
2. 声明交换机类型,这里我们就只列出一种消费者
// 声明交换机(发布/订阅模式) |
3. 声明队列
// 声明队列 |
// 在这里我们没有使用队列的持久化
4. 将队列绑定到交换机上
// 将队列绑定到交换机上 |
五、补充(参照官网— 发布/订阅模式)
在我们的消费者中,如果我们不想手动命名我们的队列,我们可以使用临时队列
你是否还记得我们在 RabbitMQ基本使用三(工作队列) 使用具体的特定队列名称,创建队列名称对我们来说是至关重要的-我们需要将消费者指向同一个队列。如果要在生产者和使用者之间共享队列,则为队列指定名称非常重要。
但是,我们希望听到所有的消息,而不是仅仅是它们的部分信息。我们也只当前流动的消息感兴趣,而不是旧消息中。要解决这个问题,我们需要两件事
- 首先,每当我们连接到 RabbitMQ,我们需要一个新的、空的队列。为此,我们可以创建一个随机名称的队列,或者—让服务器为我们选择一个随机队列名称
- 一旦我们断开使用者,队列应该自动删除
那么我们应该怎么使用方法呢?
在Java中,当我们不向queueDeclare()提供任何参数时,我们会创建一个非持久、独占的自动删除队列,该队列的名称生成方式:
String queueName = channel.queueDeclare().getQueue(); |
此时queueName
包含一个随机队列名称,例如:它可能看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg
将生成的队列名称绑定到交换机上
六、截图
交换机
队列
交换机绑定的队列
运行结果截图
生产者
消费者1
消费者2
七、结束
OK,在这里我们的发布/订阅模式就到这里了,不知道你有没有帮助到你呢?
若没有看懂请前往官网 RabbitMQ
看到这里的人都是技术人才,你的浏览就是我前进的动力
欢迎加入QQ群: