vendor/pimcore/data-hub-simple-rest/src/Queue/QueueService.php line 39

Open in your IDE?
  1. <?php
  2. /**
  3.  * Pimcore
  4.  *
  5.  * This source file is available under following license:
  6.  * - Pimcore Commercial License (PCL)
  7.  *
  8.  *  @copyright  Copyright (c) Pimcore GmbH (http://www.pimcore.org)
  9.  *  @license    http://www.pimcore.org/license     PCL
  10.  */
  11. namespace Pimcore\Bundle\DataHubSimpleRestBundle\Queue;
  12. use Carbon\Carbon;
  13. use Doctrine\DBAL\Exception\TableNotFoundException;
  14. use Pimcore\Db;
  15. class QueueService
  16. {
  17.     const QUEUE_TABLE_NAME 'bundle_data_hub_rest_index_queue';
  18.     /**
  19.      * @return Db\Connection|Db\ConnectionInterface
  20.      */
  21.     protected function getDb()
  22.     {
  23.         return Db::get();
  24.     }
  25.     protected function getCurrentQueueTableOperationTime(): int
  26.     {
  27.         /** @var Carbon $carbonNow */
  28.         $carbonNow Carbon::now();
  29.         return (int)($carbonNow->getTimestamp() . str_pad((string)$carbonNow->milli3'0'));
  30.     }
  31.     public function addItemToQueue($elementIdstring $entityTypestring $configName null)
  32.     {
  33.         try {
  34.             $this->getDb()->executeQuery(sprintf(
  35.                 'INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE timestamp = VALUES(timestamp)',
  36.                 self::QUEUE_TABLE_NAME,
  37.                 implode(',', ['elementId''timestamp''configName''entityType']),
  38.                 implode(',', [
  39.                     $elementId,
  40.                     $this->getCurrentQueueTableOperationTime(),
  41.                     $this->getDb()->quote($configName),
  42.                     $this->getDb()->quote($entityType)
  43.                 ])
  44.             ));
  45.         } catch (TableNotFoundException $exception) {
  46.             $this->createQueueTableIfNotExisting(function () use ($elementId$entityType$configName) {
  47.                 $this->addItemToQueue($elementId$entityType$configName);
  48.             });
  49.         }
  50.     }
  51.     protected function createQueueTableIfNotExisting(\Closure $callable null)
  52.     {
  53.         $this->getDb()->executeQuery(sprintf('CREATE TABLE IF NOT EXISTS %s (
  54.             elementId bigint NOT NULL,
  55.             timestamp bigint NULL,
  56.             configName varchar(50) NOT NULL,
  57.             entityType varchar(50) NOT NULL,
  58.             dispatched bigint NULL,
  59.             workerId varchar(13) NULL,
  60.             PRIMARY KEY (elementId, configName, entityType),
  61.             KEY `bundle_index_queue_configName_workerId` (`workerId`),
  62.             KEY `bundle_index_queue_configName_index` (`configName`))
  63.         'self::QUEUE_TABLE_NAME));
  64.         if ($callable) {
  65.             return $callable();
  66.         }
  67.     }
  68.     public function getAllQueueEntries($limit 100000bool $dispatch false): array
  69.     {
  70.         try {
  71.             if ($dispatch === true) {
  72.                 $dispatchId time();
  73.                 $workerId uniqid();
  74.                 $this->getDb()->executeQuery('UPDATE ' self::QUEUE_TABLE_NAME ' SET dispatched = ?, workerId = ? WHERE (ISNULL(dispatched) OR dispatched < ?) LIMIT ' intval($limit),
  75.                     [$dispatchId$workerId$dispatchId 3000]);
  76.                 $results $this->getDb()->fetchAll(
  77.                     sprintf('SELECT * FROM %s WHERE workerId = ?'self::QUEUE_TABLE_NAME),
  78.                     [$workerId]
  79.                 );
  80.             } else {
  81.                 $results $this->getDb()->fetchAll(
  82.                     sprintf('SELECT * FROM %s'self::QUEUE_TABLE_NAME)
  83.                 );
  84.             }
  85.             return $results ?? [];
  86.         } catch (TableNotFoundException $exception) {
  87.             return $this->createQueueTableIfNotExisting(function () use ($limit) {
  88.                 return $this->getAllQueueEntries($limit);
  89.             });
  90.         }
  91.     }
  92.     /**
  93.      * @return int
  94.      */
  95.     public function getQueueItemCount(): int
  96.     {
  97.         try {
  98.             return $this->getDb()->fetchOne(
  99.                 sprintf('SELECT count(*) as count FROM %s'self::QUEUE_TABLE_NAME)
  100.             ) ?? 0;
  101.         } catch (TableNotFoundException $exception) {
  102.             return $this->createQueueTableIfNotExisting(function () {
  103.                 return $this->getQueueItemCount();
  104.             });
  105.         }
  106.     }
  107.     public function markQueueEntryAsProcessed($elementId$entityType$configName)
  108.     {
  109.         try {
  110.             $this->getDb()->executeQuery(
  111.                 sprintf('DELETE FROM %s WHERE elementId = ? AND entityType = ? AND configName = ?'self::QUEUE_TABLE_NAME),
  112.                 [$elementId$entityType$configName ?? '']
  113.             );
  114.         } catch (TableNotFoundException $exception) {
  115.             $this->createQueueTableIfNotExisting();
  116.         }
  117.     }
  118. }