logo头像
Snippet 博客主题

RabbitMQ工作模式之简单模式

本文于514天之前发表,文中内容可能已经过时

今天我们来学习一下RabbitMQ工作模式之简单模式,在学之前我们根据官方文档(纯英文文档,可以装个Chrome插件选中翻译)学习一下RabbitMQ的一些术语,这才能让我们更深入的了解。

准备条件

本教程假定RabbitMQ已在标准端口(5672)上的localhost上安装并运行。如果您使用不同的主机,端口或认证,则需要调整连接设置。

术语介绍

RabbitMQ是一个消息代理中间件:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定邮递员最终会将邮件发送给您的收件人。在这个比喻中,RabbitMQ是一个邮箱,邮局和邮递员只负责中转调度。

RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据 - 消息。

RabbitMQ和一般的消息传递使用了一些术语:生产者、队列、消费者。

  • 生产者

    发送消息的程序是生产者。

    官网上青色椭圆P则标识消息生产者。

    img

  • 队列

    队列是RabbitMQ中的邮箱的名称。虽然消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中。队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者可以发送到一个队列的消息,并且许多消费者可以尝试从一个队列接收数据。官网上红色多格子长方形则标识队列。

    img

  • 消费者

    消费与接受有类似的意义。消费者是一个主要等待接收消息的程序:

img

请注意,生产者、消费者和消息代理不必驻留在同一主机上;实际上在大多数应用中,应用程序也可以是生产者和消费者。

总结:整个过程非常简单,生产者创建消息,消费者接收这些消息。你的应用程序既可以作为生产者向其他应用程序发送消息,也可以作为消费者,等待接收其他应用程序的消息。其中,存储消息的是消息队列,它类似于邮箱,消息通过消息队列进行投递。

案例实战

在本教程的这一部分中,我们将用Java编写两个程序;发送单个消息的生产者,以及接收消息并将其打印出来的消费者。我们将掩盖Java API中的一些细节,专注于这个非常简单的事情,只是为了开始。这是消息传递的“Hello World”。

在下图中,“P”是我们的生产者,“C”是我们的消费者。中间的红框是一个队列 - RabbitMQ代表消费者保留的消息缓冲区。

img

了解到上述概念,现在我们开始动手实践吧!

导入客户端依赖

  1. Maven
1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>

Gradle

1
2
3
dependencies {
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.6.0'
}

导入完依赖后,我们将编写消息发布者(发送者)Send发送和我们的消息消费者(接收者)Recv。发布者将连接到RabbitMQ,发送单个消息,然后退出。

编写消息生产/发生者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package cn.yeamin.actions.simple.produces;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 简单队列的消息生产者
*/
public class Send {

/**
* 设置队列名称
*/
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {

// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接主机名
factory.setHost("localhost");
//设置端口号,不设置默认为5672
factory.setPort(5672);
/*
* 如果可以通过连接工厂创建一个连接,则继续在连接基础上继续创建通道.
* 这里我们可以使用try-with-resources语句,因为Connection和Channel都实现了java.io.Closeable。
* 这样我们就不需要在代码中明确地关闭它们。
*/
try (Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel()) {

/*
* 为了发送消息成功,我们必须声明一个队列供我们发送,队列只声明一次,不可能存在重复队列
* 参数1: queue表示队列名称
* 参数2: durable表示是否持久化
* 参数3: exclusive表示仅创建者可以使用的私有队列,断开后自动删除
* 参数4: autoDelete表示当所有消费客户端连接断开后,是否自动删除队列
* 参数5: arguments表示其他的构造参数,为队列构造而准备
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 要发送的消息
String message = "Hello World, lt, Welcome RabbitMQ!";
/*
* 最基本的消息发送
* 参数1: exchange表示交换机
* 参数2: routingKey表示路由Key
* 参数3: props表示息的其他参数
* 参数4: autoDelete表示当所有消费客户端连接断开后,是否自动删除队列
* 参数5: body表示息体,是个字节数组,意味着可以传递任何数据
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//消息发送方的日志打印
System.out.println(" [x] Sent '" + message + "'");
}
}
}

声明队列是幂等的 ,只有在它不存在的情况下才会创建它。消息内容是一个字节数组,因此您可以发送任何数据。

编写消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.yeamin.actions.simple.consumers;

import com.rabbitmq.client.*;

/**
* 简单队列的消息消费者
*/
public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ主机名
factory.setHost("localhost");
// 新建连接
Connection connection = factory.newConnection();
// 新建通道
Channel channel = connection.createChannel();
//绑定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 创建消费者,消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};

/**
*
* 消费者消费
* 参数1 :queue队列名
* 参数2 :autoAck 是否自动ACK
* 参数3 :callback消费者对象的一个接口,用来配置回调
*
*/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

为什么我们不使用try-with-resource语句来自动关闭通道和连接?通过这样做,我们只需让程序继续运行,关闭所有内容,然后退出!这将是尴尬的,因为我们希望在消费者异步监听消息到达时,该进程保持活动状态。

我们即将告诉服务器从队列中传递消息。因为它会异步地向我们发送消息,所以我们以对象的形式提供一个回调,它将缓冲消息,直到我们准备好使用它们。这就是DeliverCallback子类的作用。

消息生产与发送

我们先执行消息发送者,执行完成后如下图:

发送消息

紧接着,我们进行消息消费,执行完成后如下图:

消费消息

当消息生产者一直发送时,我们可以得到如下结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
15:17:00: Executing task 'Recv.main()'...

> Task :compileJava UP-TO-DATE
> Task :processResources NO-SOURCE
> Task :classes UP-TO-DATE

> Task :Recv.main()
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World, litong, Welcome RabbitMQ!'
[x] Received 'Hello World, lt, Welcome RabbitMQ!'

源码地址

https://github.com/ltyeamin/RabbitMQ-Action/tree/master/patterns

参考资料

支付宝打赏 微信打赏

请作者喝杯咖啡吧