RabbitMQ Job Queue With PHP
This tutorial will show you how to deploy and utilize a job queue in PHP using the RabbitMQ server we deployed through Docker.
Why Use Job Queues?
Job queues allows your infrastructure to gracefully cope with massive fluctuations in demand without having to scale up/down rapidly. For example, if you were to create your own version of Youtube, you may wish to create a queue for converting videos that users upload. Without a queue, your video conversion servers would become overloaded during the peak hours (middle of the day), but with a queue, they can process one video at a time, and if there is a major backlog, the admins have time to deploy more servers, or just rely on the queue completing overnight when people aren't uploading videos.
Steps
First, we need to use composer to install the PHP package for interfacing with RabbitMQ. You will need to install composer if you haven't got it already.
composer require php-amqplib/php-amqplib
php7.0-mbstring
and php7.0-bcmath
packages.
The Job Creator
Now let's create our publisher. This will create jobs that the workers will execute. For this tutorial, I have called it publisher.php
.
<?php
require_once(__DIR__ . '/vendor/autoload.php');
define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the queue if it does not already exist.
$channel->queue_declare(
$queue = RABBITMQ_QUEUE_NAME,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
$job_id=0;
while (true)
{
$jobArray = array(
'id' => $job_id++,
'task' => 'sleep',
'sleep_period' => rand(0, 3)
);
$msg = new \PhpAmqpLib\Message\AMQPMessage(
json_encode($jobArray, JSON_UNESCAPED_SLASHES),
array('delivery_mode' => 2) # make message persistent
);
$channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
print 'Job created' . PHP_EOL;
sleep(1);
}
In this example, I have used JSON for the body of the message to convey a job for the workers. Messages just need to be a string, so you can use any format you like.
You may also notice that I specified an id
in the job. This is so that later, we will see that no single task gets executed by more than one worker.
The Worker
<?php
require_once __DIR__ . '/vendor/autoload.php';
define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the queue if it doesnt already exist.
$channel->queue_declare(
$queue = RABBITMQ_QUEUE_NAME,
$passive = false,
$durable = true,
$exclusive = false,
$auto_delete = false,
$nowait = false,
$arguments = null,
$ticket = null
);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg){
echo " [x] Received ", $msg->body, "\n";
$job = json_decode($msg->body, $assocForm=true);
sleep($job['sleep_period']);
echo " [x] Done", "\n";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
$channel->basic_qos(null, 1, null);
$channel->basic_consume(
$queue = RABBITMQ_QUEUE_NAME,
$consumer_tag = '',
$no_local = false,
$no_ack = false,
$exclusive = false,
$nowait = false,
$callback
);
while (count($channel->callbacks))
{
$channel->wait();
}
$channel->close();
$connection->close();
ctrl
-c
to quit out. If you want a worker that will work whilst tasks are available and then gracefully quit, use the non-blocking-worker example in this tutorial's online source code on Github.
Testing
Now that we have both a task creator (publisher.php) and task consumer (worker.php), we can start see the queue in action. Simply start your publisher process to create jobs, and then start your worker threads to process them as demonstrated in the video below.
Connecting Using AMQPS (TLS Encrypted)
If you wish to use an encrypted connection to communicate with your RabbitMQ server, and you have configured it with TLS certificates then you can do so by simply changing the code that creates the connection to:
define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5671);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");
$sslOptions = array(
'verify_peer' => true, # verify the serer has a valid certificate
);
$connection = new \PhpAmqpLib\Connection\AMQPSSLConnection(
host: RABBITMQ_HOST,
port: RABBITMQ_PORT,
user: RABBITMQ_USERNAME,
password: RABBITMQ_PASSWORD,
ssl_options: $sslOptions,
);
Port Change
Please note that the port changed from 5672 to 5671 as I am using RabbitMQ's default ports, your server may use different ports though.
Peer Verification
In this scenario, my $sslOptions
array is very simple because I am only verifying that the server is who they say they are (and my server is using a Let's Encrypt certificate
so I don't need to specify a trusted certificate authority). If your server is
configured to verify the peers that connect to it,
then you will need to fill in the other SSL context options
such as local_cert
which is your certificate to provide the server to tell it that you are who you say you are.
References
- RabbitMQ Docs - Work Queues (using php-amqplib)
- LornaJane Blog - Connect to RabbitMQ from PHP over AMQPS
First published: 16th August 2018