【RabbitMq】2.消息持久化

持久化

首先让我们回顾一下消费者模型:

在一个消息被消费的过程中,消息从队列中取出,发送到了消费者。这个过程中消息是有可能会丢失的!那么在这里先列举一下丢失消息的场景

  1. 队列设置了自动删除,再没有消费者了后会自动删除
  2. 未做持久化RabbitMQ服务宕机
  3. 消费者未处理完消息宕机

这里将讲解如何针对这3种情况做处理。

关闭自动删除,开启队列持久化

这个问题是这3个情况中最简单的,只要在声明队列的时候关闭auto_delete就可以了

$channel->queue_declare('test', false, true, false, false);
public function queue_declare(
        $queue = '', // 队列名
        $passive = false,
        $durable = false, // 持久化
        $exclusive = false, // 排他队列,只对首次声明它的连接(Connection)可见,不允许其他连接访问,在连接断开的时候自动删除,无论是否设置了持久化
        $auto_delete = true, // 自动删除,如果该队列已经没有消费者时,该队列会被自动删除。这种队列适用于临时队列。
        $nowait = false, // 生产者消息确认
        $arguments = array(),
        $ticket = null
    ){}

如上面的代码所示,将第五个参数auto_delete设置为false(默认为true)即可关闭队列自动删除;将第三个参数durable设置为true(默认为false)即可开启队列持久化。

对于实际应用中,大部分需要利用到消息中间件的情况都是需要保持队列持久化的。

消息服务端持久化

上面的参数仅仅是队列的持久化,现在还并不能保证消息在RabbitMQ服务宕机的时候丢失,发送持久化消息需要在发送消息时增加参数

$msg = new AMQPMessage('test data => ' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

消息的delivery_mode分为两种:

  1. AMQPMessage::DELIVERY_MODE_NON_PERSISTENT = 1 该模式下消息存储在内存中,RabbitMQ服务重启后消息将会丢失
  2. AMQPMessage::DELIVERY_MODE_PERSISTENT = 2 该模式下消息存储在磁盘中,RabbitMQ服务重启后消息可以恢复

处理消费者未处理完消息宕机的方法 -- ACK

ACK即消息确认机制(acknowledgment)。在RabbitMQ中也做了自动确认处理。

在自动确认模式下,发送消息后立即将其视为已成功传递和处理消息。此模式在较高的吞吐量(只要消费者可以跟上)和安全性之间进行权衡,以降低交付和消费者处理的安全性的代价增加了吞吐量。此模式通常称为“一劳永逸”。与手动确认模型不同,如果在成功传递之前关闭使用者的TCP连接或通道,则服务发送的消息将丢失。因此,自动消息确认应该被认为是不安全的, 并且不适合所有场景。

在大部分情况下,我们需要使用手动确认消息的方式来保证消费者未处理完消息宕机等情况下消息的完整性和可靠性。首先来看看用法:

$channel->basic_consume('test', '', false, false, false, false, function ($msg) {
    echo "=========start========\r\n";
    echo "| 接收到一条消息:" . $msg->body . "\r\n";
    // 处理耗时任务中
    sleep(1);
    echo "| 消息处理结束\r\n";
    echo "==========end=========\r\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});
public function basic_consume(
        $queue = '', // 队列名
        $consumer_tag = '', // 消费者标签
        $no_local = false, // 这个暂时不用管,搜了一下是AMQP中的一个标准但是RabbitMQ并没有实现
        $no_ack = false, // 不需要消息确认(即自动确认)
        $exclusive = false, // 排他队列,如果你希望创建一个队列,并且只有你当前这个程序(或进程)进行消费处理.不希望别的客户端读取到这个队列.用这个方法甚好.而同时如果当进程断开连接.这个队列也会被销毁.不管是否设置了持久化或者自动删除
        $nowait = false, // 执行后不需要等结果
        $callback = null, // 回调函数
        $ticket = null,
        $arguments = array()
    ) {}

no_ack设置为false即为关闭自动确认(开启手动消息确认)。开启消息确认后需要在回调函数中使用$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);对消息进行ACK确认。

  1. $msg->delivery_info['channel']即当前通道
  2. basic_ack()是对通道中的消息进行确认的方法
  3. $msg->delivery_info['delivery_tag']是消息的标签

消息标签

在我们继续讨论其他主题之前,重要的是要解释如何识别交货(确认表示它们各自的交货)。注册使用者(订阅)后,RabbitMQ将使用basic.deliver 方法传送(推送)消息。该方法带有传递标签,该标签唯一地标识通道上的传递。因此,交付标签是按通道划分的。

交付标签是单调增长的正整数,并由客户端库表示。确认交付的客户端库方法将交付标签作为参数。

由于传递标签是按通道划分的,因此必须在接收它们的同一通道上确认传递。在不同的通道上进行确认将导致“未知的传递标签”协议异常,并关闭该通道。

在说其他东西前需要先弄清楚消息标签的概念,消息标签的作用就是识别消息身份的,消费者在声明了要使用的队列后,RabbitMQ服务会使用basic.deliver方法推送消息,该方法会在消息上增加一个标签。因为消息是通过通道传递的,所以这个标签也是以通道划分的。

消息标签是一个递增的正整数。这里开启多个消费者看看

生产者:

require_once __DIR__ . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('test', false, true, false, false);

for ($i = 0; $i < 10; $i++) {
    $msg = new AMQPMessage('test data => ' . $i, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);

    $channel->basic_publish($msg, '', 'test');

}

$channel->close();
$connection->close();

消费者:

require_once __DIR__ . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$channel->queue_declare('test', false, true, false, false);

$channel->basic_qos(0, 5, null);
$channel->basic_consume('test', '', false, false, false, false, function ($msg) {
    echo "=========start========\r\n";
    echo "| 接收到一条消息:" . $msg->body . "\r\n";
    // 处理耗时任务中
    sleep(1);
    echo "| 消息处理结束\r\n";
    echo "==========end=========\r\n";
    echo '::::::::::$msg->delivery_info[\'delivery_tag\'] = ' . $msg->delivery_info['delivery_tag'] . "\r\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
});

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

shell-1:

=========start========
| 接收到一条消息:test data => 0
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 1
=========start========
| 接收到一条消息:test data => 2
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 2
=========start========
| 接收到一条消息:test data => 4
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 3
=========start========
| 接收到一条消息:test data => 6
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 4
=========start========
| 接收到一条消息:test data => 8
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 5

shell-2:

=========start========
| 接收到一条消息:test data => 1
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 1
=========start========
| 接收到一条消息:test data => 3
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 2
=========start========
| 接收到一条消息:test data => 5
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 3
=========start========
| 接收到一条消息:test data => 7
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 4
=========start========
| 接收到一条消息:test data => 9
| 消息处理结束
==========end=========
::::::::::$msg->delivery_info['delivery_tag'] = 5

可以看到,两个消费者在不同的通道上分别拥有各自的消息标签。同时,也因为标签是针对通道的,所以当消费者做消息确认时传递了一个不存在的标签时会报错,直接basic_ack(999)会出现下面的错误:

PHP Fatal error:  Uncaught PhpAmqpLib\Exception\AMQPProtocolChannelException: PRECONDITION_FAILED - unknown delivery tag 999 in ......

3种确认协议

手动确认消息有3种:ack,nack,reject

ack用于肯定确认,它是用来告诉服务端已确认并消费该消息,服务端此时会自动删除该消息。 reject用于否定确认,它会告诉服务端这条消息已收到但是消费者消费失败。它和ack的区别主要在于语义上:ack的假定消息已成功消费,而reject则表明未消费,但仍应删除。

ack可以批量确认,服务端在收到批量确认后会按照消息标签从小到大确认消息,直到回复的消息标签: 假设通道中有消息1,2,4,6,10未消费,消费者批量确认6号标签,服务端会标记1,2,4,6号消息为已确认并删除,10号消息不会有任何影响。而reject是不支持批量否定的,只能否定当前标签的消息。

nack是reject的一个扩展,它和reject的区别在于,nack可以批量否定确认消息。

这3种确认的方法在vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php

class AMQPChannel extends AbstractChannel
{
/**
     * Acknowledges one or more messages
     *
     * @param string $delivery_tag 消息标签
     * @param bool $multiple 是否批量确认
     */
    public function basic_ack($delivery_tag, $multiple = false)
	
	/**
     * Rejects one or several received messages
     *
     * @param string $delivery_tag 消息标签
     * @param bool $multiple 是否批量确认
     * @param bool $requeue 是否重新回到队列
     */
    public function basic_nack($delivery_tag, $multiple = false, $requeue = false)
	
    /**
     * Rejects an incoming message
     *
     * @param string $delivery_tag 消息标签
     * @param bool $requeue 是否重新回到队列
     */
    public function basic_reject($delivery_tag, $requeue)
    {
        list($class_id, $method_id, $args) = $this->protocolWriter->basicReject($delivery_tag, $requeue);
        $this->send_method_frame(array($class_id, $method_id), $args);
    }
}

重新回到队列

当回复nack,reject的时候,该消息很明显是消费失败的,此时应该阻止RabbitMQ服务端删除该消息。上面源码中的参数可以看到,nack和reject中可以传递bool $requeue来选择是否要让消息重新回到队列中。效果如下:

=========start========
| 接收到一条消息:test data => 0
| 消息处理结束
==========end=========
=========start========
| 接收到一条消息:test data => 0
| 消息处理结束
==========end=========
=========start========
| 接收到一条消息:test data => 0
| 消息处理结束
==========end=========
=========start========
| 接收到一条消息:test data => 0
| 消息处理结束
==========end=========

可以看到这一条消息0一直被归还到队列中重新消费。消息重新排队后,将尽可能放置在其队列中的原始位置。否则(多个消费者共享一个队列,其他消费者的并发传递和确认时),该消息将重新排队到更靠近队列头的位置。重新排队的消息可能立即准备好重新消费,具体取决于它们在队列中的位置以及消费者使用的通道的预获取消息数量。这意味着,如果所有消费者都因某些原因无法处理消息而重新排队将创建一个重新排队/重新消费的循环,这样的循环可能代价很高。实际情况中消费者实现可以跟踪重新交付的次数并永久拒绝消息(不重新入队并做好日志),或者重新向队列中发一条一样的消息而不是做重新排队处理。

通道预获取消息数量

因为消息的传递和确认是异步的,所以消费者可能存在拥有一条或以上的消息待消费的情况。在默认情况下,RabbitMQ服务端会将消息轮流分配给每一个消费者。但是消费者维护大量未确认的消息可能会导致消费者内存飙升或者未成功消费的消息被排到最后导致排队过慢的问题,实际情况中可能需要按需设定消费维护的消息数量。RabbitMQ提供了basic.qos方法设置通道预获取消息数量,该值定义通道上允许的未确认的最大数量。一旦数量达到配置的数量,RabbitMQ将停止在通道上传递更多消息,除非已确认至少一个未处理的消息。

同时这个设置也能解决消费者机器之间性能差距,消费速度不一致时的负载问题。假设队列中有100条消息,消费者A消费一条消息需要1秒,消费者B消费一条消息需要5秒,在默认的轮流分配情况下A和B会各自分配50条消息,这种情况下就会出现性能好的消费者A并不能比B消费更多消息的情况。通过设置预获取消息数量可以让消息在队列中待命,谁先消费谁先获取新的消息,达到理想的负载情况。

当然,如果预获取消息数量设置的太小也会导致消息集中维护在RabbitMQ服务端导致服务端内存飙升。所以实际配置需要在业务中慢慢摸索。

Values in the 100 through 300 range usually offer optimal throughput and do not run significant risk of overwhelming consumers.

文档中推荐预获取消息数量设置为100到300。

设置方法:

$channel->basic_qos(0, 1, null);
    /**
     * Specifies QoS
     *
     * @param int $prefetch_size
     * @param int $prefetch_count 这个参数就是预获取消息数量
     * @param bool $a_global
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed
     */
    public function basic_qos($prefetch_size, $prefetch_count, $a_global)

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论