原文地址:https://programmingpercy.tech/blog/event-driven-architecture-using-rabbitmq/
★
文章较长,请按照步骤操作,强化理解
”
事件驱动架构 (EDA) 是我在编程中最喜欢的东西之一。这种架构使我们能够构建微服务并轻松地在它们之间共享信息。在常规的顺序软件中,您会有一个函数触发另一个函数或一个重复出现的脚本来检查某些事情要做。 在事件驱动架构中,我们改为利用队列或发布/订阅架构。允许不同的服务在它们之间通知或发送信息以触发代码执行。事件驱动架构通常用于构建高度灵活和可扩展的软件。这是因为能够通过简单地使用新服务侦听事件来轻松添加或删除功能。它还使得在生产的同时影子部署和测试新服务变得非常容易,因为您可以让新服务对相同的事件做出反应,而不会干扰正在运行的系统。 然而,并非全是阳光和玫瑰,一些人认为 EDA 系统稍微复杂一些,有时在考虑完整的服务流时更难测试。我会说测试更容易,因为我们可以轻松触发事件并查看相关服务或单个服务的反应。但是如果没有适当的架构文档,也很难理解是什么触发了什么以及为什么触发。 本教程将介绍如何使用 RabbitMQ 构建两个使用事件进行通信的微服务。我们将研究 RabbitMQ 中使用的不同范例,同时我们将学习如何在 Go 中使用 RabbitMQ,我们将主要关注 RabbitMQ 的概念。涵盖一些常见错误和一些要遵循的最佳实践。RabbitMQ 支持多种协议发送数据,但在本教程中,我们将重点介绍使用 AMQP。 在本教程的过程中,我们将学习以下内容:
-
使用 Docker 设置 RabbitMQ虚拟主机、用户和权限 -
使用带有 rabbitmqctl 和 rabbitmqadmin 的 CLI -
管理 RabbitmQ了解生产者、消费者以及如何编写它们。 -
了解队列、交换器和绑定 -
使用工作队列(先进先出) -
将发布/订阅与 RabbitMQ 结合使用 -
使用基于 RPC 的模式和回调。 -
使用 TLS 加密流量 -
使用配置在 RabbitMQ 中声明资源
文章中使用的所有代码都可以在 Github 上找到。
安装 RabbitMQ – 设置用户和虚拟主机及权限
可以按照下载和安装中的示例来启动和运行 RabbitMQ。我建议在投入生产时遵循该指南,但对于本教程和实验,我们可以使用更简单的方法。 一如既往,最简单的方法是运行 Docker! 此命令将下载最新的 RabbitMQ 并将其作为后台进程启动,暴露端口 5672 和 15672。
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.11-management
端口 5672 用于启用 AMQP 连接。AMQP 是 RabbitMQ 和许多其他消息代理使用的网络协议。 端口 15672 :管理 UI 和管理 UI 托管在该端口上,管理 RabbitMQ 的 API 也位于该端口上。有关端口的更多详细信息,您可以参考 RabbitMQ 的网络指南。 docker 启动后,让我们首先访问托管在 localhost:15672 的管理 UI。 糟糕,我们需要一个用户!让我们使用 RabbitMQCLI 创建一个。不要担心安装它,它存在于我们运行的 Docker 容器中。 我们可以使用命令 add_user 创建一个新用户,命令后跟用户名和密码。我们使用 docker exec rabbitmq 来执行 docker 内部的命令,将 rabbitmq 替换为您为 docker 容器指定的名称。
docker exec rabbitmq rabbitmqctl add_user percy secret
我建议在探索期间也授予管理员访问权限。我们可以通过为新用户添加管理员标签来做到这一点。
docker exec rabbitmq rabbitmqctl set_user_tags percy administrator
哦,最后一件事,默认有一个访客用户,我强烈建议删除这个用户!该用户仅对使用 localhost 的用户可用,删除会更加安装。
docker exec rabbitmq rabbitmqctl delete_user guest
就是这样,回到管理用户界面并登录。登录后,您会看到一个相当古老的 UI,但它很棒,因为我们可以从这里真正监控 RabbitMQ,并查看发生了什么。我们还不会摆弄 UI,我们首先需要一个实际连接和发送数据的服务。在我们开始破解之前,我们需要再解决两件事。 RabbitMQ 中的资源,例如Queue和其他东西被一个逻辑层分隔开,这个逻辑层称为虚拟主机(Vhost)。解释虚拟主机的最简单方法是将其与命名空间进行比较,但在某些方面可能是错误的。我们可以使用这些虚拟主机将某些资源组合在一起,并通过添加允许使用虚拟主机的用户来限制访问。让我们从使用 add_vhost 命令创建虚拟主机开始,它接受一个输入,即虚拟主机的名称。
docker exec rabbitmq rabbitmqctl add_vhost customers
现在我们有了虚拟主机,我们可以为之前创建的用户添加权限,以便允许连接。 添加权限是使用** set_permissions 命令完成的,我们使用 -p 标志指定要添加权限的虚拟主机。语法中的下一项是要添加权限的用户。 命令的最后一部分是可怕的部分,它是一个定义要添加什么权限的正则表达式**,一个添加所有权限的示例可能看起来像,或者以 customer- 开头的所有资源的权限将是“^customer-*”。将有 3 个正则表达式插槽,按顺序配置以下权限。
-
Conf – 配置资源匹配正则表达式的权限 -
Write – 在匹配正则表达式的资源上写入的权限 -
Read – 读取与正则表达式匹配的资源的权限
为我的用户 percy 在客户虚拟主机上添加配置、写入和读取的完全访问权限的完整命令如下所示。请注意,我授予对 .* 的访问权限,这就是一切。
docker exec rabbitmq rabbitmqctl set_permissions -p customers percy ".*" ".*" ".*"
RabbitMQ 基础- Producers, Consumers, Exchanges, and Queues
-
Producer – 任何发送消息的软件。消费者 – 任何正在接收消息的软件。 -
Queue – Queue 接受 Message,输出消息,可以把它想象成一个大的 Buffer。Queue是 FIFO(先进先出),这意味着消息按照它们插入队列的顺序输出。 -
Exchange – An Router,是Producers发送消息的地方。交换接受消息并将它们发送到正确的队列,具体取决于交换的类型和应用的绑定(规则)。
总体思路是我们可以使用它在服务之间发送和接收消息。值得一提的是,生产者和消费者不必在同一台主机上运行,因此可以很好地扩展。首先用 Go 创建一个新项目,如果你没有安装 Go,请从这里安装它。 让我们在 Go 中构建一个 Producer,它可以开始在 Queue 上发送消息。首先为生产者创建一个新项目,并获取由 RabbitMQ 团队官方维护的 AMQP 库。 该项目将有一个 cmd 文件夹,该文件夹将包含所有不同的服务,每个服务都是可运行的。我们还将有一个内部文件夹,用于存储共享库等。
mkdir eventdriven
cd eventdriven
mkdir -p cmd/producer
mkdir internal
touch cmd/producer/main.go
touch internal/rabbitmq.go
go mod init programmingpercy.tech/eventdrivenrabbit
go get github.com/rabbitmq/amqp091-go
您的文件夹结构应如下所示。让我们首先在 internal/rabbitmq.go 中添加到 RabbitMQ 实例的连接。 我们将创建一个小的辅助函数,它将使用 amqp 协议连接到 RabbitMQ。我们将允许用户指定凭据和主机,以及要连接的虚拟主机。 我将简单地返回指向连接的指针,即网络连接和用于并发发送消息的 amqp.Channel。将其留给用户来管理连接。
package internal
import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
// RabbitClient is used to keep track of the RabbitMQ connection
type RabbitClient struct {
// The connection that is used
conn *amqp.Connection
// The channel that processes/sends Messages
ch *amqp.Channel
}
// ConnectRabbitMQ will spawn a Connection
func ConnectRabbitMQ(username, password, host, vhost string) (*amqp.Connection, error) {
// Setup the Connection to RabbitMQ host using AMQP
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s/%s", username, password, host, vhost))
if err != nil {
return nil, err
}
return conn, nil
}
// NewRabbitMQClient will connect and return a Rabbitclient with an open connection
// Accepts a amqp Connection to be reused, to avoid spawning one TCP connection per concurrent client
func NewRabbitMQClient(conn *amqp.Connection) (RabbitClient, error) {
// 唯一的并发服务器通道来处理/发送消息
// 一个好的经验法则是始终在应用程序之间重复使用 Conn
// 但每个例程生成一个新的通道
ch, err := conn.Channel()
if err != nil {
return RabbitClient{}, err
}
return RabbitClient{
conn: conn,
ch: ch,
}, nil
}
// Close will close the channel
func (rc RabbitClient) Close() error {
return rc.ch.Close()
}
一个非常好的经验法则是在整个应用程序中重用单个连接并为并发任务生成新的通道。这样做的原因是一个Connection是一个TCP连接,而一个channel是分配的TCP Connection中的一个多路复用连接。遵循这一经验法则可以实现更具可扩展性的解决方案。 让我们将这个简单的客户端导入 cmd/producer/main.go 并尝试连接看看会发生什么。 现在,我们将简单地连接并在关闭连接之前休眠 30 秒。
package main
import (
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
defer client.Close()
time.Sleep(30 * time.Second)
log.Println(client)
}
一旦我们准备就绪,运行生产者。
go run cmd/producer/main.go
运行后,导航回管理 UI 并看到我们实际上可以看到现在有 1 个连接和 1 个通道。通道是处理 TCP 层的一种非常聪明的方式,您可以在文档中阅读更多相关信息。它允许用户使用多个 Channels 重用一个打开的 TCP 连接,而不是打开许多 TCP 连接。这是一种多路复用技术。 是时候开始发送数据了,这是在上述通道上完成的。通道的含义比人们想象的要多,它不仅仅是一个愚蠢的管道,实际上有一些漂亮的选项可以在创建它们时进行配置。 我们可以从 UI 创建队列,但我喜欢在测试时在代码中创建它们。在生产环境中,我喜欢有一个配置文件来声明一些基本设置,我们稍后会研究它。 我们可以通过调用 amqp.QueueDeclare 创建一个 Queue,这个函数有很多输入参数,我们需要了解这些参数才能获得 Queue 的所需行为。签名看起来像这样。
func (*amqp.Channel).QueueDeclare(name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table) (amqp.Queue, error)
-
Name- 用于引用Queue的名称。这实际上可以为空,在这种情况下,服务器将生成一个名称。 -
Durable- 如果 Queue 应该在 Broker 重启(RabbitMQ 重启)后持久化 -
AutoDelete- 如果Queue应该在最后一个Consumer离开时自动删除 -
Exclusive- 仅适用于创建队列的同一个连接。 -
NoWait- 假设队列是在服务器上创建的 -
Arguments- 提供用户提供的参数的选项。
为了使这更容易一些,我将创建一个包装函数,它接受名称、持久和自动删除参数。默认情况下,我会让其他处于禁用状态。
// CreateQueue will create a new queue based on given cfgs
func (rc RabbitClient) CreateQueue(queueName string, durable, autodelete bool) error {
_, err := rc.ch.QueueDeclare(queueName, durable, autodelete, false, false, nil)
return err
}
让我们更新 producer/main.go 以执行新的 CreateQueue 函数,我将创建一个持久队列,因为我希望处理新客户的队列保持活跃和持久,我还将自动删除设置为 false。 我还将创建一个名为 customers_test 的非持久队列来展示差异。
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
defer client.Close()
if err := client.CreateQueue("customers_created", true, false); err != nil {
panic(err)
}
if err := client.CreateQueue("customers_test", false, true); err != nil {
panic(err)
}
time.Sleep(10 *time.Second)
log.Println(client)
}
添加之后,确保执行生产者。
go run cmd/producer/main.go
您可以访问 UI 并查看应该都可用的队列。请注意,一旦程序存在,customers_test 队列就不允许被删除,这是因为我们还没有 Consumer 连接。只有具有消费者连接的队列才会被删除。为了好玩,您现在可以尝试重新启动 RabbitMQ,看看 customers_test 是如何消失的,因为它没有被标记为持久的。
探索交换和绑定
在我们开始在队列上发送消息之前,我们需要创建一个 Exchange。已经创建了一些默认值,但我们将创建自己的默认值以更多地了解它们。 Exchange 是 RabbitMQ 的重要组成部分,它是我们向其发送消息的资源。交换器的工作是将消息传递到正确的队列。 要开始在队列上接收消息,该队列需要绑定到 Exchange,这称为绑定。绑定基本上是一个路由规则。需要理解的一件重要事情是,队列可以绑定到多个Exchange,这使得为什么也有不同的Exchange类型变得更加清楚。 有几种不同类型的交换,每一种都有不同的消息发送方式。 首先,我们有最基本的,直接交换。这个非常简单,消息是根据它们的 EXACT 路由键来路由的。在示例图像中,我们看到发送到** customer_created** 的消息如何仅由交换 customer_events 路由到该特定队列。当您需要将工作分配给一组工作人员时,直接交流非常有用。第二种类型是扇出,用于将消息发送到所有绑定队列。任何绑定到此Exchange的队列都会收到消息,路由键被忽略!这通常用于向任何感兴趣的各方广播消息。
然后我们有Topic交换,这曾经很酷。它们允许绑定指定一个规则来根据它们的Exchange选择发送的消息的子集。 路由键被`’.’分隔在每个单词之间,比如 customers.eu.stockholm。这可能是来自瑞典斯德哥尔摩的客户的路由键,然后我们可以有一个绑定告诉 Exchange 队列需要这些消息,但不是 customers.us.florida。 有一些特殊字符,# 表示零个或多个匹配项,例如 customers.# 将匹配以 customers.. 开头的任何路由键。还有 * 是特定位置的特定词,例如 customers.*.stockholm 只会匹配具有第一个词 customers 和第三个词 stockholm 的路由键。 这当然非常有用,因为某些服务只接收与主题子集相关的消息。下面的示例显示了如何在 2 月创建新客户,队列 customer_created 收到消息,因为绑定是针对 customers.created.# 规则的,所以队列 customer_emailed 没有收到消息,因为它与绑定的 customers.created 不匹配。
最后的Exchange是Header Exchange ,我们在RabbitMQ上发送的每条Message都有可能加上Headers,这是一个键值域。当我们需要基于更高级的级别进行路由时,这会非常方便。 假设我们添加了一个浏览器标头,指示用户在注册时使用的 Web 浏览器。例如,我们可以将任何 Linux 用户路由到某个队列。您可以指定多个标头并且它们都必须匹配,或者只有一个标头必须匹配。这是通过将 x-match 设置为 all 或 any 在绑定中完成的。
让我们停止讨论,而是创建一个我们可以使用的 Exchange。 要添加交换,我们将使用 rabbitmqadmin CLI 工具,它与我们之前使用的 rabbitmqctl 非常相似。 我们使用 declare exchange 命令,后跟交换名称和类型。对于本教程,我将使用Topic Exchange。我们将创建一个名为 customer-events 的交换。我们还需要指定虚拟主机以及管理员的用户名和密码。如果您希望持久重启,请记住将持久性设置为 true。
docker exec rabbitmq rabbitmqadmin declare exchange --vhost=customers name=customer_events type=topic -u percy -p secret durable=true
我们还需要授予用户发送此交换的权限。我们使用 set_topic_permissions 命令设置对某个主题的权限。以下命令将用户 percy 设置为允许在以 customers 开头的任何路由键上的交换 customer_events 上的 vhost customers 上发布。
docker exec rabbitmq rabbitmqctl set_topic_permissions -p customers percy customer_events "^customers.*" "^customers.*"
现在在这个Exchange上发布不会发生任何事情,因为我们在队列和exchange之间没有绑定。发送的任何消息都将被丢弃。
发送数据到Exchanges
要开始发布消息,我们首先需要在 customers_created 和 customers_test 队列以及 customers_events 交换之间创建绑定。打开 rabbitmq.go 并添加一个添加绑定的 CreateBinding 函数。
// CreateBinding is used to connect a queue to an Exchange using the binding rule
func (rc RabbitClient) CreateBinding(name, binding, exchange string) error {
// leaveing nowait false, having nowait set to false wctxill cause the channel to return an error and close if it cannot bind
// the final argument is the extra headers, but we wont be doing that now
return rc.ch.QueueBind(name, binding, exchange, false, nil)
}
然后在 producer/main.go 中我们添加绑定以便连接所有内容。我们希望客户发布的主题是 customers.created,然后是他们来自的国家/地区。但是 Binding 不会关心国家,只关心它是否匹配模式。
...
// Create binding between the customer_events exchange and the customers-created queue
if err := client.CreateBinding("customers-created", "customers.created.*", "customer_events"); err != nil {
panic(err)
}
// Create binding between the customer_events exchange and the customers-test queue
if err := client.CreateBinding("customers-test", "customers.*", "customer_events"); err != nil {
panic(err)
}
如果您执行生产者一次,我们就可以访问管理 UI 并查看可用的绑定。
go run cmd/producer/main.go
然后进入 UI 并访问您的 Exchange。现在我们有了绑定,我们可以看看发布消息了。我们从最简单的开始。我们创建了一个名为 Send 的包装函数,它接受有关交换的参数,以及要发布到的路由密钥。 该函数还将接受要使用的上下文和 amqp.Publishing 结构。amqp.Publishing 结构对于理解至关重要,因为它允许我们自定义我们发送的消息的特性和行为。我们将逐步探索它们,因为它们很多。
// Send is used to publish a payload onto an exchange with a given routingkey
func (rc RabbitClient) Send(ctx context.Context, exchange, routingKey string, options amqp.Publishing) error {
return rc.ch.PublishWithContext(ctx,
exchange, // exchange
routingKey, // routing key
// Mandatory is used when we HAVE to have the message return an error, if there is no route or queue then
// setting this to true will make the message bounce back
// If this is False, and the message fails to deliver, it will be dropped
true, // mandatory
// immediate Removed in MQ 3 or up https://blog.rabbitmq.com/posts/2012/11/breaking-things-with-rabbitmq-3-0§
false, // immediate
options, // amqp publishing struct
)
}
返回 producer/main.go ,我们将创建一条要发送的消息。我们将发送两条消息,每个队列一条。这是为了展示非常重要的 deliveryMode 参数。如果将其设置为持久化,消息将一直保存到某些消费者获取它为止,但这会带来开销和更长的延迟。如果您有不需要耐用的东西,则将它们设置为 Transient 以提高性能。 请记住,如果您发送持久消息,您的队列也需要是持久的,如果队列本身不存在,则在重新启动时保存消息没有意义。
...
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create customer from sweden
if err := client.Send(ctx, "customer_events", "customers.created.se", amqp091.Publishing{
ContentType: "text/plain", // The payload we send is plaintext, could be JSON or others..
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that this message should be Saved if no resources accepts it before a restart (durable)
Body: []byte("An cool message between services"),
}); err != nil {
panic(err)
}
if err := client.Send(ctx, "customer_events", "customers.test", amqp091.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp091.Transient, // This tells rabbitMQ that this message can be deleted if no resources accepts it before a restart (non durable)
Body: []byte("A second cool message"),
}); err != nil {
panic(err)
}
log.Println(client)
}
是时候再次执行 Producer 了。
go run cmd/producer/main.go
您现在应该在 Queue 页面下的 UI 中看到每个 Queue 的一条消息。如果你愿意,你可以进入每个队列并使用消息来查看它们,但我建议重新启动 RabbitMQ 以显示 Transient 和 Persistent 之间的区别。
docker restart rabbitmq
重新启动后尝试重新加载 UI,您应该会看到整个 customers-test 队列已被删除,但 customers-created 队列实际上还保留着旧消息。这是因为持久消息被写入磁盘以在崩溃等情况下幸存下来。我们将涵盖更高级的发布。
消费消息、确认、不确认以及重入队列
我们知道如何发布消息,但如果我们不能在另一个服务中使用消息,那对我们没有好处。消费就是从队列中抓取消息的过程。让我们创建一个新的二进制文件,我们可以用它来消费消息。
mkdir cmd/consumer
touch cmd/consumer/main.go
在我们开始消费之前,我们将在 Rabbitmq.go 中添加一个 Consume 函数,它将包装通道消费函数。 消费时有几个选项需要考虑。
-
Exclusive – 如果设置为 true 将确保这是该队列中的单个且唯一的消费者,如果为 False 服务器将在消费者之间公平地分发消息。 -
AutoAck – 如果为 true,将自动确认交付,如果为 false,则希望消费者在完成时调用 Acknowledge。AutoAck 听起来可能很棒,但它很棘手,如果您的消费者在 Acking 一个耗时的过程后失败,消息就会丢失,因为服务器认为它已经完成。 -
NoLocal – 在 RabbitMQ、AMQP 字段中不支持,以避免从同一域发布和消费。 -
NoWait – 不等待服务器确认。
让我们将 Consume 函数添加到 Rabbitmq.go
// Consume is a wrapper around consume, it will return a Channel that can be used to digest messages
// Queue is the name of the queue to Consume
// Consumer is a unique identifier for the service instance that is consuming, can be used to cancel etc
// autoAck is important to understand, if set to true, it will automatically Acknowledge that processing is done
// This is good, but remember that if the Process fails before completion, then an ACK is already sent, making a message lost
// if not handled properly
func (rc RabbitClient) Consume(queue, consumer string, autoAck bool) (<-chan amqp.Delivery, error) {
return rc.ch.Consume(queue, consumer, autoAck, false, false, false, nil)
}
现在我们可以消费了,让我们填写 consumer/main.go 以便它连接到 RabbitMQ 并开始从队列中提取消息。
package main
import (
"log"
"programmingpercy/eventdrivenrabbit/internal"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
mqClient, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
messageBus, err := mqClient.Consume("customers_created", "email-service", false)
if err != nil {
panic(err)
}
// blocking is used to block forever
var blocking chan struct{}
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
}
}()
log.Println("Consuming, to close the program press CTRL+C")
// This will block forever
<-blocking
}
运行该消费者应该打印出来自发布者的消息。
★
请记住,重用连接,但为每个并行处理创建一个新的通道,在我们的例子中,将创建第二个 RabbitMQ 客户端来管理客户测试队列。
”
go run cmd/consumer/main.go
如果您没有看到任何消息,可能是因为您需要先运行生产者。
★
2023/02/12 22:17:24 New Message: {0xc0000b0000 map[] text/plain 2 0 0001-01-01 00:00:00 +0000 UTC ema il-service 0 1 false customer_events customers.created.se [65 110 32 99 111 111 108 32 109 101 115 115 97 103 101 32 98 101 116 119 101 101 110 32 115 101 114 118 105 99 101 115]}
”
可能值得探索通过通道发送的结构,即 amqp.Delivery 结构,它很好地了解了所有存在的字段。
// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived
Headers Table // 应用程序或标题交换表
// Properties
ContentType string // MIME 内容类型
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // 队列实现使用——非持久化(1)还是持久化(2)
Priority uint8 // 队列实现使用 优先级- 0 到 9
CorrelationId string // 应用程序使用 - 相关标识符
ReplyTo string // 应用程序使用 - 要回复的地址(例如:RPC)
Expiration string // 实现使用 - 消息过期规范
MessageId string // 应用程序使用 - 消息标识符
Timestamp time.Time // 应用程序使用 - 消息时间戳
Type string // 应用程序使用 - 消息类型名称
UserId string // 应用程序使用 - 创建用户 - 应该是经过身份验证的用户
AppId string // 应用程序使用 - 创建应用程序 ID
// Valid only with Channel.Consume
ConsumerTag string
// Valid only with Channel.Get
MessageCount uint32
DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
Body []byte
}
如果你重新运行当前的消费者,你会看到同样的消息再次出现。这是因为我们从未承认消费者使用了该消息。这必须在迭代消息或使用自动确认标志时手动完成。 确认时我们可以传入一个 multiple 标志,这表明如果我们一次确认多条消息,我们可以将其设置为 false。我们可以确认或 NACK 消息,Acknowledge 表示一切正常,Nack 表示我们未能处理它,然后消息将被传回队列。 让我们更新使用消息的代码,以便它确认它们。
go func() {
for message := range messageBus {
// breakpoint here
log.Printf("New Message: %v", message)
// Multiple means that we acknowledge a batch of messages, leave false for now
if err := message.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle manually %sn", message.MessageId)
continue
}
log.Printf("Acknowledged message %sn", message.MessageId)
}
}()
现在重新运行代码,您应该会看到消息再次打印,但重新启动后消息消失了。这真的很有用,可以避免让消费者接受一条消息,在处理它时失败,然后该消息就会消失。为了展示为什么 Auto Ack 可能是危险的,这里是一个修改过的示例,我们将 Auto Ack 设置为 true,但在处理过程中失败了。
// Auto Ack is now True
messageBus, err := mqClient.Consume("customers-created", "email-service", true)
if err != nil {
panic(err)
}
// blocking is used to block forever
var blocking chan struct{}
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
panic("Whops I failed here for some reason")
}
}()
运行消费者两次,你会发现它实际上只在第一次执行时被接受。如果您管理不当,这可能是危险的行为。这就是为什么我一直提到它! 为了处理失败,你可以使用 Nack 来告诉 RabbitMQ 它失败了,你可以使用 redelivered 字段来避免重试太多次。 Nack 接受一个 Requeuing 的参数,真是太好用了 !这是一个示例,我们在消息第一次到达时失败,将其重新排队,然后在下次到达时确认它。
messageBus, err := mqClient.Consume("customers-created", "email-service", false)
if err != nil {
panic(err)
}
// blocking is used to block forever
var blocking chan struct{}
go func() {
for message := range messageBus {
log.Printf("New Message: %v", message)
if !message.Redelivered {
// Nack multiple, Set Requeue to true
message.Nack(false, true)
continue
}
// Multiple means that we acknowledge a batch of messages, leave false for now
if err := message.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle manually %sn", message.MessageId)
continue
}
log.Printf("Acknowledged message %sn", message.MessageId)
}
}()
这里还有更多需要考虑的地方,现在我们使用的处理程序是单线程的,这意味着我们一次只能接受一条消息。我们可以通过实现一个允许一定数量的并发任务的工作组来解决这个问题。 我将添加一个错误组,所以这种方法需要 Go 1.2。使用 ErrGroup 非常简单,我们可以将其限制为每个消费者 10 条消息。 errgroup 来自 golang.org/x/sync/errgroup 包。
.....
// Set a timeout for 15 secs
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// Create an Errgroup to manage concurrecy
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
g.SetLimit(10)
go func() {
for message := range messageBus {
// Spawn a worker
msg := message
g.Go(func() error {
log.Printf("New Message: %v", msg)
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages, leave false for now
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle manually %sn", msg.MessageId)
return err
}
log.Printf("Acknowledged message %sn", msg.MessageId)
return nil
})
}
}()
添加这个会让消费者好一点。
★
SetLimit 仅用于现在,还有另一种方法可以管理消耗了多少消息,使用我推荐的 RabbitMQ,称为 Prefetch,我们稍后会介绍
”
我们可以通过将 Send 函数包装在 for 循环中来更新 Publisher 以发送更多垃圾邮件。
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create customer from sweden
for i := 0; i < 10; i++ {
if err := client.Send(ctx, "customer_events", "customers.created.se", amqp091.Publishing{
ContentType: "text/plain", // The payload we send is plaintext, could be JSON or others..
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that this message should be Saved if no resources accepts it before a restart (durable)
Body: []byte("An cool message between services"),
}); err != nil {
panic(err)
}
}
if err := client.Send(ctx, "customer_events", "customers.test", amqp091.Publishing{
ContentType: "text/plain",
DeliveryMode: amqp091.Transient, // This tells rabbitMQ that this message can be deleted if no resources accepts it before a restart (non durable)
Body: []byte("A second cool message"),
}); err != nil {
panic(err)
}
log.Println(client)
}
尝试一下,看看消费者现在接受多条消息,或者甚至尝试生成多个消费者来玩一些。 注意到 Producer 发送消息后立即退出了吗?目前,发送功能不等待来自服务器的任何确认。有时,我们可能想阻塞直到服务器确认它已经收到消息。幸运的是,我们可以!我们需要将在 RabbitMQ 中使用的 Publish 函数更改为 PublishWithDeferredConfirmWithContext,它将返回一个对象,我们可以使用该对象等待服务器确认。除非Channel被设置为Confirm模式,否则该对象将一直为NIL,将其设置为Confirm模式将使服务器在收到发布的消息时发送确认。在 Rabbitmq.go 中,让我们更改 Publish 方法并添加一个 Wait。
// Send is used to publish a payload onto an exchange with a given routingkey
func (rc RabbitClient) Send(ctx context.Context, exchange, routingKey string, options amqp.Publishing) error {
// PublishWithDeferredConfirmWithContext will wait for server to ACK the message
confirmation, err := rc.ch.PublishWithDeferredConfirmWithContext(ctx,
exchange, // exchange
routingKey, // routing key
// Mandatory is used when we HAVE to have the message return an error, if there is no route or queue then
// setting this to true will make the message bounce back
// If this is False, and the message fails to deliver, it will be dropped
true, // mandatory
// immediate Removed in MQ 3 or up https://blog.rabbitmq.com/posts/2012/11/breaking-things-with-rabbitmq-3-0§
false, // immediate
options, // amqp publishing struct
)
if err != nil {
return err
}
// Blocks until ACK from Server is receieved
log.Println(confirmation.Wait())
return nil
}
我们还更新 NewRabbitMQClient 以始终将通道设置为确认模式。
// NewRabbitMQClient will connect and return a Rabbitclient with an open connection
// Accepts a amqp Connection to be reused, to avoid spawning one TCP connection per concurrent client
func NewRabbitMQClient(conn *amqp.Connection) (RabbitClient, error) {
// Unique, Conncurrent Server Channel to process/send messages
// A good rule of thumb is to always REUSE Conn across applications
// But spawn a new Channel per routine
ch, err := conn.Channel()
if err != nil {
return RabbitClient{}, err
}
// Puts the Channel in confirm mode, which will allow waiting for ACK or NACK from the receiver
if err := ch.Confirm(false); err != nil {
return RabbitClient{}, err
}
return RabbitClient{
conn: conn,
ch: ch,
}, nil
}
Rabbitmq.go 的更好方法是添加一个 NewChannel 函数,然后让每个函数接受一个 Channel 作为输入参数。现在运行程序,你应该看到每次服务器确认消息时 publisher.go 打印 TRUE,注意这与 Consumer ACK 不同。我们只等待服务器确认发布的消息已被接受。
发布和订阅(PubSub)
到目前为止,我们一直在使用 FIFO 队列(先进先出)。这意味着每条消息只发送给一个消费者。 在发布和订阅模式中,您将希望每个消费者接收相同的消息。 我们学到的关于绑定等的一切仍然是真实的,并以同样的方式使用。无论队列名称如何,我们都可以使用 Fanout 交换(将消息推送到所有绑定的队列)。 这个想法是让每个消费者创建一个未命名队列,一个未命名队列将获得 RabbitMQ 服务器随机生成的唯一名称。
★
这是一个很好的例子,在代码中创建队列是合适的
”
我们可能希望将 customers_event 发送到多个服务。假设我们需要一个电子邮件服务和一个记录每个客户事件的日志服务。让我们构建它。(由于这是一个仅用于学习 RabbitMQ 的教程,我们将简单地在两个实例中启动 Consumer)。我们首先删除我们拥有的交换,因为它是错误的类型。我们还创建了一个新的,但类型为 Fanout。这次我们没有为权限指定特定的前缀,而是赋予它完全访问权限。
docker exec rabbitmq rabbitmqadmin delete exchange name=customer_events --vhost=customers -u percy -p secret
docker exec rabbitmq rabbitmqadmin declare exchange --vhost=customers name=customer_events type=fanout -u percy -p secret durable=true
docker exec rabbitmq rabbitmqctl set_topic_permissions -p customers percy customer_events ".*" ".*"
由于我们在用当前代码创建一个未命名的Queue时并不知道Queue的名字,所以我们需要修改它。我们把RabbitMQ包中的CreateQueue返回的Queue信息返回一下。该对象将包含随机创建的名称。
// CreateQueue will create a new queue based on given cfgs
func (rc RabbitClient) CreateQueue(queueName string, durable, autodelete bool) (amqp.Queue, error) {
q, err := rc.ch.QueueDeclare(queueName, durable, autodelete, false, false, nil)
if err != nil {
return amqp.Queue{}, nil
}
return q, nil
}
是时候更新 Publisher 了,在本教程的前面我们在 Publisher 中创建了 Channel 绑定。根据我的说法,这真的没有意义,这只是为了不要移动得太快并仍然展示功能。 消费者声明绑定更有意义,因为它与消费者相关。现在使用发布和订阅更没有意义,因为消费者的数量和路径可能是未知的。让我们更新 publisher.go 使其更小。
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer conn.Close()
client, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
defer client.Close()
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create customer from sweden
for i := 0; i < 10; i++ {
if err := client.Send(ctx, "customer_events", "customers.created.se", amqp091.Publishing{
ContentType: "text/plain", // The payload we send is plaintext, could be JSON or others..
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that this message should be Saved if no resources accepts it before a restart (durable)
Body: []byte("An cool message between services"),
}); err != nil {
panic(err)
}
}
log.Println(client)
}
我们将更新 consumer.go 以创建一个没有名称的队列,创建绑定,然后开始使用该队列。
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
mqClient, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
// Create Unnamed Queue which will generate a random name, set AutoDelete to True
queue, err := mqClient.CreateQueue("", true, true)
if err != nil {
panic(err)
}
// Create binding between the customer_events exchange and the new Random Queue
// Can skip Binding key since fanout will skip that rule
if err := mqClient.CreateBinding(queue.Name, "", "customer_events"); err != nil {
panic(err)
}
messageBus, err := mqClient.Consume(queue.Name, "email-service", false)
if err != nil {
panic(err)
}
// blocking is used to block forever
var blocking chan struct{}
// Set a timeout for 15 secs
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// Create an Errgroup to manage concurrecy
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
g.SetLimit(10)
go func() {
for message := range messageBus {
// Spawn a worker
msg := message
g.Go(func() error {
log.Printf("New Message: %v", msg)
time.Sleep(10 * time.Second)
// Multiple means that we acknowledge a batch of messages, leave false for now
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle manually %sn", msg.MessageId)
return err
}
log.Printf("Acknowledged message %sn", msg.MessageId)
return nil
})
}
}()
log.Println("Consuming, to close the program press CTRL+C")
// This will block forever
<-blocking
}
此设置可用于正确展示 Pub/Sub,我们可以生成两个消费者,然后生成发布者。它将显示所有消费者如何查看所有消息。 我们现在知道如何使用常规队列和 PubSub。还有一件事,第三种非常常见的场景是基于 RPC 的范例。
使用Rabbitmq进行RPC调用
有时,我们希望对消息进行一些回调。假设生产者想知道客户何时发送电子邮件。这很常见并且很容易修复。我们可以在消息中设置一个名为 ReplyTo 的字段,这可以用来告诉 Consumer 回复某个 Queue 上的响应。 我们可能需要知道回调与哪个消息相关,因此我们还可以添加一个correlationID,可以用来了解响应与哪个请求相关。 首先创建一个新的 Exchange with Direct 类型。我将其命名为 customer_callbacks。Direct 类型在这里效果很好。
docker exec rabbitmq rabbitmqadmin declare exchange --vhost=customers name=customer_callbacks type=direct -u percy -p secret durable=true
docker exec rabbitmq rabbitmqctl set_topic_permissions -p customers percy customer_callbacks ".*" ".*"
我们需要了解的第一件事是此时一个重要的最佳实践。 拥有回调将要求相同的服务同时发布和使用消息,这并没有错。然而,一个著名的规则是为多个通道重用连接。但永远不要在同一个连接上PUBLISH和CONSUME。想象一下,如果您有一个服务既在生产又在消费,并且您是在同一个连接上进行的,那么想象一下该服务正在消费大量消息。如果消息多于服务设法处理的数量,则消息开始堆积。RabbitMQ 然后可能会施加反压并开始阻止 TCP 连接发送,猜猜看,必须发送 ACK 消息来处理消息。突然之间,由于连接被阻止,您的代码无法确认消息。这可能会导致延误。 黄金法则:
-
应用程序中重用连接; -
一种连接用于消费,一种用于发布 ; -
为每个 Goroutine生成新的 Channels;
让我们更新 producer.go 以启动两个连接,一个用于发布,一个用于消费。我们还将创建一个未命名的队列并将其绑定到交换器,然后我们将使用这些响应。 我们还将在消息中添加 replyTo,它告诉消费者在哪里回复,以及 correlationId 解释消息与哪个唯一事件相关。
package main
import (
"context"
"fmt"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer conn.Close()
// Never use the same Connection for Consume and Publish
consumeConn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer consumeConn.Close()
client, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
defer client.Close()
consumeClient, err := internal.NewRabbitMQClient(consumeConn)
if err != nil {
panic(err)
}
defer consumeClient.Close()
// Create Unnamed Queue which will generate a random name, set AutoDelete to True
queue, err := consumeClient.CreateQueue("", true, true)
if err != nil {
panic(err)
}
if err := consumeClient.CreateBinding(queue.Name, queue.Name, "customer_callbacks"); err != nil {
panic(err)
}
messageBus, err := consumeClient.Consume(queue.Name, "customer-api", true)
if err != nil {
panic(err)
}
go func() {
for message := range messageBus {
log.Printf("Message Callback %sn", message.CorrelationId)
}
}()
// Create context to manage timeout
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Create customer from sweden
for i := 0; i < 10; i++ {
if err := client.Send(ctx, "customer_events", "customers.created.se", amqp091.Publishing{
ContentType: "text/plain", // The payload we send is plaintext, could be JSON or others..
DeliveryMode: amqp091.Persistent, // This tells rabbitMQ that this message should be Saved if no resources accepts it before a restart (durable)
Body: []byte("An cool message between services"),
// We add a REPLYTO which defines the
ReplyTo: queue.Name,
// CorrelationId can be used to know which Event this relates to
CorrelationId: fmt.Sprintf("customer_created_%d", i),
}); err != nil {
panic(err)
}
}
var blocking chan struct{}
log.Println("Waiting on Callbacks, to close the program press CTRL+C")
// This will block forever
<-blocking
}
消费者需要更新,以便它也使用两个连接。当我们处理完消息后,我们将添加它以便我们在 replyTo 队列中发送响应。同样,我们必须使用两种不同的连接,一种用于消费,一种用于发布。
package main
import (
"context"
"log"
"programmingpercy/eventdrivenrabbit/internal"
"time"
"github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
)
func main() {
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer conn.Close()
publishConn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5672", "customers")
if err != nil {
panic(err)
}
defer publishConn.Close()
mqClient, err := internal.NewRabbitMQClient(conn)
if err != nil {
panic(err)
}
publishClient, err := internal.NewRabbitMQClient(publishConn)
if err != nil {
panic(err)
}
// Create Unnamed Queue which will generate a random name, set AutoDelete to True
queue, err := mqClient.CreateQueue("", true, true)
if err != nil {
panic(err)
}
// Create binding between the customer_events exchange and the new Random Queue
// Can skip Binding key since fanout will skip that rule
if err := mqClient.CreateBinding(queue.Name, "", "customer_events"); err != nil {
panic(err)
}
messageBus, err := mqClient.Consume(queue.Name, "email-service", false)
if err != nil {
panic(err)
}
// blocking is used to block forever
var blocking chan struct{}
// Set a timeout for 15 secs
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
// Create an Errgroup to manage concurrecy
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
g.SetLimit(10)
go func() {
for message := range messageBus {
// Spawn a worker
msg := message
g.Go(func() error {
// Multiple means that we acknowledge a batch of messages, leave false for now
if err := msg.Ack(false); err != nil {
log.Printf("Acknowledged message failed: Retry ? Handle manually %sn", msg.MessageId)
return err
}
log.Printf("Acknowledged message, replying to %sn", msg.ReplyTo)
// Use the msg.ReplyTo to send the message to the proper Queue
if err := publishClient.Send(ctx, "customer_callbacks", msg.ReplyTo, amqp091.Publishing{
ContentType: "text/plain", // The payload we send is plaintext, could be JSON or others..
DeliveryMode: amqp091.Transient, // This tells rabbitMQ to drop messages if restarted
Body: []byte("RPC Complete"),
CorrelationId: msg.CorrelationId,
}); err != nil {
panic(err)
}
return nil
})
}
}()
log.Println("Consuming, to close the program press CTRL+C")
// This will block forever
<-blocking
}
继续尝试代码,您应该看到生产者收到 RPC 响应并将它们打印出来。请注意,此代码可以清理,但本教程的重点是 RabbitMQ 的工作原理,而不是清理代码。
限流以及限速
还记得我们之前使用 errgroup 限制了消费者完成的工作量吗?这是一个软限制,一个由代码强加的限制,但 RabbitMQ 仍然可以向消费者发送更多消息。 对此有更好的解决方法,如果您希望您的消费者同时处理消息,则可能应该使用组合。 AMQP 协议允许我们应用预取限制。这告诉 RabbitMQ 服务器一次可以向通道发送多少未确认的消息。这样我们就可以添加一个硬限制。 这是通过应用一组服务质量规则 (QOS) 来完成的。让我们在 rabbitmq.go 中添加一个应用三个可用规则的方法。
// ApplyQos is used to apply qouality of service to the channel
// Prefetch count - How many messages the server will try to keep on the Channel
// prefetch Size - How many Bytes the server will try to keep on the channel
// global -- Any other Consumers on the connection in the future will apply the same rules if TRUE
func (rc RabbitClient) ApplyQos(count, size int, global bool) error {
// Apply Quality of Serivce
return rc.ch.Qos(
count,
size,
global,
)
}
然后在 consumer.go 中我们可以简单地调用它并应用我们想要允许的消息数。
// Create an Errgroup to manage concurrecy
g, ctx := errgroup.WithContext(ctx)
// Set amount of concurrent tasks
g.SetLimit(10)
// Apply Qos to limit amount of messages to consume
if err := mqClient.ApplyQos(10, 0, true); err != nil {
panic(err)
}
go func() {
for message := range messageBus {
TLS安全连接
现在是 2023 年,在投入生产之前,我认为我们应该对流量进行加密是非常安全的说法。 RabbitMQ 有一个 GitHub 存储库来帮助我们创建一个 rootCA 和我们需要的证书,这是开始加密流量的第一步。 我们需要克隆这个存储库并执行其中的 make 文件,这将生成我们需要的文件。
git clone https://github.com/rabbitmq/tls-gen tls-gen
cd tls-gen/basic
make PASSWORD=
make verify
所有生成的文件将出现在一个名为 result 的新文件夹中。为了在 Docker 中工作,我们需要更改它们的权限。
sudo chmod 644 tls-gen/basic/result/*
我们需要删除正在运行的 RabbitMQ 容器,我们需要创建一个带有配置文件的新容器。
sudo docker container rm -f rabbitmq
配置文件名为 rabbitmq.conf,应放在容器中的 /etc/rabbitmq/rabbitmq.conf 中。此配置文件可用于配置的不仅仅是 TLS,但我们现在将坚持使用 TLS。在项目的根目录中创建一个具有正确名称的新文件。
cd ../../ # Go to root of Project
touch rabbitmq.conf
我们需要在启动容器的时候将配置文件挂载到docker中。我们还将将从 TLS-Gen 工具生成的证书挂载到 /certs 中,以便容器可以找到它们。请注意,两个端口都减少了一个,这是 RabbitMQ 协议中的标准。
docker run -d --name rabbitmq -v "$(pwd)"/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro -v "$(pwd)"/tls-gen/basic/result:/certs -p 5671:5671 -p 15671:15671 rabbitmq:3.11-management
一旦完成,我们就可以开始向这个容器添加 TLS 配置。在 rabbitmq.conf 中,让我们将 PATH 添加到证书和根 ca。我的计算机名为 blackbox,您需要将证书名称替换为您的计算机生成的名称。
# Disable NON TCP
listeners.tcp = none
# TCP port
listeners.ssl.default = 5671
# SSL Certs
ssl_options.cacertfile = /certs/ca_certificate.pem
ssl_options.certfile = /certs/server_blackbox_certificate.pem
ssl_options.keyfile = /certs/server_blackbox_key.pem
# Peer verification
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
然后重启RabbitMQ
docker restart rabbitmq
要验证一切正常,您可以使用 docker logs rabbitmq 查看 docker 日志。搜索有关侦听器的日志。
2023-02-19 07:35:15.566316+00:00 [info] <0.738.0> Ready to start client connection listeners
2023-02-19 07:35:15.567418+00:00 [info] <0.885.0> started TLS (SSL) listener on [::]:5671
现在,旧程序将不再有效。它尝试在没有 TLS 的情况下使用连接,所以让我们解决这个问题。需要更新程序才能使用客户端证书。让我们将其添加为 ConnectRabbitMQ 函数的输入。
// ConnectRabbitMQ will spawn a Connection
func ConnectRabbitMQ(username, password, host, vhost, caCert, clientCert, clientKey string) (*amqp.Connection, error) {
ca, err := os.ReadFile(caCert)
if err != nil {
return nil, err
}
// Load the key pair
cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
if err != nil {
return nil, err
}
// Add the CA to the cert pool
rootCAs := x509.NewCertPool()
rootCAs.AppendCertsFromPEM(ca)
tlsConf := &tls.Config{
RootCAs: rootCAs,
Certificates: []tls.Certificate{cert},
}
// Setup the Connection to RabbitMQ host using AMQPs and Apply TLS config
conn, err := amqp.DialTLS(fmt.Sprintf("amqps://%s:%s@%s/%s", username, password, host, vhost), tlsConf)
if err != nil {
return nil, err
}
return conn, nil
}
请注意,我们现在使用 amqps 协议。证书路径是绝对路径,让我们更新消费者和生产者以插入这些,我现在将使用硬编码值,但您不应该在实际应用程序中这样做。
conn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5671", "customers",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/ca_certificate.pem",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/client_blackbox_certificate.pem",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/client_blackbox_key.pem",
)
if err != nil {
panic(err)
}
defer conn.Close()
// Never use the same Connection for Consume and Publish
consumeConn, err := internal.ConnectRabbitMQ("percy", "secret", "localhost:5671", "customers",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/ca_certificate.pem",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/client_blackbox_certificate.pem",
"/home/pp/development/blog/event-driven-rabbitmq/tls-gen/basic/result/client_blackbox_key.pem",
)
defer consumeConn.Close()
砰!太棒了,我们得到了 TLS。 尝试运行生产者或消费者,然后使用 docker logs rabbitmq 观察 docker 日志。
2023-02-19 07:49:53.015732+00:00 [error] <0.948.0> Error on AMQP connection <0.948.0> (172.17.0.1:49066 -> 172.17.0.2:5671, state: starting):
2023-02-19 07:49:53.015732+00:00 [error] <0.948.0> PLAIN login refused: user 'percy' - invalid credentials
是的,我们在删除 docker 时删除了虚拟主机、用户、交换和所有内容,因为我们不持久存储。这很棒,因为这将我们带到本教程的下一步也是最后一步,即默认配置。
RabbitMQ配置和管理
相信我,您不想使用 AdminCLI 为多个用户管理 rabbitMQ,因为,如果您出于某种原因重置集群,将会有很多重复的工作。 支持插入定义文件、定义用户的 JSON 文件、虚拟主机、权限、队列和交换,甚至绑定。它们真的很容易使用,让我们添加我的老用户并赋予它在客户虚拟主机上写入和读取的权限,并添加一个基本的交换。 在此之前,我们需要一个密码哈希,这可能比想象的要复杂一些。这取决于您拥有的 RabbitMQ 设置以及您配置的算法。默认的是 SHA256。我在 stackoverflow 上找到了一个很棒的 bash 脚本来为我生成它。创建一个名为 encodepassword.sh 的文件,并将 secret 替换为您要编码的密码。
#!/bin/bash
function encode_password()
{
SALT=$(od -A n -t x -N 4 /dev/urandom)
PASS=$SALT$(echo -n $1 | xxd -ps | tr -d 'n' | tr -d ' ')
PASS=$(echo -n $PASS | xxd -r -p | sha256sum | head -c 128)
PASS=$(echo -n $SALT$PASS | xxd -r -p | base64 | tr -d 'n')
echo $PASS
}
encode_password "secret"
运行脚本 bash encodepassword.sh 并存储哈希。更新 rabbitmq.conf 以包含可用于在启动时加载定义文件的字段 load_definitions。
log.console = true
# Disable NON TCP
listeners.tcp = none
# TCP port
listeners.ssl.default = 5671
# SSL Certs
ssl_options.cacertfile = /certs/ca_certificate.pem
ssl_options.certfile = /certs/server_blackbox_certificate.pem
ssl_options.keyfile = /certs/server_blackbox_key.pem
# Peer verification
ssl_options.verify = verify_peer
ssl_options.fail_if_no_peer_cert = true
# Load definitions file
load_definitions = /etc/rabbitmq/rabbitmq_definitions.json
我将指向一个名为 /etc/rabbitmq/rabbitmq_definitions.json 的文件。 在项目的根目录中创建一个名为 rabbitmq_definitions.json 的文件,并用以下 JSON 填充它。在这一点上,我认为我们不需要覆盖 JSON 字段,一切都应该是可以理解的并且清楚什么是什么。它与我们之前运行的 CLI 命令非常相似。 下面的定义文件创建了两个我们有 customer_events 和 customer_callbacks 的交换。当前的代码会生成自己的队列,所以我们只是在示例中定义一个来玩一下。
{
"users": [
{
"name": "percy",
"password_hash": "dPOoDgfw31kjUy41HSmqQR+X2Q9PCA5fD++fbxQCgPvKZmnX",
"tags": "administrator"
}
],
"vhosts": [
{
"name": "/"
},{
"name": "customers"
}
],
"permissions": [
{
"user": "percy",
"vhost": "customers",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"exchanges": [
{
"name": "customer_events",
"vhost": "customers",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
},
{
"name": "customer_callbacks",
"vhost": "customers",
"type": "direct",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"queues": [
{
"name": "customers_created",
"vhost": "customers",
"durable": true,
"auto_delete": false,
"arguments": {}
}
],
"bindings": [
{
"source": "customers_events",
"vhost": "customers",
"destination": "customers_created",
"destination_type": "queue",
"routing_key": "customers.created.*",
"arguments": {}
}
]
}
一旦两个文件就位,删除旧的 docker,并重新启动一个新的,但这次我们为定义添加第三个安装。
docker run -d --name rabbitmq -v "$(pwd)"/rabbitmq_definitions.json:/etc/rabbitmq/rabbitmq_definitions.json:ro -v "$(pwd)"/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro -v "$(pwd)"/tls-gen/basic/result:/certs -p 5671:5671 -p 15672:15672 rabbitmq:3.11-management
运行后,验证他们打印出的创建用户的日志。
2023-02-19 08:17:53.467218+00:00 [info] <0.867.0> Started message store of type persistent for vhost 'customers'
2023-02-19 08:17:53.467310+00:00 [info] <0.867.0> Recovering 0 queues of type rabbit_classic_queue took 3ms
2023-02-19 08:17:53.467348+00:00 [info] <0.867.0> Recovering 0 queues of type rabbit_quorum_queue took 0ms
2023-02-19 08:17:53.467371+00:00 [info] <0.867.0> Recovering 0 queues of type rabbit_stream_queue took 0ms
2023-02-19 08:17:53.468487+00:00 [info] <0.698.0> Importing concurrently 1 permissions...
2023-02-19 08:17:53.469946+00:00 [info] <0.680.0> Successfully set permissions for 'percy' in virtual host 'customers' to '.*', '.*', '.*'
完成后,尝试运行消费者和生产者,您应该会看到现在一切都按预期进行。唯一的区别是我们现在使用配置而不是使用 CLI 在 RabbitMQ 中创建基础设施,并且流量是加密的。
总结
遗憾的是,这次漫长而激动人心的 RabbitMQ 冒险到此结束。让我们来看看我们学到了什么。我们已经学习了如何使用虚拟主机配置 RabbitMQ,以及如何创建具有这些虚拟主机权限的用户。 我们还学习了如何在队列和交换器上生成和使用消息。您应该熟悉所有资源,例如队列、交换器和绑定。我们还介绍了如何创建发布和订阅模式、RPC 模式和常规工作队列。 希望您清楚如何使用 Connections 和 Channels 以及它们之间的区别。连接是 TCP 连接,通道是连接上的多路复用虚拟通道。在同一软件中重用连接,但为每个并行进程创建新通道。 我们学会了永远不要在相同的连接上生产和消费。我们还介绍了如何设置 TLS 以及如何为 RabbitMQ 的预定义配置添加定义。
原文始发于微信公众号(小唐云原生):【翻译】golang: 通过rabbitmq学习事件驱动(EDA)
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/247605.html