Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
40.00% covered (danger)
40.00%
2 / 5
CRAP
85.71% covered (warning)
85.71%
48 / 56
Worker
0.00% covered (danger)
0.00%
0 / 1
40.00% covered (danger)
40.00%
2 / 5
21.17
85.71% covered (warning)
85.71%
48 / 56
 __construct
0.00% covered (danger)
0.00%
0 / 1
3.03
85.71% covered (warning)
85.71%
6 / 7
 run
100.00% covered (success)
100.00%
1 / 1
7
100.00% covered (success)
100.00%
20 / 20
 handleMessage
0.00% covered (danger)
0.00%
0 / 1
7.87
73.91% covered (warning)
73.91%
17 / 23
 stop
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
2 / 2
 dispatchEvent
0.00% covered (danger)
0.00%
0 / 1
2.06
75.00% covered (warning)
75.00%
3 / 4
<?php
/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
namespace Symfony\Component\Messenger;
use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
use Symfony\Component\Messenger\Exception\HandlerFailedException;
use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
use Symfony\Component\Messenger\Stamp\ReceivedStamp;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
/**
 * @author Samuel Roze <samuel.roze@gmail.com>
 * @author Tobias Schultze <http://tobion.de>
 *
 * @final
 */
class Worker
{
    private $receivers;
    private $bus;
    private $eventDispatcher;
    private $logger;
    private $shouldStop = false;
    /**
     * @param ReceiverInterface[] $receivers Where the key is the transport name
     */
    public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
    {
        $this->receivers = $receivers;
        $this->bus = $bus;
        $this->logger = $logger;
        if (null !== $eventDispatcher && class_exists(LegacyEventDispatcherProxy::class)) {
            $this->eventDispatcher = LegacyEventDispatcherProxy::decorate($eventDispatcher);
        } else {
            $this->eventDispatcher = $eventDispatcher;
        }
    }
    /**
     * Receive the messages and dispatch them to the bus.
     *
     * Valid options are:
     *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
     */
    public function run(array $options = []): void
    {
        $this->dispatchEvent(new WorkerStartedEvent($this));
        $options = array_merge([
            'sleep' => 1000000,
        ], $options);
        while (false === $this->shouldStop) {
            $envelopeHandled = false;
            foreach ($this->receivers as $transportName => $receiver) {
                $envelopes = $receiver->get();
                foreach ($envelopes as $envelope) {
                    $envelopeHandled = true;
                    $this->handleMessage($envelope, $receiver, $transportName);
                    $this->dispatchEvent(new WorkerRunningEvent($this, false));
                    if ($this->shouldStop) {
                        break 2;
                    }
                }
                // after handling a single receiver, quit and start the loop again
                // this should prevent multiple lower priority receivers from
                // blocking too long before the higher priority are checked
                if ($envelopeHandled) {
                    break;
                }
            }
            if (false === $envelopeHandled) {
                $this->dispatchEvent(new WorkerRunningEvent($this, true));
                usleep($options['sleep']);
            }
        }
        $this->dispatchEvent(new WorkerStoppedEvent($this));
    }
    private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
    {
        $event = new WorkerMessageReceivedEvent($envelope, $transportName);
        $this->dispatchEvent($event);
        if (!$event->shouldHandle()) {
            return;
        }
        try {
            $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
        } catch (\Throwable $throwable) {
            $rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
            if ($rejectFirst) {
                // redelivered messages are rejected first so that continuous failures in an event listener or while
                // publishing for retry does not cause infinite redelivery loops
                $receiver->reject($envelope);
            }
            if ($throwable instanceof HandlerFailedException) {
                $envelope = $throwable->getEnvelope();
            }
            $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
            if (!$rejectFirst) {
                $receiver->reject($envelope);
            }
            return;
        }
        $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
        if (null !== $this->logger) {
            $message = $envelope->getMessage();
            $context = [
                'message' => $message,
                'class' => \get_class($message),
            ];
            $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
        }
        $receiver->ack($envelope);
    }
    public function stop(): void
    {
        $this->shouldStop = true;
    }
    private function dispatchEvent(object $event): void
    {
        if (null === $this->eventDispatcher) {
            return;
        }
        $this->eventDispatcher->dispatch($event);
    }
}