【swoft.1.04】使用swoft+consul做服务注册,服务发现

前景提要

在上一篇大概说了一下如何使用sowft去请求集群的服务,并实现了下图的架构

在我写完后突然发现,swoole中使用服务提供者可以直接在配置中注入。不过没关系,上一篇也可以当做源码分析嘛!

上一篇:【swoft.1.03】如何请求集群的服务

今天,我会通过swoft+consul实现上图架构的升级版,也就是通过consul来实现服务注册,服务发现。

在学习之前,你可能需要先补充一下consul的基础知识:【Consul】下载,安装及最简单的使用介绍

那么进入正题吧!

consul可以做什么

consul可以通过集群consul-server,consul-client代理请求,健康检查这3个基本功能来保证一个分部署服务的高可用,并且能作为服务注册与发现的中间件,方便服务之间的调用和维护。下面我来列举一下分布式项目架构可能会碰到的问题。

分布式项目的问题

  1. 服务与客户端之间无法互相感知。 因为服务与客户端是单独的两个个体,客户端无法知道每个服务应该如何请求。当然,我们可以通过配置服务ip等方式来解决这个问题,但是这样的话增加、删除、转移一个服务的代价就会变得特别高,因为当增加、删除、转移这些服务时,必须修改所有可能会调用到这个服务的配置。维护起来十分麻烦。

  2. 某个服务崩溃或断线可能会导致整个项目阻塞甚至雪崩。 当一个服务的集群中有一台或所有机器崩溃或断线了无法连接时,进行远程调用的客户端可能会阻塞在此io中无法自拔。当然有人会说swoole的协程不是碰到io阻塞时会让出执行权限吗?没错使用swoole确实可以跳过该io,但是阻塞就是阻塞,阻塞的协程中的变量也不会释放,最后也会导致服务器内存飙升造成大面积的崩溃和瘫痪。

consul可以解决吗

使用consul可以做到服务的注册和发现,通过服务注册和发现即可解决服务和客户端之间感知的问题。

一个生活中的例子理解服务注册

假设某一天你想撸串,你家楼下就有好多路边摊。你兴冲冲的穿衣,下楼,结果发现楼下的路边摊被城管赶走了,转移到了另一个角落只是你不知道,最后串没有撸到,下楼出门一趟不容易,外面还挺冷的自己感冒了传染了一家子。这就是上面说到的断线导致雪崩的问题。我们来看看这个过程发生了什么:

  1. 你想撸串 => 客户端准备发送RPC请求

  2. 你兴冲冲的穿衣,下楼 => 发起服务调用连接

  3. 结果发现楼下的路边摊被城管赶走了 => 服务下线

  4. 转移到了另一个角落只是你不知道 => 无法感知到新的服务

  5. 串没有撸到,下楼出门一趟不容易,外面还挺冷的自己感冒了 => io阻塞导致进程/线程/协程无法快速释放内存

  6. 感冒了传染了一家子 => 服务互相阻塞发生了雪崩

那么怎么使用consul解决这个问题呢?consul可以作为一个注册中心,实现服务的注册和发现,完成服务和客户端之间的感知。consul在这个小例子中就相当于有了美团外卖:路边摊可以将自己的门店信息注册到美团外卖中,你也可以通过美团外卖发现这些路边摊的最新信息,并且有人替你跑腿。这就是服务注册和发现。

在swoft中使用consul

swoft/consul

swoft中提供了很方便的swoft/consul组件:swoft/consul

可以通过注释直接注入 agent / health / catalog / kv / session 对象,使用起来简单灵活。

这里我们主要需要使用agent来做服务注册,health来做健康检查和服务发现。

使用agent做服务注册

在consul入门中讲过,consul自带很方便的管理api,使用HTTP即可调用consul中的大部分内容。consul的客户端可以通过/agent/service/register来实现服务的注册。当然,也有很多其他的api,swoft/consul做的事情就是将这些api封装了一层方便调用。

无论是 http / rpc / ws 服务,启动的时候只需监听 SwooleEvent::START 事件,即可把启动的服务注册到第三方集群。

SwooleEvent::START就是swoole启动时触发的第一个事件,在swoole启动时注册服务。

你可能需要查看的文档:swoft-事件swoole-server的生命周期

在app目录下任意位置新建一个针对SwooleEvent::START事件的监听处理器并使用swoft/consul发送注册请求

namespace App\Rpc\Listener;

use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Consul\Agent;
use Swoft\Event\Annotation\Mapping\Listener;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\Log\Helper\CLog;
use Swoft\Server\SwooleEvent;

/**
 * Class ServerStartEvent
 * @package App\Rpc\Listener
 * @Listener(SwooleEvent::START)
 */
class ServerStartEvent implements EventHandlerInterface
{
    /**
     * @Inject()
     * @var Agent
     */
    private $agent;

    /**
     * @param EventInterface $event
     * @throws \Swoft\Consul\Exception\ClientException
     * @throws \Swoft\Consul\Exception\ServerException
     */
    public function handle(EventInterface $event): void
    {
        $server = $event->getTarget();

        $host = config('local.host');

        $service = [
            'ID' => 'bulletScreen-' . $host,
            'Name' => 'bulletScreen',
            'Tags' => [
                'http'
            ],
            'Address' => $host,
            'Port' => $server->getPort(),
            'Meta' => [
                'version' => '1.0'
            ],
            'EnableTagOverride' => false,
            'Weights' => [
                'Passing' => 10,
                'Warning' => 1
            ],
            'Check' => [
                "TCP" => $host . ":" . $server->getPort(),
                "Interval" => "5s",
                "Timeout" => "2s"
            ]
        ];

        // Register
        $this->agent->registerService($service);

        CLog::info('Swoft http register service success by consul!');
    }
}

同时,为了避免项目停止后服务未能取消导致consul服务端不停检查心跳,这里推荐在swoole退出时取消服务的注册

服务启动注册服务,服务关闭或者退出则需要取消服务注册,此时这里和注册一样监听一个 SwooleEvent::SHUTDOWN 事件即可

namespace App\Rpc\Listener;

use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Consul\Agent;
use Swoft\Event\Annotation\Mapping\Listener;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\Log\Helper\CLog;
use Swoft\Server\SwooleEvent;

/**
 * Class ServerStartEvent
 * @package App\Rpc\Listener
 * @Listener(SwooleEvent::SHUTDOWN)
 */
class ServerStopEvent implements EventHandlerInterface
{
    /**
     * @Inject()
     * @var Agent
     */
    private $agent;

    /**
     * @param EventInterface $event
     * @throws \Swoft\Consul\Exception\ClientException
     * @throws \Swoft\Consul\Exception\ServerException
     */
    public function handle(EventInterface $event): void
    {
        $host = config('local.host');

        $this->agent->deregisterService('bulletScreen-'.$host);

        CLog::info('Swoft http deregister service success by consul!');
    }
}

通过监听swoole的开始和结束事件,使用swoft/consul来做服务注册就是这么简单。

使用health实现服务发现

服务提供者

【swoft.1.03】如何请求集群的服务有提到,通过实现ProviderInterface服务提供者接口swoft底层会绕开配置中配置的host和port转而使用服务提供者getList()方法中返回的服务地址。使用服务提供者不需要像上一篇文章那么麻烦,只需要在配置中注入服务对应的ProviderInterface接口实现类即可:

    'bulletScreen' => [
        'class' => ServiceClient::class,
        'host' => '127.0.0.1',
        'port' => 8001,
        'setting' => [
            'timeout' => 0.5,
            'connect_timeout' => 1.0,
            'write_timeout' => 10.0,
            'read_timeout' => 0.5,
        ],
        'packet' => bean('rpcClientPacket'),
        'provider' => bean(\App\Common\BulletScreenProvider::class)  //  在这里配置服务提供实例
    ],

在consul入门中也说到了:

使用/v1/health/service/web开查看健康检查 使用/v1/health/service/web?passing来查看健康检查通过的服务

在服务提供者实例中,通过调用/v1/health/service/web?passing来获取健康的服务

namespace App\Common;

use ReflectionException;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Bean\Exception\ContainerException;
use Swoft\Consul\Agent;
use Swoft\Consul\Exception\ClientException;
use Swoft\Consul\Exception\ServerException;
use Swoft\Consul\Health;
use Swoft\Rpc\Client\Client;
use Swoft\Rpc\Client\Contract\ProviderInterface;

/**
 * Class RpcProvider
 *
 * @since 2.0
 *
 * @Bean()
 */
class BulletScreenProvider implements ProviderInterface
{
    /**
     * @Inject()
     *
     * @var Health
     */
    private $health;

    const SERVICE_NAME = 'bulletScreen';

    /**
     * @param Client $client
     *
     * @return array
     * @throws ReflectionException
     * @throws ContainerException
     * @throws ClientException
     * @throws ServerException
     * @example
     * [
     *     'host:port',
     *     'host:port',
     *     'host:port',
     * ]
     */
    public function getList(Client $client): array
    {
        $services = $this->health->service(self::SERVICE_NAME, ['passing' => true]);
        $res = json_decode($services->getBody(), true);

        $services = [];

        foreach ($res as $v) {
            $services[] = $v['Service']['Address'] . ':' . $v['Service']['Port'];
        }

        return $services;
    }
}

因为使用/v1/health/service/web?passing在服务中心中检测了服务的运行状态,这里得到的服务都是正常运行的。同时,在这里统一访问consul注册中心就可以得到所有的正常服务了。

当然,consul中也可以配置服务的权重

'Passing' => 10, 'Warning' => 1

通过配置权重,可以在服务提供者中实现加权算法的负载均衡。

负载均衡需要在服务提供者中做好后返回一个只有一个元素的数组给底层的connection.php,暂时底层连接服务时如果收到多个连接只会随机访问

至于加权算法,这里简单画个图给大家解释一下,非常好实现,我就不写demo了。

假设这里有3个服务,下面通过服务=>权重来标示

  1. A => 10
  2. B => 5
  3. C => 2

需要实现的就是图中的功能,比较简单,自行理解吧。

其他要说的

当然,不可能这么简单就解决了分布式项目架构中会出现的问题。由于swoft中存在连接池,并且健康检查是有间隔的,所以在实际运行中可能会因为连接池中的连接状态未更新或健康检查间隔中服务崩溃导致整个雪崩,这就涉及到服务熔断的问题了。还是那句话,坑先开,更新随缘,有缘的话我们下一篇再见吧!

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论