【RabbitMq】3.交换机exchange的类型和使用

交换机Exchange

开始之前,你需要明白一点,RabbitMQ中的消息是由交换机Exchange根据路由Routingkey投递到队列中等待消费的。在 【RabbitMq】1.介绍与入门 中介绍了RabbitMQ的运行流程图

但是在第一章和第二章中并没有讲到交换机exchange相关的内容和设置。在默认情况下,RabbitMQ为我们准备了一个默认的交换机用来使用(AMQP default)

这个交换机足以满足常规的队列操作,这里才真正的开始介绍RabbitMQ里交换机!

交换机的类型

交换机有4种类型,分别是direct,topic,fanout,header。它们各自的特点是不一样的。

direct

direct类型交换机也叫直连交换机,它会在发布一条消息时找到所有精准匹配消息路由的队列然后投放。

如上图所示,一个direct类型的交换机上通过route1,route2,route3绑定了3个不同的队列,此时向交换机发送一条路由为route1的消息,不会被route2和route3处理。

topic

topic类型交换机也叫话题交换机,它可以实现路由的模糊匹配投放消息。topic路由匹配支持两种匹配符:

  1. *:代表匹配一个单词
  2. #:代表匹配0个或多个单词

如上图,3个队列分别通过#bar.**.foo绑定到了topic类型交换机上,此时发送消息会出现这种情况:

  1. 通过bar.foo发送消息

    1. #:匹配到了多个单词,会接受消息
    2. bar.*:可以匹配,会接受消息
    3. *.foo:可以匹配,会接受消息
  2. 通过bar.bar发送消息

    1. #:匹配到了多个单词,会接受消息
    2. bar.*:可以匹配,会接受消息
    3. *.foo:匹配不到,不会接受消息
  3. 通过bar.foo.foo发送消息

    1. #:匹配到了多个单词,会接受消息
    2. bar.*:匹配不到,因为有多个单词,不会接受消息
    3. *.foo:匹配不到,因为有多个单词,不会接受消息
  4. 通过bar发送消息

    1. #:匹配不到,因为#只会匹配0个或多个,不会接受消息
    2. bar.*:匹配不到,因为*必须匹配一个单词,不会接受消息
    3. *.foo:匹配不到,不会接受消息

fanout

fanout也叫扇区交换机,它会像所有绑定到该交换机上的队列发送消息,不管是否设置消息路由,是否匹配消息路由

上图的情况不管通过任何消息路由发送消息,所有队列都会收到。

header

header类型也叫头交换机,它的绑定并不是通过routingKey来做的,而是在发布消息时在header中匹配的。

如上图所示,在绑定时需要在header中设置若干key-value结构参数做为绑定关系,另外还需要一个特殊的参数x-match

  1. x-match = all:代表需要完整匹配所有参数即可收到消息
  2. x-match = any:代表需要至少匹配一个参数即可收到消息

绑定交换机

完整代码可能比较长,下面的$channel就是与RabbitMQ服务端的通信通道

声明一个交换机

$channel->exchange_declare($exchangeName, $exchangeType, true, true, false);
    /**
     * Declares exchange
     *
     * @param string $exchange 交换机的名字
     * @param string $type 交换机的类型
     * @param bool $passive
     * @param bool $durable 持久化
     * @param bool $auto_delete 是否自动删除
     * @param bool $internal 是否为内部使用
     * @param bool $nowait
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function exchange_declare(
        $exchange,
        $type,
        $passive = false,
        $durable = false,
        $auto_delete = true,
        $internal = false,
        $nowait = false,
        $arguments = array(),
        $ticket = null
    )

需要注意的是这里声明的交换机和声明队列不一样,声明队列的时候不存在会自动创建,声明交换机的时候测试中不存在会报错NOT_FOUND - no exchange 'games.fanout' in vhost '/'

绑定队列到交换机上

 $channel->queue_bind($queueName, $exchangeName, $routingKey);
   /**
     * Binds queue to an exchange
     *
     * @param string $queue // 队列名
     * @param string $exchange // 交换机的名字
     * @param string $routing_key // 绑定路由
     * @param bool $nowait
     * @param array $arguments
     * @param int|null $ticket
     * @throws \PhpAmqpLib\Exception\AMQPTimeoutException if the specified operation timeout was exceeded
     * @return mixed|null
     */
    public function queue_bind(
        $queue,
        $exchange,
        $routing_key = '',
        $nowait = false,
        $arguments = array(),
        $ticket = null
    ) {

这里的队列和交换机必须是真实存在的

模拟场景测试

假设现在有这样一个业务结构,一个视频类网站的频道有这几种:

  1. game(游戏)
    1. online(网游)
      1. lol(英雄联盟)
      2. pugb(恰鸡)
    2. local(单机)
      1. war3(魔兽3)

需要针对不同场景向关注对应频道的用户发推送

  1. 向所有用户发推送
  2. 向所有关注网游(gome.online)的用户发推送
  3. 向所有关注英雄联盟(game.online.lol)的用户发推送

创建交换机

创建交换机在管理后台创建即可,这里就不演示了,根据业务需求创建了game.direct,game.topic,game.fanout3个交换机

绑定队列到路由中

require_once dirname(__DIR__) . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// 	创建连接和通道
$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');
$channel = $connection->channel();

//	待创建的交换机类型
$exchangeTypes = ['fanout', 'direct', 'topic'];
//	队列名
$queueNames = ['lol' => 'online', 'pugb' => 'online', 'war3' => 'local'];

foreach ($queueNames as $queueName => $category) {
    $channel->queue_declare($queueName, false, true, false, false);

    foreach ($exchangeTypes as $exchangeType) {
        $exchangeName = "game." . $exchangeType;
        if ($exchangeType === 'fanout') {	//	扇区交换机是不需要用消息路由的,当然加上也没有影响
            $routingKey = null;
        } elseif ($exchangeType === 'direct') {
            $routingKey = "game.{$category}.{$queueName}";
        } elseif ($exchangeType === "topic") {
            $routingKey = "game.{$category}.*";
        }
        $channel->queue_bind($queueName, $exchangeName, $routingKey);
    }
}

测试准备

生产者:

require_once dirname(__DIR__) . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeType = $argv[1];
$exchangeName = "game." . $exchangeType;
$routingKey = $argv[2] ?? null;

//  声明一个交换机
$channel->exchange_declare($exchangeName, $exchangeType, true, true, false);

$msg = new AMQPMessage(sprintf('test data from => %s, routingKey => %s', $exchangeName, $routingKey), ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
$channel->basic_publish($msg, $exchangeName, $routingKey);

echo "publish done at " . date('H:i:s') . "\r\n";

$channel->close();
$connection->close();

消费者

require_once dirname(__DIR__) . '/bootstrap.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('172.100.0.51', 5672, 'guest', 'guest');

$channel = $connection->channel();

$queueName = $argv[1];
$channel->queue_declare($queueName, false, true, false, false);

$channel->basic_qos(0, 1, null);
$channel->basic_consume($queueName, '', false, false, false, false, function ($msg) use ($queueName) {
    echo sprintf("=========start:%s:%s========\r\n",$queueName,date('H:i:s'));
    echo "| 接收到一条消息:" . $msg->body . "\r\n";
    echo "| 消息处理结束\r\n";
    echo sprintf("=========end:%s:%s========\r\n",$queueName,date('H:i:s'));
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], false, true);
});

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

生产者发送数据测试

向所有用户发推送

向所有关注网游(gome.online)的用户发推送

当然也可以向所有关注单机游戏(game.local)的用户发送推送

向所有关注英雄联盟(game.online.lol)的用户发推送

其他

关于header交换机有关的内容先挖个坑,以后想起来了再写吧~随缘更新的博客XD

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论