1. <?php
  2. /**
  3. * Task Queue Batch Processor CLI
  4. */
  5. require_once 'vendor/autoload.php'; // Autoload Composer dependencies
  6. use Symfony\Component\Console\Application;
  7. use Symfony\Component\Console\Command\Command;
  8. use Symfony\Component\Console\Input\InputInterface;
  9. use Symfony\Component\Console\Output\OutputInterface;
  10. use League\Queue\Laravel\LaravelQueue; //Example Queue implementation
  11. class BatchProcessorCommand extends Command
  12. {
  13. protected $name = 'batch-process';
  14. protected $description = 'Processes tasks from a queue in batches.';
  15. private $queue;
  16. private $batchSize;
  17. public function __construct(LaravelQueue $queue, $batchSize = 10)
  18. {
  19. parent::__construct();
  20. $this->queue = $queue;
  21. $this->batchSize = $batchSize;
  22. }
  23. /**
  24. * Set the batch size.
  25. *
  26. * @param string $batchSize
  27. * @return void
  28. */
  29. public function setBatchSize(string $batchSize)
  30. {
  31. $this->batchSize = (int)$batchSize;
  32. }
  33. /**
  34. * Define the command arguments.
  35. *
  36. * @param InputInterface $input
  37. * @param OutputInterface $output
  38. * @return void
  39. */
  40. protected function defineArguments(InputInterface $input, OutputInterface $output)
  41. {
  42. $output->registerArgument('batch-size', 'The number of tasks to process in each batch (default: 10)', 1);
  43. }
  44. /**
  45. * Execute the command.
  46. *
  47. * @param InputInterface $input
  48. * @param OutputInterface $output
  49. * @return void
  50. */
  51. public function run(InputInterface $input, OutputInterface $output)
  52. {
  53. $output->writeln('<info>Starting batch processing...</info>');
  54. try {
  55. $this->processQueueInBatches();
  56. } catch (\Exception $e) {
  57. $output->writeln('<error>Error during batch processing: ' . $e->getMessage() . '</error>');
  58. }
  59. $output->writeln('<info>Batch processing completed.</info>');
  60. }
  61. private function processQueueInBatches()
  62. {
  63. $queue = $this->queue;
  64. while (true) {
  65. $tasks = $queue->getTasks($this->batchSize);
  66. if (empty($tasks)) {
  67. break; // No more tasks in the queue
  68. }
  69. $output->writeln('<info>Processing batch of ' . count($tasks) . ' tasks...</info>');
  70. foreach ($tasks as $task) {
  71. try {
  72. $task->process(); // Execute the task
  73. } catch (\Exception $e) {
  74. $output->writeln('<error>Error processing task ' . $task->id . ': ' . $e->getMessage() . '</error>');
  75. // Optionally, handle task failures (e.g., retry, log to a dead-letter queue)
  76. }
  77. }
  78. }
  79. }
  80. }
  81. // Create the console application
  82. $application = new Application();
  83. $application->addCommand(new BatchProcessorCommand(new LaravelQueue()));
  84. $application->run();
  85. ?>

Add your comment