如何在go语言中实现高并发的消息中间件

什么是消息中间件?

消息中间件是一种提供异步消息通信机制的软件解决方案。它通过解耦生产消息的应用程序和处理消息的应用程序,从而改善了分布式系统的可扩展性和可靠性。

消息中间件通常由三个核心组成部分组成:消息生产者,消息中心和消息消费者。消息生产者是负责生产消息并将其写入中间件的应用程序,消息中心是负责存储和分发消息的中间件服务器,消息消费者是从中间件读取消息并将其处理的应用程序。消息中间件一般提供了发布/订阅决策(发布者订阅消费者),点对点传递(单个发布对应单个订阅)及其他高级特性。

为什么要使用消息中间件?

提高系统可伸缩性

在一个分布式系统中,随着业务量的增长和用户数量的增加,处理这些业务或者请求的实例数量也会随之增加。传统设计中部署多个应用服务器节点并通过负载均衡组件来分配请求。但是,这种方式的可伸缩性在单个节点的物理极限内有极限,难以达到实现真正意义上的横向拓展。

例如,假设我们的Web应用程序处理了大量的用户身份验证请求。这意味着随着用户数量的增加,身份验证服务器必须处理的请求数也将增加。当请求增加时,可以通过添加更多的服务器节点来拓展身份验证服务的处理能力。

但是这种方式最终会到达硬件的物理极限,我们可以添加更多的服务器,但是性能增长的情况会迅速减慢。而我们更需要的是能够横向拓展的解决方案,这时就需要使用消息中间件来解耦业务服务和负载均衡组件并支持拓展单个节点以支持更多请求。

提高系统可靠性

在分布式系统中,系统处理请求时可能会发生许多类型的故障。例如,应用程序服务器(或者特定的数据中心)可能会由于网络中断、自然灾害或其他原因而停机。如果我们的应用程序采用同步方式与另一个应用程序进行通信,则完全停机的节点将只能导致另一个节点终止运行,从而导致整个系统崩溃。

使用消息中间件我们可以确保消息在整个系统中保持可靠性。当生产者将消息发送到消息队列时,消息通常在中间件中复制多个副本以提供数据可用性。如果消费者没有准备好处理消息,则消息队列将其保存在队列中,直到消费者准备好处理它。

使用golang实现高并发的消息中间件

选择适合您的消息中间件

在选择消息队列时,您有多个选择,但要正确选择。每个消息队列都有其自己的特定用途,以及优势和劣势。

在golang中,最受欢迎的消息中间件是Kafka和RabbitMQ。

Kafka的强项适用于大规模日志处理&大数据场景,而01MQ更适用于业务场景&小规模的服务。

这篇文章会使用RabbitMQ,它是一种基于AMQP协议的开源消息队列。您可以在http://www.rabbitmq.com/中获取更多信息。

配置RabbitMQ服务器

在使用RabbitMQ之前,您需要先在您的本地或者远程安装RabbitMQ服务器。您可以按照官方文档或者使用一些简单的脚本来安装RabbitMQ。在本教程中,我们将使用docker镜像来安装RabbitMQ服务。

首先,我们需要先安装docker工具,我们选择docker社区版并在Ubuntu 20.04上进行安装,Docker社区版是适用于Docker开发者和技术爱好者的自由软件。您可以从Docker官方网站获得更多信息。

sudo apt update

sudo apt install apt-transport-https ca-certificates curl software-properties-common

curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu focal stable"

sudo apt update

sudo apt install docker-ce

我们将使用docker镜像来启动RabbitMQ服务器。您可以使用以下命令从docker hub中获取RabbitMQ镜像。

docker pull rabbitmq:3-management

运行以下命令即可启动RabbitMQ服务,并打开默认端口(5672)以及Web管理UI端口(15672)。

docker run -d --hostname rabbitmq-service --name rabbitmq-management -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=password -p 5672:5672 -p 15672:15672 rabbitmq:3-management

这时您可以访问http://localhost:15672/查看RabbitMQ控制台。您可以使用默认的用户名:admin和密码:password进行登录。

使用RabbitMQ的go客户端

RabbitMQ客户端有多种语言实现,包括Java、Python、.NET、Go等。为了在golang中使用RabbitMQ,我们需要安装其go客户端。go客户端称为amqp!(https://github.com/streadway/amqp)。

go get -u github.com/streadway/amqp

编写消息生产者代码

为了示例简单,我们将编写一个简单的生产者代码,该代码将位于客户端端点(producer.go)中。我们将提供一个命令行界面,以启动生产者并在用户请求输入时生产消息。我们使用最简单的发布/订阅模式来生产和消费消息。

所有发布到RabbitMQ的消息都可以路由到一个Exchange上。在绑定Exchange和队列后,Exchange上和队列中数据绑定将被处理。在我们的示例中,没有绑定队列“messages”,因为我们还没有为此创建响应的消费者。

package main

import (

"bufio"

"fmt"

"log"

"os"

"strings"

"github.com/streadway/amqp"

)

func failOnError(err error, msg string) {

if err != nil {

log.Fatalf("%s: %s", msg, err)

}

}

func main() {

conn, err := amqp.Dial("amqp://admin:password@localhost:5672/")

failOnError(err, "Failed to connect to RabbitMQ")

defer conn.Close()

ch, err := conn.Channel()

failOnError(err, "Failed to open a channel")

defer ch.Close()

q, err := ch.QueueDeclare(

"messages",

false,

false,

false,

false,

nil,

)

failOnError(err, "Failed to declare a queue")

reader := bufio.NewReader(os.Stdin)

for {

fmt.Print("Enter message: ")

text, _ := reader.ReadString('\n')

text = strings.TrimSuffix(text, "\n")

err = ch.Publish(

"",

q.Name,

false,

false,

amqp.Publishing{

ContentType: "text/plain",

Body: []byte(text),

})

failOnError(err, "Failed to publish a message")

fmt.Println("Message sent")

}

}

完成后,您可以运行此代码来启动RabbitMQ生产者。您将看到以下输出:

Enter message:

您可以在back-end的RabbitMQ控制台上观察消息是否已成功发布。

编写消息消费者代码

在消费者方面,我们将使用主功能作为单个除Message生产者之外的消费者。deal.go的代码如下所示:。

此消费者代码只是输出消息。

package main

import (

"log"

"github.com/streadway/amqp"

)

func failOnError(err error, msg string) {

if err != nil {

log.Fatalf("%s: %s", msg, err)

}

}

func main() {

conn, err := amqp.Dial("amqp://admin:password@localhost:5672/")

failOnError(err, "Failed to connect to RabbitMQ")

defer conn.Close()

ch, err := conn.Channel()

failOnError(err, "Failed to open a channel")

defer ch.Close()

q, err := ch.QueueDeclare(

"messages", // Queue name

false, // Durable

false, // Delete when unused

false, // Exclusive

false, // No-wait

nil, // Arguments

)

failOnError(err, "Failed to declare a queue")

msgs, err := ch.Consume(

q.Name, // Queue

"", // Consumer

true, // Auto-ack

false, // Exclusive

false, // No-local

false, // No-wait

nil, // Args

)

failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {

for d := range msgs {

log.Printf("Received a message: %s", d.Body)

}

}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

<-forever

}

完成后,您可以启动deal.go并订阅messages队列。您可以看到控制台输出以下信息:

[*] Waiting for messages. To exit press CTRL+C

当生产者发送消息过来的时候,您将看到消费者控制台输出相应的内容。

总结

通过本教程,您可以了解如何使用golang编写消息生产者和消费者,并使用RabbitMQ作为消息中间件在两个端点之间进行通信。本教程介绍了RabbitMQ的简单使用以及如何使用其go客户端实现高并发的消息中间件系统。鉴于RabbitMQ对其他语言的支持非常广泛,您可以选择使用Java、Python或其他语言,但使用RabbitMQ作为消息通信机制的想法是一样的。

后端开发标签