Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
60.00% covered (warning)
60.00%
3 / 5
CRAP
87.04% covered (warning)
87.04%
47 / 54
Worker
0.00% covered (danger)
0.00%
0 / 1
60.00% covered (warning)
60.00%
3 / 5
19.79
87.04% covered (warning)
87.04%
47 / 54
 __construct
100.00% covered (success)
100.00%
1 / 1
2
100.00% covered (success)
100.00%
5 / 5
 run
100.00% covered (success)
100.00%
1 / 1
7
100.00% covered (success)
100.00%
20 / 20
 handleMessage
0.00% covered (danger)
0.00%
0 / 1
7.87
73.91% covered (warning)
73.91%
17 / 23
 stop
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
2 / 2
 dispatchEvent
0.00% covered (danger)
0.00%
0 / 1
2.06
75.00% covered (warning)
75.00%
3 / 4
1<?php
2
3/*
4 * This file is part of the Symfony package.
5 *
6 * (c) Fabien Potencier <fabien@symfony.com>
7 *
8 * For the full copyright and license information, please view the LICENSE
9 * file that was distributed with this source code.
10 */
11
12namespace Symfony\Component\Messenger;
13
14use Psr\Log\LoggerInterface;
15use Symfony\Component\EventDispatcher\Event;
16use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
17use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
18use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
19use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent;
20use Symfony\Component\Messenger\Event\WorkerRunningEvent;
21use Symfony\Component\Messenger\Event\WorkerStartedEvent;
22use Symfony\Component\Messenger\Event\WorkerStoppedEvent;
23use Symfony\Component\Messenger\Exception\HandlerFailedException;
24use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException;
25use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp;
26use Symfony\Component\Messenger\Stamp\ReceivedStamp;
27use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
28use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
29
30/**
31 * @author Samuel Roze <samuel.roze@gmail.com>
32 * @author Tobias Schultze <http://tobion.de>
33 *
34 * @final
35 */
36class Worker
37{
38    private $receivers;
39    private $bus;
40    private $eventDispatcher;
41    private $logger;
42    private $shouldStop = false;
43
44    /**
45     * @param ReceiverInterface[] $receivers Where the key is the transport name
46     */
47    public function __construct(array $receivers, MessageBusInterface $bus, EventDispatcherInterface $eventDispatcher = null, LoggerInterface $logger = null)
48    {
49        $this->receivers = $receivers;
50        $this->bus = $bus;
51        $this->logger = $logger;
52        $this->eventDispatcher = class_exists(Event::class) ? LegacyEventDispatcherProxy::decorate($eventDispatcher) : $eventDispatcher;
53    }
54
55    /**
56     * Receive the messages and dispatch them to the bus.
57     *
58     * Valid options are:
59     *  * sleep (default: 1000000): Time in microseconds to sleep after no messages are found
60     */
61    public function run(array $options = []): void
62    {
63        $this->dispatchEvent(new WorkerStartedEvent($this));
64
65        $options = array_merge([
66            'sleep' => 1000000,
67        ], $options);
68
69        while (false === $this->shouldStop) {
70            $envelopeHandled = false;
71            foreach ($this->receivers as $transportName => $receiver) {
72                $envelopes = $receiver->get();
73
74                foreach ($envelopes as $envelope) {
75                    $envelopeHandled = true;
76
77                    $this->handleMessage($envelope, $receiver, $transportName);
78                    $this->dispatchEvent(new WorkerRunningEvent($this, false));
79
80                    if ($this->shouldStop) {
81                        break 2;
82                    }
83                }
84
85                // after handling a single receiver, quit and start the loop again
86                // this should prevent multiple lower priority receivers from
87                // blocking too long before the higher priority are checked
88                if ($envelopeHandled) {
89                    break;
90                }
91            }
92
93            if (false === $envelopeHandled) {
94                $this->dispatchEvent(new WorkerRunningEvent($this, true));
95
96                usleep($options['sleep']);
97            }
98        }
99
100        $this->dispatchEvent(new WorkerStoppedEvent($this));
101    }
102
103    private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, string $transportName): void
104    {
105        $event = new WorkerMessageReceivedEvent($envelope, $transportName);
106        $this->dispatchEvent($event);
107
108        if (!$event->shouldHandle()) {
109            return;
110        }
111
112        try {
113            $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp()));
114        } catch (\Throwable $throwable) {
115            $rejectFirst = $throwable instanceof RejectRedeliveredMessageException;
116            if ($rejectFirst) {
117                // redelivered messages are rejected first so that continuous failures in an event listener or while
118                // publishing for retry does not cause infinite redelivery loops
119                $receiver->reject($envelope);
120            }
121
122            if ($throwable instanceof HandlerFailedException) {
123                $envelope = $throwable->getEnvelope();
124            }
125
126            $this->dispatchEvent(new WorkerMessageFailedEvent($envelope, $transportName, $throwable));
127
128            if (!$rejectFirst) {
129                $receiver->reject($envelope);
130            }
131
132            return;
133        }
134
135        $this->dispatchEvent(new WorkerMessageHandledEvent($envelope, $transportName));
136
137        if (null !== $this->logger) {
138            $message = $envelope->getMessage();
139            $context = [
140                'message' => $message,
141                'class' => \get_class($message),
142            ];
143            $this->logger->info('{class} was handled successfully (acknowledging to transport).', $context);
144        }
145
146        $receiver->ack($envelope);
147    }
148
149    public function stop(): void
150    {
151        $this->shouldStop = true;
152    }
153
154    private function dispatchEvent(object $event): void
155    {
156        if (null === $this->eventDispatcher) {
157            return;
158        }
159
160        $this->eventDispatcher->dispatch($event);
161    }
162}