【swoft.1.02】Swoft中的RPC

RPC

Rpc基础 原理 框架

什么是RPC

RPC是微服务的基石之一,因为微服务本身的核心是将服务拆分后互相调用,调用方法一般有两种模型,一种是REST风格的API接口,另一种就是远程调用RPC了。

RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

比如说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,由于不在一个内存空间,不能直接调用,就需要通过网络来表达调用的语义和传达调用的数据,而这种方式就是rpc

RPC 的主要功能目标是让构建分布式计算(应用)更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。为实现该目标,RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。

为什么要用RPC

  1. 分布式部署及微服务 当我们的系统访问量增大、业务增多时,我们会发现一台单机运行此系统已经无法承受。此时,我们可以将业务拆分成几个互不关联的应用,分别部署在各自机器上,以划清逻辑并减小压力。

  2. 不同技术选型 公司业务规模扩大,有可能引入不同的语言,比如A团队要开发CPU密集型的采用c++语言,B团队要开发IO密集型的采用了PHP,不同语言之间如何通讯

  3. 系统高可用性差 因为所有的功能开发最后都部署到同一个框架里,运行在同一个进程之中,一旦某一功能涉及的代码或者资源有问题,那就会影响整个框架中部署的功能。

协议

为什么选择RPC而不是REST呢?因为RPC不仅支持HTTP,还支持TCP,UDP等协议。同时,在数据的编码上也支持广泛,json,xml,二进制等都没问题。

一般来说为了性能和可靠性会采取TCP,当然,协议还是要看场景。

例子

上面说到

RPC 框架需提供一种透明调用机制让使用者不必显式的区分本地调用和远程调用。

RPC的特点之一就是具有单一原则性,开发者并不需要关心另一端的服务是怎么实现的,只需要调用就行了,并且在调用中请求的过程也是无感的。这里拿swoft中的官方demo调用为例子

文件位置:app/Http/Controller/RpcController.php

/**
 * Class RpcController
 *
 * @since 2.0
 *
 * @Controller()
 */
class RpcController
{
    /**
     * @Reference(pool="user.pool")
     *
     * @var UserInterface
     */
    private $userService;

    /**
     * @Reference(pool="user.pool", version="1.2")
     *
     * @var UserInterface
     */
    private $userService2;

    /**
     * @RequestMapping("getList")
     *
     * @return array
     */
    public function getList(): array
    {
        $result = $this->userService->getList(12, 'type');
        $result2 = $this->userService2->getList(12, 'type');
        return [$result, $result2, $result3];
    }
}

代码中的$this->userService->getList(12, 'type');就是在调用外部的RPC服务了。只需要调用服务暴露在外的接口就可以了,并不需要关心另一个服务的实现方法和RPC之间的通信原理。

swoftRPC相关代码追踪与分析

在开始前,你可能需要这些文档:

  1. swoft官方文档
  2. 注解
  3. swoft官方文档
  4. rpc-client
  5. bean

配置解读

项目相关配置文件在app/bean.php。因为在源码中这个服务是在一个HTTP服务中调用的,要在http服务中使用rpc相关功能,我们需要在httpServer中开启rpc相关的多端口监听

swoole中的多端口监听相关文档:多端口监听的使用

 'httpServer' => [
        'class' => HttpServer::class,
        'port' => 8000,
        'listener' => [
            'rpc' => bean('rpcServer')
        ],
		......
    ]

代码中使用bean()函数获取了配置,将rpcServer实例写入了httpServer的监听对象中。

    'rpcServer' => [
        'class' => ServiceServer::class,
        'port' => 8001,
    ],

rpcServer是一个ServiceServer示例,源码位置:vendor/swoft/rpc-server/src/ServiceServer.php

客户端和连接池的配置

    'user' => [
        'class' => ServiceClient::class,
        'host' => '127.0.0.1',
        'port' => 8001,
        'setting' => [
            'timeout' => 0.5,
            'connect_timeout' => 1.0,
            'write_timeout' => 10.0,
            'read_timeout' => 0.5,
        ],
        'packet' => bean('rpcClientPacket')
    ],
    'user.pool' => [
        'class' => ServicePool::class,
        'client' => bean('user'),
    ],

上面rpc服务端监听的接口改为了8001,这里改成一致的即可。其中setting中的配置会修改swoole客户端的配置。值的注意的是其中有一项配置'packet' => bean('rpcClientPacket'),这个是客户端和服务端打包/拆包的工具,关于为什么要打包拆包在粘包问题的那篇文章有提到,有兴趣的可以自行翻阅。

连接池相关组件就比较深了,因为swoft在连接池上还做了一层代理,这方面的代码还没看的太懂,以后有机会另开一篇吧。

控制器

配置结束,回到刚才的客户端app/Http/Controller/RpcController.php

/**
 * Class RpcController
 *
 * @since 2.0
 *
 * @Controller()
 */
class RpcController
{
    /**
     * @Reference(pool="user.pool")
     *
     * @var UserInterface
     */
    private $userService;

    /**
     * @Reference(pool="user.pool", version="1.2")
     *
     * @var UserInterface
     */
    private $userService2;

    /**
     * @RequestMapping("getList")
     *
     * @return array
     */
    public function getList(): array
    {
        $result = $this->userService->getList(12, 'type');
        $result2 = $this->userService2->getList(12, 'type');
        return [$result, $result2, $result3];
    }
}

这里在控制器中引入了user.pool连接池,并定义了两个私有属性$userService$userService2,这两个属性需要继承UserInterface接口。

下面的操作$this->userService->getList(12, 'type')看起来是本地调用,但是其实内有乾坤!

客户端

在使用UserInterface中的接口时,会从连接池中取出一个连接,当然开始是没有的,swoft会创建连接并加入连接池的代理中。

客户端源码在这里:vendor/swoft/rpc-client/src/Client.php

    /**
     * @param Pool $pool
     *
     * @return Connection
     * @throws RpcClientException
     */
    public function createConnection(Pool $pool): Connection
    {
        $connection = Connection::new($this, $pool);
        $connection->create();

        return $connection;
    }

连接处理的源码在这里:vendor/swoft/rpc-client/src/Connection.php

    /**
     * @throws RpcClientException
     */
    public function create(): void
    {
        $connection = new Client(SWOOLE_SOCK_TCP);

        [$host, $port] = $this->getHostPort();
        $setting = $this->client->getSetting();

        if (!empty($setting)) {
            $connection->set($setting);
        }

        if (!$connection->connect($host, (int)$port)) {
            throw new RpcClientException(
                sprintf('Connect failed host=%s port=%d', $host, $port)
            );
        }

        $this->connection = $connection;
    }

如果连接池代理中有空闲连接的话,会从pool="user.pool"代理中获取一个实例,然后触发唤醒的回调__proxyCall,源码在这里:vendor/swoft/rpc-client/src/Concern/ServiceTrait.php

    protected function __proxyCall(string $interfaceClass, string $methodName, array $params)
    {
        $poolName = ReferenceRegister::getPool(__CLASS__);
        $version  = ReferenceRegister::getVersion(__CLASS__);

        /* @var Pool $pool */
        $pool = BeanFactory::getBean($poolName);

        /* @var Connection $connection */
        $connection = $pool->getConnection();
        $connection->setRelease(true);
        $packet = $connection->getPacket();

        // Ext data
        $ext = $connection->getClient()->getExtender()->getExt();

        $protocol = Protocol::new($version, $interfaceClass, $methodName, $params, $ext);
        $data     = $packet->encode($protocol);
        $message  = sprintf(
            'Rpc call failed.interface=%s method=%s pool=%s version=%s',
            $interfaceClass, $methodName, $poolName, $version
        );

        $result = $this->sendAndRecv($connection, $data, $message);
        $connection->release();

        $response = $packet->decodeResponse($result);
		......
        return $response->getResult();

    }

触发后会根据$interfaceClass, $methodName$params使用打包工具打包发送的消息,发送请求并等待返回。

这就是客户端做的事情。

服务端

在服务端,接受到请求后会根据请求分发到对应的中间件中,具体流程如下:

接收到请求触发onReceive事件

源码位置:vendor/swoft/rpc-server/src/Swoole/ReceiveListener.php

    public function onReceive(Server $server, int $fd, int $reactorId, string $data): void
    {
        $request  = Request::new($server, $fd, $reactorId, $data);
        $response = Response::new($server, $fd, $reactorId);

        /* @var ServiceDispatcher $dispatcher */
        $dispatcher = BeanFactory::getSingleton('serviceDispatcher');

        $dispatcher->dispatch($request, $response);
    }

使用dispatch分发请求,源码位置:vendor/swoft/rpc-server/src/ServiceDispatcher.php

    public function dispatch(...$params)
    {
        /**
         * @var Request  $request
         * @var Response $response
         */
        [$request, $response] = $params;

        try {
            Swoft::trigger(ServiceServerEvent::BEFORE_RECEIVE, null, $request, $response);

            $handler  = ServiceHandler::new($this->requestMiddleware(), $this->defaultMiddleware);
            $response = $handler->handle($request);
        } catch (Throwable $e) {
            /** @var RpcErrorDispatcher $errDispatcher */
            $errDispatcher = BeanFactory::getSingleton(RpcErrorDispatcher::class);

            // Handle request error
            $response = $errDispatcher->run($e, $response);
        }

        Swoft::trigger(ServiceServerEvent::AFTER_RECEIVE, null, $response);
    }

在请求分发中调用ServiceHandler类加载指定/默认中间件处理请求,源码位置:vendor/swoft/rpc-server/src/ServiceHandler.php

    /**
     * @param array  $middlewares
     * @param string $defaultMiddleware
     *
     * @return self
     *
     */
    public static function new(array $middlewares, string $defaultMiddleware): self
    {
        $instance = self::__instance();

        $instance->offset = 0;

        $instance->middlewares       = $middlewares;
        $instance->defaultMiddleware = $defaultMiddleware;

        return $instance;
    }

    /**
     * @param RequestInterface $request
     *
     * @return ResponseInterface
     */
    public function handle(RequestInterface $request): ResponseInterface
    {
        // Default middleware to handle request route
        $middleware = $this->middlewares[$this->offset] ?? $this->defaultMiddleware;

        /* @var MiddlewareInterface $bean */
        $bean = BeanFactory::getBean($middleware);

        // Next middleware
        $this->offset++;

        return $bean->process($request, $this);
    }

这里以UserMiddleware为例,源码位置:vendor/swoft/rpc-server/src/Middleware/UserMiddleware.php

    /**
     * @param RequestInterface        $request
     * @param RequestHandlerInterface $requestHandler
     *
     * @return ResponseInterface
     * @throws RpcServerException
     */
    public function process(RequestInterface $request, RequestHandlerInterface $requestHandler): ResponseInterface
    {
        $version   = $request->getVersion();
        $interface = $request->getInterface();
        $method    = $request->getMethod();

        /* @var Router $router */
        $router = BeanFactory::getBean('serviceRouter');

        $handler = $router->match($version, $interface);

        $request->setAttribute(Request::ROUTER_ATTRIBUTE, $handler);

        [$status, $className] = $handler;

        if ($status != Router::FOUND) {
            return $requestHandler->handle($request);
        }

        $middlewares = MiddlewareRegister::getMiddlewares($className, $method);
        if (!empty($middlewares) && $requestHandler instanceof ServiceHandler) {
            $requestHandler->insertMiddlewares($middlewares);
        }

        return $requestHandler->handle($request);
    }

$handler = $router->match($version, $interface);在中间件中寻找了请求对应路由关系,并根据路由关系执行方法并/返回错误

结束

这就是整个swoft中封装的RPC服务整个调用流程了,内部做了非常多的处理,包括连接池,事件处理,唤醒等,但是在使用者看来只是简单的调用了$this->userService->getList(12, 'type');。非常易于维护和做分布式项目架构!后面可能会有更深的内容,先挖个坑。

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论