Code Coverage
 
Classes and Traits
Functions and Methods
Lines
Total
0.00% covered (danger)
0.00%
0 / 1
56.25% covered (warning)
56.25%
9 / 16
CRAP
94.08% covered (success)
94.08%
143 / 152
Connection
0.00% covered (danger)
0.00%
0 / 1
56.25% covered (warning)
56.25%
9 / 16
46.44
94.08% covered (success)
94.08%
143 / 152
 __construct
100.00% covered (success)
100.00%
1 / 1
2
100.00% covered (success)
100.00%
7 / 7
 getConfiguration
0.00% covered (danger)
0.00%
0 / 1
2
0.00% covered (danger)
0.00%
0 / 1
 buildConfiguration
0.00% covered (danger)
0.00%
0 / 1
5.01
93.33% covered (success)
93.33%
14 / 15
 send
0.00% covered (danger)
0.00%
0 / 1
2.00
93.75% covered (success)
93.75%
15 / 16
 get
0.00% covered (danger)
0.00%
0 / 1
7
96.67% covered (success)
96.67%
29 / 30
 ack
100.00% covered (success)
100.00%
1 / 1
2
100.00% covered (success)
100.00%
3 / 3
 reject
100.00% covered (success)
100.00%
1 / 1
2
100.00% covered (success)
100.00%
3 / 3
 setup
0.00% covered (danger)
0.00%
0 / 1
3.11
76.92% covered (warning)
76.92%
10 / 13
 getMessageCount
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
4 / 4
 findAll
0.00% covered (danger)
0.00%
0 / 1
2.02
83.33% covered (warning)
83.33%
5 / 6
 find
100.00% covered (success)
100.00%
1 / 1
3
100.00% covered (success)
100.00%
6 / 6
 createAvailableMessagesQueryBuilder
0.00% covered (danger)
0.00%
0 / 1
2.00
92.31% covered (success)
92.31%
12 / 13
 createQueryBuilder
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
3 / 3
 executeQuery
100.00% covered (success)
100.00%
1 / 1
4
100.00% covered (success)
100.00%
8 / 8
 getSchema
100.00% covered (success)
100.00%
1 / 1
8
100.00% covered (success)
100.00%
22 / 22
 decodeEnvelopeHeaders
100.00% covered (success)
100.00%
1 / 1
1
100.00% covered (success)
100.00%
2 / 2
<?php
/*
 * This file is part of the Symfony package.
 *
 * (c) Fabien Potencier <fabien@symfony.com>
 *
 * For the full copyright and license information, please view the LICENSE
 * file that was distributed with this source code.
 */
namespace Symfony\Component\Messenger\Transport\Doctrine;
use Doctrine\DBAL\Connection as DBALConnection;
use Doctrine\DBAL\DBALException;
use Doctrine\DBAL\Driver\ResultStatement;
use Doctrine\DBAL\Exception\TableNotFoundException;
use Doctrine\DBAL\Query\QueryBuilder;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Exception\TransportException;
/**
 * Database access layer for the Doctrine Messenger transport.
 *
 * Wraps a DBAL connection to insert, claim, delete and inspect messages stored
 * in a single table, and can create that table on demand.
 *
 * @author Vincent Touzet <vincent.touzet@gmail.com>
 *
 * @final
 */
class Connection
{
    /**
     * Values used for every option the caller does not provide.
     */
    private const DEFAULT_OPTIONS = [
        'table_name' => 'messenger_messages',
        'queue_name' => 'default',
        'redeliver_timeout' => 3600,
        'auto_setup' => true,
    ];

    /**
     * Configuration of the connection.
     *
     * Available options:
     *
     * * table_name: name of the table
     * * connection: name of the Doctrine's entity manager
     * * queue_name: name of the queue
     * * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default 3600
     * * auto_setup: Whether the table should be created automatically during send / get. Default : true
     */
    private $configuration = [];

    /** DBAL connection every query is executed on. */
    private $driverConnection;

    /** Synchronizer used by setup() to create or update the table. */
    private $schemaSynchronizer;

    /** Whether a missing table may still be created automatically; cleared once setup() succeeds. */
    private $autoSetup;

    /** True when Doctrine\DBAL\Types\Types does not exist, so the constants on Type must be used instead. */
    private static $useDeprecatedConstants;

    /**
     * @param array                   $configuration      Options merged over DEFAULT_OPTIONS
     * @param DBALConnection          $driverConnection   Connection used to run the queries
     * @param SchemaSynchronizer|null $schemaSynchronizer Defaults to a SingleDatabaseSynchronizer bound to $driverConnection
     */
    public function __construct(array $configuration, DBALConnection $driverConnection, ?SchemaSynchronizer $schemaSynchronizer = null)
    {
        $this->configuration = array_replace_recursive(self::DEFAULT_OPTIONS, $configuration);
        $this->driverConnection = $driverConnection;
        $this->schemaSynchronizer = $schemaSynchronizer ?? new SingleDatabaseSynchronizer($this->driverConnection);
        $this->autoSetup = $this->configuration['auto_setup'];

        // Detect once per process which set of type constants is available.
        if (null === self::$useDeprecatedConstants) {
            self::$useDeprecatedConstants = !class_exists(Types::class);
        }
    }

    /**
     * Returns the effective configuration (defaults merged with the given options).
     */
    public function getConfiguration(): array
    {
        return $this->configuration;
    }

    /**
     * Builds a configuration array from a DSN and an explicit options array.
     *
     * The DSN host becomes the "connection" entry. For the remaining keys,
     * $options take precedence over the DSN query string, which takes
     * precedence over the defaults. "auto_setup" is normalised to a boolean and
     * "redeliver_timeout" to an integer, so values parsed from the query string
     * have the same types as the defaults.
     *
     * @throws InvalidArgumentException When the DSN cannot be parsed or an unknown option is given
     */
    public static function buildConfiguration(string $dsn, array $options = []): array
    {
        if (false === $components = parse_url($dsn)) {
            throw new InvalidArgumentException(sprintf('The given Doctrine Messenger DSN "%s" is invalid.', $dsn));
        }

        $query = [];
        if (isset($components['query'])) {
            parse_str($components['query'], $query);
        }

        $configuration = ['connection' => $components['host']];
        // The array union keeps the left-most value for each key.
        $configuration += $options + $query + self::DEFAULT_OPTIONS;

        $configuration['auto_setup'] = filter_var($configuration['auto_setup'], FILTER_VALIDATE_BOOLEAN);
        // Query-string values are always strings; keep the timeout an int like its default.
        $configuration['redeliver_timeout'] = (int) $configuration['redeliver_timeout'];

        $allowedKeys = array_keys(self::DEFAULT_OPTIONS);

        // check for extra keys in options
        $optionsExtraKeys = array_diff(array_keys($options), $allowedKeys);
        if (0 < \count($optionsExtraKeys)) {
            throw new InvalidArgumentException(sprintf('Unknown option found : [%s]. Allowed options are [%s].', implode(', ', $optionsExtraKeys), implode(', ', $allowedKeys)));
        }

        // check for extra keys in the DSN query string
        $queryExtraKeys = array_diff(array_keys($query), $allowedKeys);
        if (0 < \count($queryExtraKeys)) {
            throw new InvalidArgumentException(sprintf('Unknown option found in DSN: [%s]. Allowed options are [%s].', implode(', ', $queryExtraKeys), implode(', ', $allowedKeys)));
        }

        return $configuration;
    }

    /**
     * Inserts a message into the configured queue.
     *
     * @param int $delay The delay in milliseconds (truncated to whole seconds)
     *
     * @return string The inserted id
     *
     * @throws \Doctrine\DBAL\DBALException
     */
    public function send(string $body, array $headers, int $delay = 0): string
    {
        $now = new \DateTime();
        $availableAt = (clone $now)->modify(sprintf('+%d seconds', $delay / 1000));

        $queryBuilder = $this->driverConnection->createQueryBuilder()
            ->insert($this->configuration['table_name'])
            ->values([
                'body' => '?',
                'headers' => '?',
                'queue_name' => '?',
                'created_at' => '?',
                'available_at' => '?',
            ]);

        $dateTimeType = self::getDateTimeType();

        $this->executeQuery($queryBuilder->getSQL(), [
            $body,
            json_encode($headers),
            $this->configuration['queue_name'],
            $now,
            $availableAt,
        ], [
            null,
            null,
            null,
            $dateTimeType,
            $dateTimeType,
        ]);

        return $this->driverConnection->lastInsertId();
    }

    /**
     * Claims the oldest available message of the queue.
     *
     * Inside a transaction, the first available row (ordered by available_at)
     * is selected with the platform's write-lock clause, its delivered_at is
     * set to the current time, and the row is returned with its headers decoded.
     *
     * If the table is missing and auto setup is still enabled, the table is
     * created and the operation is retried; setup() disables auto setup, so the
     * retry happens at most once.
     *
     * @return array|null The message row, or null when no message is available
     *
     * @throws \Throwable Any failure is rethrown after the transaction is rolled back
     */
    public function get(): ?array
    {
        get:
        $this->driverConnection->beginTransaction();
        try {
            $query = $this->createAvailableMessagesQueryBuilder()
                ->orderBy('available_at', 'ASC')
                ->setMaxResults(1);

            // append the platform's write-lock clause (SELECT ... FOR UPDATE) to the select
            $doctrineEnvelope = $this->executeQuery(
                $query->getSQL().' '.$this->driverConnection->getDatabasePlatform()->getWriteLockSQL(),
                $query->getParameters(),
                $query->getParameterTypes()
            )->fetch();

            if (false === $doctrineEnvelope) {
                $this->driverConnection->commit();

                return null;
            }

            $doctrineEnvelope = $this->decodeEnvelopeHeaders($doctrineEnvelope);

            // Mark the message as being handled.
            $queryBuilder = $this->driverConnection->createQueryBuilder()
                ->update($this->configuration['table_name'])
                ->set('delivered_at', '?')
                ->where('id = ?');
            $now = new \DateTime();
            $this->executeQuery($queryBuilder->getSQL(), [
                $now,
                $doctrineEnvelope['id'],
            ], [
                self::getDateTimeType(),
            ]);

            $this->driverConnection->commit();

            return $doctrineEnvelope;
        } catch (\Throwable $e) {
            $this->driverConnection->rollBack();

            if ($this->autoSetup && $e instanceof TableNotFoundException) {
                $this->setup();
                goto get;
            }

            throw $e;
        }
    }

    /**
     * Acknowledges a message by deleting its row.
     *
     * @return bool Whether a row was deleted
     *
     * @throws TransportException When the delete query fails
     */
    public function ack(string $id): bool
    {
        return $this->deleteMessage($id);
    }

    /**
     * Rejects a message by deleting its row.
     *
     * @return bool Whether a row was deleted
     *
     * @throws TransportException When the delete query fails
     */
    public function reject(string $id): bool
    {
        return $this->deleteMessage($id);
    }

    /**
     * Creates or updates the messages table.
     *
     * The schema asset filter of the DBAL configuration is cleared while the
     * synchronizer runs and is always restored afterwards, even when the
     * synchronization fails. Auto setup is disabled only after a successful run.
     */
    public function setup(): void
    {
        $configuration = $this->driverConnection->getConfiguration();
        // Since Doctrine 2.9 the getFilterSchemaAssetsExpression is deprecated
        $hasFilterCallback = method_exists($configuration, 'getSchemaAssetsFilter');

        if ($hasFilterCallback) {
            $assetFilter = $configuration->getSchemaAssetsFilter();
            $configuration->setSchemaAssetsFilter(null);
        } else {
            $assetFilter = $configuration->getFilterSchemaAssetsExpression();
            $configuration->setFilterSchemaAssetsExpression(null);
        }

        try {
            $this->schemaSynchronizer->updateSchema($this->getSchema(), true);
        } finally {
            // Put the filter back no matter what, the configuration object outlives this call.
            if ($hasFilterCallback) {
                $configuration->setSchemaAssetsFilter($assetFilter);
            } else {
                $configuration->setFilterSchemaAssetsExpression($assetFilter);
            }
        }

        $this->autoSetup = false;
    }

    /**
     * Counts the messages of the queue that are currently available for delivery.
     */
    public function getMessageCount(): int
    {
        $queryBuilder = $this->createAvailableMessagesQueryBuilder()
            ->select('COUNT(m.id) as message_count')
            ->setMaxResults(1);

        $statement = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes());

        // Cast explicitly instead of relying on return-type coercion.
        return (int) $statement->fetchColumn();
    }

    /**
     * Returns the available messages of the queue with their headers decoded.
     *
     * @param int|null $limit Maximum number of rows to return, null for no limit
     */
    public function findAll(?int $limit = null): array
    {
        $queryBuilder = $this->createAvailableMessagesQueryBuilder();
        if (null !== $limit) {
            $queryBuilder->setMaxResults($limit);
        }

        $data = $this->executeQuery($queryBuilder->getSQL(), $queryBuilder->getParameters(), $queryBuilder->getParameterTypes())->fetchAll();

        return array_map(function ($doctrineEnvelope) {
            return $this->decodeEnvelopeHeaders($doctrineEnvelope);
        }, $data);
    }

    /**
     * Looks a message up by id, regardless of its queue or delivery state.
     *
     * @param mixed $id Value compared against the "id" column
     *
     * @return array|null The row with decoded headers, or null when no row matches
     */
    public function find($id): ?array
    {
        $queryBuilder = $this->createQueryBuilder()
            ->where('m.id = ?');

        $data = $this->executeQuery($queryBuilder->getSQL(), [
            $id,
        ])->fetch();

        return false === $data ? null : $this->decodeEnvelopeHeaders($data);
    }

    /**
     * Deletes the row with the given id; shared implementation of ack() and reject().
     *
     * @throws TransportException When the delete query fails
     */
    private function deleteMessage(string $id): bool
    {
        try {
            return $this->driverConnection->delete($this->configuration['table_name'], ['id' => $id]) > 0;
        } catch (DBALException $exception) {
            throw new TransportException($exception->getMessage(), 0, $exception);
        }
    }

    /**
     * Builds the select for messages of the configured queue that are due
     * (available_at <= now) and either never delivered or delivered longer ago
     * than the redeliver timeout.
     */
    private function createAvailableMessagesQueryBuilder(): QueryBuilder
    {
        $now = new \DateTime();
        $redeliverLimit = (clone $now)->modify(sprintf('-%d seconds', $this->configuration['redeliver_timeout']));
        $dateTimeType = self::getDateTimeType();

        return $this->createQueryBuilder()
            ->where('m.delivered_at is null OR m.delivered_at < ?')
            ->andWhere('m.available_at <= ?')
            ->andWhere('m.queue_name = ?')
            ->setParameters([
                $redeliverLimit,
                $now,
                $this->configuration['queue_name'],
            ], [
                $dateTimeType,
                $dateTimeType,
            ]);
    }

    /**
     * Builds a "SELECT m.* FROM <table_name> m" query builder.
     */
    private function createQueryBuilder(): QueryBuilder
    {
        return $this->driverConnection->createQueryBuilder()
            ->select('m.*')
            ->from($this->configuration['table_name'], 'm');
    }

    /**
     * Runs a query, handling a missing table.
     *
     * On a TableNotFoundException the exception is rethrown when a transaction
     * is active. Otherwise the table is created when auto setup is enabled and
     * the query is executed a second time.
     */
    private function executeQuery(string $sql, array $parameters = [], array $types = []): ResultStatement
    {
        try {
            $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
        } catch (TableNotFoundException $e) {
            if ($this->driverConnection->isTransactionActive()) {
                throw $e;
            }

            // create table
            if ($this->autoSetup) {
                $this->setup();
            }
            $stmt = $this->driverConnection->executeQuery($sql, $parameters, $types);
        }

        return $stmt;
    }

    /**
     * Describes the messages table: its columns, primary key and indexes.
     */
    private function getSchema(): Schema
    {
        $schema = new Schema([], [], $this->driverConnection->getSchemaManager()->createSchemaConfig());
        $table = $schema->createTable($this->configuration['table_name']);
        $dateTimeType = self::getDateTimeType();

        $table->addColumn('id', self::$useDeprecatedConstants ? Type::BIGINT : Types::BIGINT)
            ->setAutoincrement(true)
            ->setNotnull(true);
        $table->addColumn('body', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
            ->setNotnull(true);
        $table->addColumn('headers', self::$useDeprecatedConstants ? Type::TEXT : Types::TEXT)
            ->setNotnull(true);
        $table->addColumn('queue_name', self::$useDeprecatedConstants ? Type::STRING : Types::STRING)
            ->setNotnull(true);
        $table->addColumn('created_at', $dateTimeType)
            ->setNotnull(true);
        $table->addColumn('available_at', $dateTimeType)
            ->setNotnull(true);
        // delivered_at stays null until the message is claimed by get()
        $table->addColumn('delivered_at', $dateTimeType)
            ->setNotnull(false);
        $table->setPrimaryKey(['id']);
        $table->addIndex(['queue_name']);
        $table->addIndex(['available_at']);
        $table->addIndex(['delivered_at']);

        return $schema;
    }

    /**
     * Replaces the JSON string stored under "headers" with its decoded array form.
     */
    private function decodeEnvelopeHeaders(array $doctrineEnvelope): array
    {
        $doctrineEnvelope['headers'] = json_decode($doctrineEnvelope['headers'], true);

        return $doctrineEnvelope;
    }

    /**
     * Returns the datetime type name matching the available set of type constants.
     */
    private static function getDateTimeType(): string
    {
        return self::$useDeprecatedConstants ? Type::DATETIME : Types::DATETIME_MUTABLE;
    }
}