관리-도구
편집 파일: Worker.php
<?php namespace Illuminate\Queue; use Illuminate\Contracts\Cache\Repository as CacheContract; use Illuminate\Contracts\Debug\ExceptionHandler; use Illuminate\Contracts\Events\Dispatcher; use Illuminate\Contracts\Queue\Factory as QueueManager; use Illuminate\Database\DetectsLostConnections; use Illuminate\Queue\Events\JobExceptionOccurred; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; use Illuminate\Queue\Events\Looping; use Illuminate\Queue\Events\WorkerStopping; use Illuminate\Support\Carbon; use Throwable; class Worker { use DetectsLostConnections; /** * The queue manager instance. * * @var \Illuminate\Contracts\Queue\Factory */ protected $manager; /** * The event dispatcher instance. * * @var \Illuminate\Contracts\Events\Dispatcher */ protected $events; /** * The cache repository implementation. * * @var \Illuminate\Contracts\Cache\Repository */ protected $cache; /** * The exception handler instance. * * @var \Illuminate\Contracts\Debug\ExceptionHandler */ protected $exceptions; /** * The callback used to determine if the application is in maintenance mode. * * @var callable */ protected $isDownForMaintenance; /** * Indicates if the worker should exit. * * @var bool */ public $shouldQuit = false; /** * Indicates if the worker is paused. * * @var bool */ public $paused = false; /** * Create a new queue worker. * * @param \Illuminate\Contracts\Queue\Factory $manager * @param \Illuminate\Contracts\Events\Dispatcher $events * @param \Illuminate\Contracts\Debug\ExceptionHandler $exceptions * @param callable $isDownForMaintenance * @return void */ public function __construct(QueueManager $manager, Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance) { $this->events = $events; $this->manager = $manager; $this->exceptions = $exceptions; $this->isDownForMaintenance = $isDownForMaintenance; } /** * Listen to the given queue in a loop. * * @param string $connectionName * @param string $queue * @param \Illuminate\Queue\WorkerOptions $options * @return void */ public function daemon($connectionName, $queue, WorkerOptions $options) { if ($this->supportsAsyncSignals()) { $this->listenForSignals(); } $lastRestart = $this->getTimestampOfLastQueueRestart(); while (true) { // Before reserving any jobs, we will make sure this queue is not paused and // if it is we will just pause this worker for a given amount of time and // make sure we do not need to kill this worker process off completely. if (! $this->daemonShouldRun($options, $connectionName, $queue)) { $this->pauseWorker($options, $lastRestart); continue; } // First, we will attempt to get the next job off of the queue. We will also // register the timeout handler and reset the alarm for this job so it is // not stuck in a frozen state forever. Then, we can fire off this job. $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); if ($this->supportsAsyncSignals()) { $this->registerTimeoutHandler($job, $options); } // If the daemon should run (not in maintenance mode, etc.), then we can run // fire off this job for processing. Otherwise, we will need to sleep the // worker so no more jobs are processed until they should be processed. if ($job) { $this->runJob($job, $connectionName, $options); } else { $this->sleep($options->sleep); } if ($this->supportsAsyncSignals()) { $this->resetTimeoutHandler(); } // Finally, we will check to see if we have exceeded our memory limits or if // the queue should restart based on other indications. If so, we'll stop // this worker and let whatever is "monitoring" it restart the process. $this->stopIfNecessary($options, $lastRestart, $job); } } /** * Register the worker timeout handler. * * @param \Illuminate\Contracts\Queue\Job|null $job * @param \Illuminate\Queue\WorkerOptions $options * @return void */ protected function registerTimeoutHandler($job, WorkerOptions $options) { // We will register a signal handler for the alarm signal so that we can kill this // process if it is running too long because it has frozen. This uses the async // signals supported in recent versions of PHP to accomplish it conveniently. pcntl_signal(SIGALRM, function () use ($job, $options) { if ($job) { $this->markJobAsFailedIfWillExceedMaxAttempts( $job->getConnectionName(), $job, (int) $options->maxTries, $this->maxAttemptsExceededException($job) ); } $this->kill(1); }); pcntl_alarm( max($this->timeoutForJob($job, $options), 0) ); } /** * Reset the worker timeout handler. * * @return void */ protected function resetTimeoutHandler() { pcntl_alarm(0); } /** * Get the appropriate timeout for the given job. * * @param \Illuminate\Contracts\Queue\Job|null $job * @param \Illuminate\Queue\WorkerOptions $options * @return int */ protected function timeoutForJob($job, WorkerOptions $options) { return $job && ! is_null($job->timeout()) ? $job->timeout() : $options->timeout; } /** * Determine if the daemon should process on this iteration. * * @param \Illuminate\Queue\WorkerOptions $options * @param string $connectionName * @param string $queue * @return bool */ protected function daemonShouldRun(WorkerOptions $options, $connectionName, $queue) { return ! ((($this->isDownForMaintenance)() && ! $options->force) || $this->paused || $this->events->until(new Looping($connectionName, $queue)) === false); } /** * Pause the worker for the current loop. * * @param \Illuminate\Queue\WorkerOptions $options * @param int $lastRestart * @return void */ protected function pauseWorker(WorkerOptions $options, $lastRestart) { $this->sleep($options->sleep > 0 ? $options->sleep : 1); $this->stopIfNecessary($options, $lastRestart); } /** * Stop the process if necessary. * * @param \Illuminate\Queue\WorkerOptions $options * @param int $lastRestart * @param mixed $job * @return void */ protected function stopIfNecessary(WorkerOptions $options, $lastRestart, $job = null) { if ($this->shouldQuit) { $this->stop(); } elseif ($this->memoryExceeded($options->memory)) { $this->stop(12); } elseif ($this->queueShouldRestart($lastRestart)) { $this->stop(); } elseif ($options->stopWhenEmpty && is_null($job)) { $this->stop(); } } /** * Process the next job on the queue. * * @param string $connectionName * @param string $queue * @param \Illuminate\Queue\WorkerOptions $options * @return void */ public function runNextJob($connectionName, $queue, WorkerOptions $options) { $job = $this->getNextJob( $this->manager->connection($connectionName), $queue ); // If we're able to pull a job off of the stack, we will process it and then return // from this method. If there is no job on the queue, we will "sleep" the worker // for the specified number of seconds, then keep processing jobs after sleep. if ($job) { return $this->runJob($job, $connectionName, $options); } $this->sleep($options->sleep); } /** * Get the next job from the queue connection. * * @param \Illuminate\Contracts\Queue\Queue $connection * @param string $queue * @return \Illuminate\Contracts\Queue\Job|null */ protected function getNextJob($connection, $queue) { try { foreach (explode(',', $queue) as $queue) { if (! is_null($job = $connection->pop($queue))) { return $job; } } } catch (Throwable $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); $this->sleep(1); } } /** * Process the given job. * * @param \Illuminate\Contracts\Queue\Job $job * @param string $connectionName * @param \Illuminate\Queue\WorkerOptions $options * @return void */ protected function runJob($job, $connectionName, WorkerOptions $options) { try { return $this->process($connectionName, $job, $options); } catch (Throwable $e) { $this->exceptions->report($e); $this->stopWorkerIfLostConnection($e); } } /** * Stop the worker if we have lost connection to a database. * * @param \Throwable $e * @return void */ protected function stopWorkerIfLostConnection($e) { if ($this->causedByLostConnection($e)) { $this->shouldQuit = true; } } /** * Process the given job from the queue. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param \Illuminate\Queue\WorkerOptions $options * @return void * * @throws \Throwable */ public function process($connectionName, $job, WorkerOptions $options) { try { // First we will raise the before job event and determine if the job has already ran // over its maximum attempt limits, which could primarily happen when this job is // continually timing out and not actually throwing any exceptions from itself. $this->raiseBeforeJobEvent($connectionName, $job); $this->markJobAsFailedIfAlreadyExceedsMaxAttempts( $connectionName, $job, (int) $options->maxTries ); if ($job->isDeleted()) { return $this->raiseAfterJobEvent($connectionName, $job); } // Here we will fire off the job and let it process. We will catch any exceptions so // they can be reported to the developers logs, etc. Once the job is finished the // proper events will be fired to let any listeners know this job has finished. $job->fire(); $this->raiseAfterJobEvent($connectionName, $job); } catch (Throwable $e) { $this->handleJobException($connectionName, $job, $options, $e); } } /** * Handle an exception that occurred while the job was running. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param \Illuminate\Queue\WorkerOptions $options * @param \Throwable $e * @return void * * @throws \Throwable */ protected function handleJobException($connectionName, $job, WorkerOptions $options, Throwable $e) { try { // First, we will go ahead and mark the job as failed if it will exceed the maximum // attempts it is allowed to run the next time we process it. If so we will just // go ahead and mark it as failed now so we do not have to release this again. if (! $job->hasFailed()) { $this->markJobAsFailedIfWillExceedMaxAttempts( $connectionName, $job, (int) $options->maxTries, $e ); $this->markJobAsFailedIfWillExceedMaxExceptions( $connectionName, $job, $e ); } $this->raiseExceptionOccurredJobEvent( $connectionName, $job, $e ); } finally { // If we catch an exception, we will attempt to release the job back onto the queue // so it is not lost entirely. This'll let the job be retried at a later time by // another listener (or this same one). We will re-throw this exception after. if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) { $job->release( method_exists($job, 'delaySeconds') && ! is_null($job->delaySeconds()) ? $job->delaySeconds() : $options->delay ); } } throw $e; } /** * Mark the given job as failed if it has exceeded the maximum allowed attempts. * * This will likely be because the job previously exceeded a timeout. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param int $maxTries * @return void * * @throws \Throwable */ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries) { $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; $timeoutAt = $job->timeoutAt(); if ($timeoutAt && Carbon::now()->getTimestamp() <= $timeoutAt) { return; } if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) { return; } $this->failJob($job, $e = $this->maxAttemptsExceededException($job)); throw $e; } /** * Mark the given job as failed if it has exceeded the maximum allowed attempts. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param int $maxTries * @param \Throwable $e * @return void */ protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, Throwable $e) { $maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries; if ($job->timeoutAt() && $job->timeoutAt() <= Carbon::now()->getTimestamp()) { $this->failJob($job, $e); } if ($maxTries > 0 && $job->attempts() >= $maxTries) { $this->failJob($job, $e); } } /** * Mark the given job as failed if it has exceeded the maximum allowed attempts. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param \Throwable $e * @return void */ protected function markJobAsFailedIfWillExceedMaxExceptions($connectionName, $job, Throwable $e) { if (! $this->cache || is_null($uuid = $job->uuid()) || is_null($maxExceptions = $job->maxExceptions())) { return; } if (! $this->cache->get('job-exceptions:'.$uuid)) { $this->cache->put('job-exceptions:'.$uuid, 0, Carbon::now()->addDay()); } if ($maxExceptions <= $this->cache->increment('job-exceptions:'.$uuid)) { $this->cache->forget('job-exceptions:'.$uuid); $this->failJob($job, $e); } } /** * Mark the given job as failed and raise the relevant event. * * @param \Illuminate\Contracts\Queue\Job $job * @param \Throwable $e * @return void */ protected function failJob($job, Throwable $e) { return $job->fail($e); } /** * Raise the before queue job event. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @return void */ protected function raiseBeforeJobEvent($connectionName, $job) { $this->events->dispatch(new JobProcessing( $connectionName, $job )); } /** * Raise the after queue job event. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @return void */ protected function raiseAfterJobEvent($connectionName, $job) { $this->events->dispatch(new JobProcessed( $connectionName, $job )); } /** * Raise the exception occurred queue job event. * * @param string $connectionName * @param \Illuminate\Contracts\Queue\Job $job * @param \Throwable $e * @return void */ protected function raiseExceptionOccurredJobEvent($connectionName, $job, Throwable $e) { $this->events->dispatch(new JobExceptionOccurred( $connectionName, $job, $e )); } /** * Determine if the queue worker should restart. * * @param int|null $lastRestart * @return bool */ protected function queueShouldRestart($lastRestart) { return $this->getTimestampOfLastQueueRestart() != $lastRestart; } /** * Get the last queue restart timestamp, or null. * * @return int|null */ protected function getTimestampOfLastQueueRestart() { if ($this->cache) { return $this->cache->get('illuminate:queue:restart'); } } /** * Enable async signals for the process. * * @return void */ protected function listenForSignals() { pcntl_async_signals(true); pcntl_signal(SIGTERM, function () { $this->shouldQuit = true; }); pcntl_signal(SIGUSR2, function () { $this->paused = true; }); pcntl_signal(SIGCONT, function () { $this->paused = false; }); } /** * Determine if "async" signals are supported. * * @return bool */ protected function supportsAsyncSignals() { return extension_loaded('pcntl'); } /** * Determine if the memory limit has been exceeded. * * @param int $memoryLimit * @return bool */ public function memoryExceeded($memoryLimit) { return (memory_get_usage(true) / 1024 / 1024) >= $memoryLimit; } /** * Stop listening and bail out of the script. * * @param int $status * @return void */ public function stop($status = 0) { $this->events->dispatch(new WorkerStopping($status)); exit($status); } /** * Kill the process. * * @param int $status * @return void */ public function kill($status = 0) { $this->events->dispatch(new WorkerStopping($status)); if (extension_loaded('posix')) { posix_kill(getmypid(), SIGKILL); } exit($status); } /** * Create an instance of MaxAttemptsExceededException. * * @param \Illuminate\Contracts\Queue\Job $job * @return \Illuminate\Queue\MaxAttemptsExceededException */ protected function maxAttemptsExceededException($job) { return new MaxAttemptsExceededException( $job->resolveName().' has been attempted too many times or run too long. The job may have previously timed out.' ); } /** * Sleep the script for a given number of seconds. * * @param int|float $seconds * @return void */ public function sleep($seconds) { if ($seconds < 1) { usleep($seconds * 1000000); } else { sleep($seconds); } } /** * Set the cache repository implementation. * * @param \Illuminate\Contracts\Cache\Repository $cache * @return void */ public function setCache(CacheContract $cache) { $this->cache = $cache; } /** * Get the queue manager instance. * * @return \Illuminate\Queue\QueueManager */ public function getManager() { return $this->manager; } /** * Set the queue manager instance. * * @param \Illuminate\Contracts\Queue\Factory $manager * @return void */ public function setManager(QueueManager $manager) { $this->manager = $manager; } }