203 lines
6.0 KiB
PHP
203 lines
6.0 KiB
PHP
<?php
|
|
|
|
declare(strict_types=1);
|
|
|
|
/**
|
|
* @file classes/core/PKPQueueProvider.php
|
|
*
|
|
* Copyright (c) 2014-2023 Simon Fraser University
|
|
* Copyright (c) 2000-2023 John Willinsky
|
|
* Distributed under the GNU GPL v3. For full terms see the file docs/COPYING.
|
|
*
|
|
* @class PKPQueueProvider
|
|
*
|
|
* @ingroup core
|
|
*
|
|
* @brief Registers the queue service provider (database connector and worker), runs queued jobs and logs failed ones
|
|
*/
|
|
|
|
namespace PKP\core;
|
|
|
|
use APP\core\Application;
|
|
use Illuminate\Contracts\Debug\ExceptionHandler;
|
|
use Illuminate\Database\Eloquent\Builder;
|
|
use Illuminate\Queue\Events\JobFailed;
|
|
use Illuminate\Queue\QueueServiceProvider as IlluminateQueueServiceProvider;
|
|
use Illuminate\Queue\Worker;
|
|
use Illuminate\Queue\WorkerOptions;
|
|
use Illuminate\Support\Facades\Facade;
|
|
use Illuminate\Support\Facades\Queue;
|
|
use PKP\config\Config;
|
|
use PKP\job\models\Job as PKPJobModel;
|
|
use PKP\queue\JobRunner;
|
|
use PKP\queue\WorkerConfiguration;
|
|
|
|
class PKPQueueProvider extends IlluminateQueueServiceProvider
{
    /**
     * Specific queue to target to run the associated jobs.
     * Stays null until a queue is selected through forQueue().
     */
    protected ?string $queue = null;

    /**
     * Set a specific queue to target to run the associated jobs
     *
     * @param string $queue Name of the queue whose jobs should be targeted
     *
     * @return self This provider instance, to allow method chaining
     */
    public function forQueue(string $queue): self
    {
        $this->queue = $queue;

        return $this;
    }

    /**
     * Get a job model builder instance to query the jobs table
     *
     * The builder starts from PKPJobModel::isAvailable() and is further
     * constrained by nonEmptyQueue() and notExceededAttempts(). If a queue
     * was selected through forQueue(), only that queue is queried; if no
     * queue was selected, the testing queue (PKPJobModel::TESTING_QUEUE)
     * is excluded.
     *
     * NOTE(review): when() tests its first argument for truthiness, so a
     * queue name of '' or '0' would apply neither of the two queue filters.
     * Confirm whether such names can reach forQueue().
     */
    public function getJobModelBuilder(): Builder
    {
        return PKPJobModel::isAvailable()
            ->nonEmptyQueue()
            ->when($this->queue, fn ($query) => $query->onQueue($this->queue))
            ->when(is_null($this->queue), fn ($query) => $query->notQueue(PKPJobModel::TESTING_QUEUE))
            ->notExceededAttempts();
    }

    /**
     * Get the worker options object
     *
     * @param array $options Option overrides handed to WorkerConfiguration::withOptions()
     *
     * @return WorkerOptions Options built from the resolved worker configuration
     */
    public function getWorkerOptions(array $options = []): WorkerOptions
    {
        // array_values() drops the keys, so the configuration values are passed
        // to the WorkerOptions constructor positionally: their order matters.
        return new WorkerOptions(
            ...array_values(WorkerConfiguration::withOptions($options)->getWorkerOptions())
        );
    }

    /**
     * Run the queue worker via an infinite loop daemon
     *
     * @param string $connection Name of the queue connection to work on
     * @param string $queue Name of the queue to process
     * @param array $workerOptions Option overrides, see getWorkerOptions()
     */
    public function runJobsViaDaemon(string $connection, string $queue, array $workerOptions = []): void
    {
        /** @var \Illuminate\Queue\Worker $worker */
        $worker = PKPContainer::getInstance()['queue.worker'];

        $worker
            ->setCache(app()->get('cache.store'))
            ->daemon(
                $connection,
                $queue,
                $this->getWorkerOptions($workerOptions)
            );
    }

    /**
     * Run the queue worker to process the next job in the queue
     *
     * Looks up a single job through getJobModelBuilder() and returns without
     * doing anything when none is found. Otherwise the worker is asked to run
     * the next job on the 'database' connection, using the queue of the job
     * found or, when that is null, the configured default queue.
     */
    public function runJobInQueue(): void
    {
        $job = $this->getJobModelBuilder()->limit(1)->first();

        if ($job === null) {
            return;
        }

        $laravelContainer = PKPContainer::getInstance();

        $laravelContainer['queue.worker']->runNextJob(
            'database',
            $job->queue ?? Config::getVar('queues', 'default_queue', 'queue'),
            $this->getWorkerOptions()
        );
    }

    /**
     * Bootstrap any application services.
     *
     * When the 'job_runner' setting of the 'queues' config section is enabled,
     * registers a shutdown function that processes jobs through JobRunner at
     * the end of the request. Always registers a listener that logs failed
     * jobs to the error log and to the 'queue.failer' service.
     */
    public function boot()
    {
        if (Config::getVar('queues', 'job_runner', true)) {
            register_shutdown_function(function () {
                // As this runs at the current request's end but the 'register_shutdown_function' registered
                // at the service provider's registration time at application initial bootstrapping,
                // need to check the maintenance status within the 'register_shutdown_function'
                if (Application::get()->isUnderMaintenance()) {
                    return;
                }

                // In sandbox mode no job is processed; only a message is logged
                if (Config::getVar('general', 'sandbox', false)) {
                    error_log(__('admin.cli.tool.jobs.sandbox.message'));
                    return;
                }

                (new JobRunner($this))
                    ->withMaxExecutionTimeConstrain()
                    ->withMaxJobsConstrain()
                    ->withMaxMemoryConstrain()
                    ->withEstimatedTimeToProcessNextJobConstrain()
                    ->processJobs();
            });
        }

        Queue::failing(function (JobFailed $event) {
            error_log($event->exception->__toString());

            app('queue.failer')->log(
                $event->connectionName,
                $event->job->getQueue(),
                $event->job->getRawBody(),
                // A stack trace may carry arguments that cannot be encoded
                // (recursive structures, resources, invalid UTF-8). Without
                // these flags json_encode() returns false in that case and
                // the whole failure description is lost; with them only the
                // offending values are substituted.
                json_encode(
                    [
                        'message' => $event->exception->getMessage(),
                        'code' => $event->exception->getCode(),
                        'file' => $event->exception->getFile(),
                        'line' => $event->exception->getLine(),
                        'trace' => $event->exception->getTrace(),
                    ],
                    JSON_PARTIAL_OUTPUT_ON_ERROR | JSON_INVALID_UTF8_SUBSTITUTE
                )
            );
        });
    }

    /**
     * Register the database queue connector.
     *
     * Binds the 'database' connector name to PKPQueueDatabaseConnector.
     *
     * @param \Illuminate\Queue\QueueManager $manager
     */
    protected function registerDatabaseConnector($manager)
    {
        $manager->addConnector('database', function () {
            return new PKPQueueDatabaseConnector($this->app['db']);
        });
    }

    /**
     * Register the queue worker.
     *
     * Binds 'queue.worker' as a singleton Worker built from the container's
     * queue manager, event dispatcher and exception handler, together with a
     * maintenance-mode check and a scope reset callback.
     */
    protected function registerWorker()
    {
        $this->app->singleton('queue.worker', function ($app) {
            $isDownForMaintenance = function () {
                return $this->app->isDownForMaintenance();
            };

            $resetScope = function () use ($app) {
                // Only database managers exposing getConnections() can have
                // their per-connection query duration tracking reset
                if (method_exists($app['db'], 'getConnections')) {
                    foreach ($app['db']->getConnections() as $connection) {
                        $connection->resetTotalQueryDuration();
                        $connection->allowQueryDurationHandlersToRunAgain();
                    }
                }

                $app->forgetScopedInstances();

                return Facade::clearResolvedInstances();
            };

            return new Worker(
                $app['queue'],
                $app['events'],
                $app[ExceptionHandler::class],
                $isDownForMaintenance,
                $resetScope
            );
        });
    }
}
|