【RabbitMq】1.介绍与入门

什么是消息队列(MQ)

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。

下面是一个常见的消息队列模型

消息队列消息模型的特点

  1. 消息生产者将消息发送到Queue中,然后消息消费者监听Queue并接收消息;
  2. 消息被确认消费以后,就会从Queue中删除,所以消息消费者不会消费到已经被消费的消息;
  3. Queue支持存在多个消费者和生产者,但是对某一个消息而言,只会有一个消费者成功消费。

消息的生产与消费常规流程

一条完整的可靠的消息生产到消费的过程需要经历以下6个步骤

  1. Producer生成消息并发送给MQ(同步、异步);
  2. MQ接收消息并将消息数据持久化到消息存储(持久化操作为可选配置);
  3. MQ向Producer返回消息的接收结果(返回值、异常);
  4. Consumer监听并消费MQ中的消息;
  5. Consumer获取到消息后执行业务处理;
  6. Consumer对已成功消费的消息向MQ进行ACK确认(确认后的消息将从MQ中删除)。

什么是AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP是一个进程间传递异步消息的网络协议,这篇将要讲到的RabbitMQ就是基于AMQP协议的一个消息中间件。

在普通的MQ上,AMQP增加了一个交换机用来绑定消息的路由,同时消费者也可以从指定路由中获取需要的消息。

  1. 生产者(Producer)发布消息(Message),经由交换机(Exchange)。
  2. 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
  3. 最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

RabbitMQ

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

安装

安装过程可以直接参考文档:Downloading and Installing RabbitMQ

在我的环境中使用了Docker镜像:bitnami/rabbitmq 当然你也可以使用RabbitMQ官方镜像:rabbitmq

简单介绍

如上图所示,蓝色区域是发送消息的过程,红色区域是RabbitMQ-Server内部结构,黄色的部分是RabbitMQ内部交换机通过路由将消息与队列绑定的过程,绿色区域是消费的过程。

RabbitMQ Server: 也叫broker server,它是一种传输服务。 他的角色就是维护一条从Producer到Consumer的路线,保证数据能够按照指定的方式进行传输。但是这个保证也不是100%的保证,但是对于普通的应用来说这已经足够了。当然对于商业系统来说,可以再做一层数据一致性的guard,就可以彻底保证系统的一致性了。

Producer: 消息生产者,如图A、B、C,数据的发送方。消息生产者连接RabbitMQ服务器然后将消息投递到Exchange。

Consumer:消息消费者,如图1、2、3,数据的接收方。消息消费者订阅队列,RabbitMQ将Queue中的消息发送到消息消费者。

Exchange:生产者将消息发送到Exchange(交换器),由Exchange将消息路由到一个或多个Queue中(或者丢弃)。Exchange并不存储消息。RabbitMQ中的Exchange有fanout、direct、topic、headers四种类型,每种类型对应不同的路由规则,后面详细介绍这四种类型。

Queue:(队列)是RabbitMQ的内部对象,用于存储消息。消息消费者就是通过订阅队列来获取消息的,RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。多个消费者可以订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。

RoutingKey:生产者在将消息发送给Exchange的时候,一般会指定一个routing key,来指定这个消息的路由规则,而这个routing key需要与Exchange Type及binding key联合使用才能最终生效。在Exchange Type与binding key固定的情况下(在正常使用时一般这些内容都是固定配置好的),我们的生产者就可以在发送消息给Exchange时,通过指定routing key来决定消息流向哪里。RabbitMQ为routing key设定的长度限制为255 bytes。

Connection: (连接)。Producer和Consumer都是通过TCP连接到RabbitMQ Server的。以后我们可以看到,程序的起始处就是建立这个TCP连接。

Channels: (信道)。它建立在上述的TCP连接中。数据流动都是在Channel中进行的。也就是说,一般情况是程序起始建立TCP连接,第二步就是建立这个Channel。

简单的入门

入门前你需要知道的事

图解实例说明:

  1. P就是生产者(Producer),也就是发送消息的程序:

  2. 队列(Queue)是RabbitMQ内部的消息容器。虽然消息经过了生产者,RabbitMQ,消费者,但它们只能存储在RabbitMQ的消息容器中,队列本质上是一个大的消息缓冲器。多生产者可以发送消息到同一个队列,多消费者也可以从一个队列接收数据:

  3. 消费者(Consumer)与生产者具有相似的含义,主要是等待接收信息:

端口说明:

  1. 4369 (epmd), 25672 (Erlang distribution):Epmd 是 Erlang Port Mapper Daemon 的缩写,在 Erlang 集群中相当于 dns 的作用,绑定在4369端口上。这连个端口用于集群中内部的通信

  2. 5672, 5671 (AMQP 0-9-1 without and with TLS):AMQP 是 Advanced Message Queuing Protocol 的缩写,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,专为面向消息的中间件设计。基于此协议的客户端与消息中间件之间可以传递消息,并不受客户端/中间件不同产品、不同的开发语言等条件的限制。是用来提供给生产者/消费者客户端访问的端口。

  3. 15672 (if management plugin is enabled):RabbitMQ 的 Web 管理界面默认端口,默认用户名密码都是 guest。(注意:RabbitMQ 3.0之前的版本默认端口是55672,下同)。通过web访问15672端口可以进入rabbitmq的管理后台(前提是开启了management插件)

  4. 61613, 61614 (if STOMP is enabled):Stomp 是一个简单的消息文本协议,它的设计核心理念就是简单与可用性,官方文档,实践一下 Stomp 协议需要:

  5. 一个支持 stomp 消息协议的 messaging server (譬如activemq,rabbitmq);

  6. 一个终端(譬如linux shell);

  7. 一些基本命令与操作(譬如nc,telnet)

  8. 1883, 8883 (if MQTT is enabled):MQTT 只是 IBM 推出的一个消息协议,基于 TCP/IP 的。两个 App 端发送和接收消息需要中间人,这个中间人就是消息服务器(比如ActiveMQ/RabbitMQ),三者通信协议就是 MQTT

实现AMQP协议进行通信

需要与RabbitMQ通信,你只需要完成AMQP协议就可以了。这里有两个推荐的方法:

  1. 使用composer引入php-amqplib/php-amqplib
  2. 安装amqp扩展

我这里就直接使用composer引入的方式了。

实现最简单的生产-消费模型

上图可以理解为2个部分:

  1. 生产者:

  2. 消费者:

生产者

创建与RabbitMQ的连接
$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

amqp-lib中抽象了一个连接类AbstractConnection用来处理协议和用户校验相关事项。

AMQPStreamConnection就是一个连接类的实现。

AMQPStreamConnection(string $host, string $port, string $user, string $password)

创建一个channel进行通信
$channel = $connection->channel();

channel是用来与RabbitMQ通信的,大多数API都要用到channel。

声明一个队列
$channel->queue_declare('test');

通过$channel->queue_declare()声明一个队列,如果RabbitMQ中没有该队列会自动生成一个同名队列。

发送一条消息
$msg = new AMQPMessage('test data');
$channel->basic_publish($msg, '', 'test');

AMQPChannel->basic_publish(AMQPMessage $msg, string $exchange ,string $routing_key)

通过AMQPMessage来维护消息的整个协议,使用$channel->basic_publish()做消息的推送将消息发送到RabbitMQ中。其中消息会发送到与$routing_key同名的队列中

关闭连接
$channel->close();
$connection->close();

最后别忘了关闭连接!

完整代码
require_once __DIR__ . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('test');

for ($i = 0; $i < 10; $i++) {
    $msg = new AMQPMessage('test data => ' . $i);

    $channel->basic_publish($msg, '', 'test');
}

$channel->close();
$connection->close();

这里发送了10条消息给RabbitMQ

检查消息

打开管理者插件后可以直接通过15672端口访问RabbitMQ后台

如图所示,test队列中准备好了10条待消费的消息。

消费者

require_once __DIR__ . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('test');

$channel->basic_consume('test', '', false, true, false, false, function ($msg) {
    echo "接收到一条消息:" . $msg->body . "\r\n";
});

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

在进过同样的创建连接,创建通道,声明队列后,使用AMQPChannel->basic_consume(string $queue, string $consumer_tag, bool $no_local, bool $no_ack, bool $exclusive, bool $nowait, callable|null $callback)创建一个消费者,通过传入一个回调方法来消费一条消息。

消息的消费是在

while ($channel->is_consuming()) {
    $channel->wait();
}

中处理的,其中$channel->is_consuming()会根据该消费者客户端是否有消费需求(设置回调方法)阻塞程序运行等待新的异步消息进入队列中。运行一下看看结果:

接收到一条消息:test data => 0
接收到一条消息:test data => 1
接收到一条消息:test data => 2
接收到一条消息:test data => 3
接收到一条消息:test data => 4
接收到一条消息:test data => 5
接收到一条消息:test data => 6
接收到一条消息:test data => 7
接收到一条消息:test data => 8
接收到一条消息:test data => 9

查看后台,队列也被清空了。最基础的RabbitMQ须知概念和使用就到这里了,后面不出意外会有更深的使用,如订阅,ACK等

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论