Programster's Blog

Tutorials focusing on Linux, programming, and open-source

RabbitMQ Job Queue With PHP

This tutorial will show you how to deploy and utilize a job queue in PHP using the RabbitMQ server we deployed through Docker.

Why Use Job Queues?

Job queues allows your infrastructure to gracefully cope with massive fluctuations in demand without having to scale up/down rapidly. For example, if you were to create your own version of Youtube, you may wish to create a queue for converting videos that users upload. Without a queue, your video conversion servers would become overloaded during the peak hours (middle of the day), but with a queue, they can process one video at a time, and if there is a major backlog, the admins have time to deploy more servers, or just rely on the queue completing overnight when people aren't uploading videos.

If you want to skip the explanation and go straight to playing, you can just download the source code for this tutorial and go to the testing section.

Steps

First, we need to use composer to install the PHP package for interfacing with RabbitMQ. You will need to install composer if you haven't got it already.

composer require php-amqplib/php-amqplib

If you are using Ubuntu 16.04 with PHP 7, you will need to have installed the php7.0-mbstring and php7.0-bcmath packages.

The Job Creator

Now let's create our publisher. This will create jobs that the workers will execute. For this tutorial, I have called it publisher.php.

<?php

require_once(__DIR__ . '/vendor/autoload.php');

define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    RABBITMQ_HOST, 
    RABBITMQ_PORT, 
    RABBITMQ_USERNAME, 
    RABBITMQ_PASSWORD
);

$channel = $connection->channel();

# Create the queue if it does not already exist.
$channel->queue_declare(
    $queue = RABBITMQ_QUEUE_NAME,
    $passive = false,
    $durable = true,
    $exclusive = false,
    $auto_delete = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
);

$job_id=0;
while (true)
{
    $jobArray = array(
        'id' => $job_id++,
        'task' => 'sleep',
        'sleep_period' => rand(0, 3)
    );

    $msg = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($jobArray, JSON_UNESCAPED_SLASHES),
        array('delivery_mode' => 2) # make message persistent
    );

    $channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
    print 'Job created' . PHP_EOL;
    sleep(1);
}

In this example, I have used JSON for the body of the message to convey a job for the workers. Messages just need to be a string, so you can use any format you like.

You may also notice that I specified an id in the job. This is so that later, we will see that no single task gets executed by more than one worker.

The Worker

<?php

require_once __DIR__ . '/vendor/autoload.php';

define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    RABBITMQ_HOST, 
    RABBITMQ_PORT, 
    RABBITMQ_USERNAME, 
    RABBITMQ_PASSWORD
);


$channel = $connection->channel();

# Create the queue if it doesnt already exist.
$channel->queue_declare(
    $queue = RABBITMQ_QUEUE_NAME,
    $passive = false,
    $durable = true,
    $exclusive = false,
    $auto_delete = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
);


echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    $job = json_decode($msg->body, $assocForm=true);
    sleep($job['sleep_period']);
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);

$channel->basic_consume(
    $queue = RABBITMQ_QUEUE_NAME,
    $consumer_tag = '',
    $no_local = false,
    $no_ack = false,
    $exclusive = false,
    $nowait = false,
    $callback
);

while (count($channel->callbacks)) 
{
    $channel->wait();
}

$channel->close();
$connection->close();

This worker will run forever until you press ctrl-c to quit out. If you want a worker that will work whilst tasks are available and then gracefully quit, use the non-blocking-worker example in this tutorial's online source code on Github.

Testing

Now that we have both a task creator (publisher.php) and task consumer (worker.php), we can start see the queue in action. Simply start your publisher process to create jobs, and then start your worker threads to process them as demonstrated in the video below.

Connecting Using AMQPS (TLS Encrypted)

If you wish to use an encrypted connection to communicate with your RabbitMQ server, and you have configured it with TLS certificates then you can do so by simply changing the code that creates the connection to:

define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5671);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");

$sslOptions = array(
    'verify_peer' => true, # verify the serer has a valid certificate
);

$connection = new \PhpAmqpLib\Connection\AMQPSSLConnection(
    host: RABBITMQ_HOST,
    port: RABBITMQ_PORT,
    user: RABBITMQ_USERNAME,
    password: RABBITMQ_PASSWORD,
    ssl_options: $sslOptions,
);

Port Change

Please note that the port changed from 5672 to 5671 as I am using RabbitMQ's default ports, your server may use different ports though.

Peer Verification

In this scenario, my $sslOptions array is very simple because I am only verifying that the server is who they say they are (and my server is using a Let's Encrypt certificate so I don't need to specify a trusted certificate authority). If your server is configured to verify the peers that connect to it, then you will need to fill in the other SSL context options such as local_cert which is your certificate to provide the server to tell it that you are who you say you are.

References

Last updated: 9th July 2024
First published: 16th August 2018

This blog is created by Stuart Page

I'm a freelance web developer and technology consultant based in Surrey, UK, with over 10 years experience in web development, DevOps, Linux Administration, and IT solutions.

Need support with your infrastructure or web services?

Get in touch