【swoft.1.04】swoft中的熔断器

熔断器

什么是熔断器

熔断器在日常生活中其实是真实存在的,这里引用一下百度百科的介绍:

熔断器(fuse)是指当电流超过规定值时,以本身产生的热量使熔体熔断,断开电路的一种电器。熔断器是根据电流超过规定值一段时间后,以其自身产生的热量使熔体熔化,从而使电路断开;运用这种原理制成的一种电流保护器。熔断器广泛应用于高低压配电系统和控制系统以及用电设备中,作为短路和过电流的保护器,是应用最普遍的保护器件之一。

在应用架构中,熔断器起着同样的作用。在上一篇中说到了分布式应用架构做RPC远程调用可能遇到的问题,也就时服务在挂掉后程序互相阻塞导致整个系统雪崩。使用熔断器可以很好的解决这个问题。

熔断技术可以说是一种“智能化的容错”,当调用满足失败次数,失败比例就会触发熔断器打开,有程序自动切断当前的RPC调用,来防止错误进一步扩大。实现一个熔断器主要是考虑三种模式,关闭,打开,半开。

  1. 关闭( Closed ):默认情况下Circuit Breaker是关闭的,此时允许操作执行。CircuitBreaker内部记录着最近失败的次数,如果对应的操作执行失败,次数就会续一次。如果在某个时间段内,失败次数(或者失败比率)达到阈值,CircuitBreaker会转换到开启( Open )状态。在开启状态中,Circuit Breaker会启用一个超时计时器,设这个计时器的目的是给集群相应的时间来恢复故障。当计时器时间到的时候,CircuitBreaker会转换到半开启( Half-Open )状态。

  2. 开启( Open ):在此状态下,执行对应的操作将会立即失败并且立即抛出异常。

  3. 半开启( Half-Open ):在此状态下,Circuit Breaker会允许执行一定数量的操作。如果所有操作全部成功,CircuitBreaker就会假定故障已经恢复,它就会转换到关闭状态,并且重置失败次数。如果其中 任意一次 操作失败了,Circuit Breaker就会认为故障仍然存在,所以它会转换到开启状态并再次开启计时器(再给系统一些时间使其从失败中恢复)

从上面的熔断器介绍可以看见,熔断器的主要作用就是监控客户端和服务端的通信,在通信不顺畅的时候及时阻断客户端和服务端的通信,防止整个服务雪崩。

swoft中的熔断器

使用方法

参考文档:https://swoft.org/documents/v2/microservice/blown-downgraded/

这里需要注意的是,swoft中的熔断器是针对方法的熔断,并不是针对服务的熔断。也就是说这个熔断器监控的是某个方法执行超时或失败的次数,而不是某个服务调用超时或失败的次数,这两者是有区别的。

在实际运行中,有两种情况可能导致雪崩,第一是某个单独的接口出现长时间阻塞,这种情况下针对方法的熔断是非常有效的。但是还有另外一种,就是集群服务下某台服务宕机导致请求会随机阻塞,此时方法的失败并不是连续的。而熔断器的原理是连续调用失败后对服务进行降级处理,所以针对方法的熔断并不能做到这一点,这里先演示方法熔断器的使用和源码。

配置

在swoft中已经为我们定义好了方便的扩展:swoft/breaker

使用的时候只需要在配置中对熔断器进行配置

    'breaker' => [
        'timeout' => 1, // 方法超时时间
        'failThreshold' => 5, // 打开熔断器的阈值(连续失败的次数)
        'sucThreshold' => 3, // 关闭熔断器的阈值(连续成功的次数)
        'retryTime' => 10 // 变为半开的时间(给服务恢复的时间)
    ]

使用

使用方法很简单

熔断器的使用相当简单且功能强大,使用一个 @Breaker 注解即可,Swoft 中的熔断是针对于类里面的方法熔断,只要方法里面没有抛出异常就说明是成功访问的,所以 @Breaker 注解可以在任何 bean 对象方法上面使用。

    /**
     * @Breaker(fallback="bulletScreenTestFallback")
     * @RequestMapping("bulletScreenTest/{time}")
     *
     * @throws Exception
     * @return string
     */
    public function bulletScreenTest()
    {
        return $this->bulletScreenService->getIp();
    }
	
    public function bulletScreenTestFallback()
    {
        return 'fallback';
    }

上面的代码中,bulletScreenTest()是用户请求的方法,在方法进行上注解@Breaker(fallback="bulletScreenTestFallback")即可完成熔断操作。

  1. @Breaker:将该方法放入熔断器中

  2. fallback="bulletScreenTestFallback":熔断器中超时,异常或者阻断时的会掉方法。

  3. 在不定义fallback回调时会抛出异常Breaker call timeout(1.000000)

使用方法文档中也有详细说明,那么我们还是来看看在源码中熔断器是怎么实现的吧。

源码阅读

读取注释和解析

读取和解析的原理就不多赘述了。在解析注释的过程中,swoft/breaker将注释所在的控制器和方法名注册到了vendor/swoft/breaker/src/BreakerRegister.php中,解析的源码片段如下:

use Swoft\Annotation\Annotation\Mapping\AnnotationParser;
use Swoft\Annotation\Annotation\Parser\Parser;
use Swoft\Breaker\Annotation\Mapping\Breaker;
use Swoft\Breaker\BreakerRegister;
use Swoft\Breaker\Exception\BreakerException;

/**
 * Class BreakerParser
 *
 * @since 2.0
 *
 * @AnnotationParser(Breaker::class)
 */
class BreakerParser extends Parser
{
    /**
     * @param int     $type
     * @param Breaker $annotationObject
     *
     * @return array
     * @throws BreakerException
     */
    public function parse(int $type, $annotationObject): array
    {
        if ($type != self::TYPE_METHOD) {
            return [];
        }

        BreakerRegister::registerBreaker($this->className, $this->methodName, $annotationObject);
        return [];
    }
}

在注册器中保存了所有的熔断器

    /**
     * @param string            $className
     * @param string            $method
     * @param BreakerAnnotation $breaker
     *
     * @throws BreakerException
     */
    public static function registerBreaker(string $className, string $method, BreakerAnnotation $breaker)
    {
        if (isset(self::$breakers[$className][$method])) {
            throw new BreakerException(
                sprintf('`@Breaker` must be only one on method(%s->%s)!', $className, $method)
            );
        }

        self::$breakers[$className][$method] = $breaker;
    }

同时,swoft/breaker监听了SwoftEvent::APP_INIT_COMPLETE,在app初始化结束后,将注册器中记录的熔断器注入到熔断器管理者vendor/swoft/breaker/src/BreakerManager.php

监听的事件回调

use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Breaker\BreakerManager;
use Swoft\Breaker\BreakerRegister;
use Swoft\Event\Annotation\Mapping\Listener;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\SwoftEvent;

/**
 * Class AppInitCompleteListener
 *
 * @since 2.0
 *
 * @Listener(event=SwoftEvent::APP_INIT_COMPLETE)
 */
class AppInitCompleteListener implements EventHandlerInterface
{
    /**
     * @Inject()
     *
     * @var BreakerManager
     */
    private $breakerManger;

    /**
     * @param EventInterface $event
     *
     */
    public function handle(EventInterface $event): void
    {
        $breakers = BreakerRegister::getBreakers();
        $this->breakerManger->initBreaker($breakers);
    }
}

管理者通过读取记录的控制器和方法,挨个实例化熔断器,并保存在自己的内存中维护。

    /**
     * @param array $breakers
     *
     */
    public function initBreaker(array $breakers): void
    {
        foreach ($breakers as $className => $methodBreakers) {
            /* @var BreakerAnnotation $breaker */
            foreach ($methodBreakers as $methodName => $breaker) {

                $bConfig = [];
                $config  = $breaker->getConfig();
                foreach ($config as $key) {
                    $configMethod  = sprintf('get%s', ucfirst($key));
                    $bConfig[$key] = $breaker->{$configMethod}();
                }

                $this->breakers[$className][$methodName] = Breaker::new($bConfig);
            }
        }
    }

熔断器的实例化过程会先获取从maneger得到的配置,然后将熔断器关闭

    /**
     * @param array $config
     *
     * @return Breaker
     */
    public static function new(array $config): self
    {
        $self = self::__instance();

        foreach ($config as $name => $value) {
            $self->{$name} = $value;
        }

        // Move to close by init
        $self->moveToClose();

        return $self;
    }

到这里需要暂停一下,熔断器的开启,关闭和半关闭状态是由一个状态对象维护的,以上面的moveToClose方法为例

    /**
     */
    public function moveToClose(): void
    {
        $this->state = CloseState::new($this);
    }

状态类会在实例化时注入熔断器以达到状态类和熔断器互相操作的目的。

    /**
     * @param Breaker $breaker
     *
     * @return AbstractState
     */
    public static function new(Breaker $breaker): self
    {
        $self = self::__instance();

        $self->breaker = $breaker;

        // Reset state
        $self->reset();
        return $self;
    }

回到刚才的话题,实例化了一个CloseState对象,对象会使用$breaker将依赖的熔断器的调用失败次数重置为0.

整个熔断器的实例化到这里就结束了,接下来就是等待用户请求了。swoft中对请求做了切片处理,切片是什么请看文档:AOP

swoft使用切片阻止了程序原本的运行,转而在切片内经过熔断器来执行。切片处理在这里vendor/swoft/breaker/src/Aspect/BreakerAspect.php

    /**
     * @Around()
     *
     * @param ProceedingJoinPoint $proceedingJoinPoint
     *
     * @return mixed
     * @throws Throwable
     */
    public function around(ProceedingJoinPoint $proceedingJoinPoint)
    {
        $args = $proceedingJoinPoint->getArgs();
        $target = $proceedingJoinPoint->getTarget();
        $method = $proceedingJoinPoint->getMethod();
        $className = get_class($target);
        $className = Proxy::getOriginalClassName($className);

        $breaker = $this->breakerManager->getBreaker($className, $method);
        $result = $breaker->run($target, $className, $method, [$proceedingJoinPoint, 'proceed'], $args);
        return $result;
    }

在切片中通过熔断器来监控本次请求的运行状态

    /**
     * @param object $target
     * @param string $className
     * @param string $method
     * @param callable|array $callback
     * @param array $params
     *
     * @return mixed
     * @throws BreakerException
     * @throws Throwable
     */
    public function run($target, string $className, string $method, $callback, $params = [])
    {
        if ($method == $this->fallback) {
            throw new BreakerException(sprintf('Method(%s) and fallback must be different', $method));
        }

        try {
            // Check state
            $this->state->check();

            if ($this->timeout == 0) {
                $result = PhpHelper::call($callback);
                $this->state->success();

                return $result;
            }

            $channel = new Channel(1);
            sgo(function () use ($callback, $channel) {
                try {
                    $result = PhpHelper::call($callback);
                    $channel->push([true, $result]);
                } catch (Throwable $e) {
                    $message = sprintf('%s file=%s line=%d', $e->getMessage(), $e->getFile(), $e->getLine());
                    $channel->push([false, $message]);
                }
            }, false);

            $data = $channel->pop($this->timeout);
            if ($data === false) {
                throw new BreakerException(
                    sprintf('Breaker call timeout(%f)', $this->timeout)
                );
            }

            [$status, $result] = $data;
            if ($status == false) {
                throw new BreakerException($result);
            }

            $this->state->success();
            return $result;
        } catch (Throwable $e) {
            $message = sprintf(
                'Breaker(%s->%s %s) call fail!(%s)',
                $className,
                $method,
                json_encode($params),
                $e->getMessage()
            );

            Log::error($message);
            $this->state->exception();

            if (!empty($this->fallback)) {
                return PhpHelper::call([$target, $this->fallback], ...$params);
            }

            throw $e;
        }
    }

熔断器首先会检测自己的状态到底是半开,开启,关闭中的哪一种,通过状态来判断请求是否可以放行,无法放行会直接抛异常在内部捕获。

这里熔断器中如果设置超时时间为0,则仅会监控方法是否抛出异常,如果设置超时时间不为0,源码中会使用了swoole/channel的等待超时机制,起了一个单独的协程环境来处理请求并在协程外部等待子协程往通道中写入结果。这里可能会出现3种情况:

  1. 等待channle消息超时,抛出异常BreakerException,BreakerException会在熔断器内捕获

  2. 协程内部抛出异常,内部会catch这个异常并返回[false,异常消息],外部成功收到消息,针对false在外部抛出一个熔断器异常BreakerException,消息为上面的异常消息,BreakerException会在熔断器内捕获

    1. 在失败的情况下,不管熔断器处于什么状态,熔断器都会增加一次失败调用次数,重置成功调用次数
    2. 在熔断器关闭状态,如果此时失败次数达到开启熔断器阈值,熔断器会打开
    3. 在熔断器半开状态,熔断器直接重新打开
    4. 若为熔断器设置了降级回调参数,会执行降级操作返回数据
  3. 协程内部执行成功,内部返回[true,结果],外部接受到后调用状态类的success方法

    1. 在熔断器关闭状态,success会重置失败次数
    2. 在熔断器半开状态,success会增加一次成功调用计数,重置失败调用计数,如果此时成功次数达到关闭阈值则会将熔断器彻底关闭

尾声

这就是熔断器的使用和源码分析了。不过并不能达到想要的针对连接池中的连接熔断,还是老规矩,先挖空,随缘填,各位再见!

程序幼儿员-龚学鹏
请先登录后发表评论
  • latest comments
  • 总共0条评论