Programster's Blog

Tutorials focusing on Linux, programming, and open-source

RabbitMQ Job Queue With PHP

This tutorial shows how to set up and use a job queue in PHP, using the RabbitMQ server we deployed through Docker.

Why Use Job Queues?

Job queues allow your infrastructure to cope gracefully with massive fluctuations in demand without having to scale up or down rapidly. For example, if you were to create your own version of YouTube, you might want a queue for converting the videos that users upload. Without a queue, your video conversion servers would become overloaded during peak hours (the middle of the day). With a queue, each server can process one video at a time, and if a major backlog builds up, the admins have time to deploy more servers, or can simply let the queue drain overnight when people aren't uploading videos.

If you want to skip the explanation and go straight to playing, you can just download the source code for this tutorial and go to the testing section.

Steps

First, we need to use Composer to install the PHP package for interfacing with RabbitMQ. You will need to install Composer first if you do not already have it.

composer require php-amqplib/php-amqplib

If you are using Ubuntu 16.04 with PHP 7, you will need to have installed the php7.0-mbstring and php7.0-bcmath packages.
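If you are unsure whether those extensions are enabled, you can ask PHP itself. This is only a minimal sketch: the function name missingExtensions is my own invention, and the list passed to it is simply the two extensions named above.

```php
<?php

# Return the names from $required that are not loaded in this PHP runtime.
# The function name is hypothetical; extension_loaded() is a PHP built-in.
function missingExtensions(array $required)
{
    $missing = array();

    foreach ($required as $name)
    {
        if (!extension_loaded($name))
        {
            $missing[] = $name;
        }
    }

    return $missing;
}

$missing = missingExtensions(array('mbstring', 'bcmath'));

if (count($missing) > 0)
{
    print 'Missing PHP extensions: ' . implode(', ', $missing) . PHP_EOL;
}
else
{
    print 'All required PHP extensions are loaded.' . PHP_EOL;
}
```

Run it with the same PHP binary you will use for the publisher and worker, since the CLI and web server can load different extensions.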

The Job Creator

Now let's create our publisher. This will create jobs that the workers will execute. For this tutorial, I have called it publisher.php.

<?php

require_once(__DIR__ . '/vendor/autoload.php');

define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    RABBITMQ_HOST, 
    RABBITMQ_PORT, 
    RABBITMQ_USERNAME, 
    RABBITMQ_PASSWORD
);

$channel = $connection->channel();

# Create the queue if it does not already exist.
$channel->queue_declare(
    $queue = RABBITMQ_QUEUE_NAME,
    $passive = false,
    $durable = true,
    $exclusive = false,
    $auto_delete = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
);

$job_id = 0;
while (true)
{
    $jobArray = array(
        'id' => $job_id++,
        'task' => 'sleep',
        'sleep_period' => rand(0, 3)
    );

    $msg = new \PhpAmqpLib\Message\AMQPMessage(
        json_encode($jobArray, JSON_UNESCAPED_SLASHES),
        array('delivery_mode' => 2) # make message persistent
    );

    $channel->basic_publish($msg, '', RABBITMQ_QUEUE_NAME);
    print 'Job created' . PHP_EOL;
    sleep(1);
}

In this example, I have used JSON for the body of the message to convey a job for the workers. Messages just need to be a string, so you can use any format you like.
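Because the body is just a string, the publisher and the worker have to agree on how to build and read it. The sketch below shows one way to encode and decode the job used in this tutorial without touching RabbitMQ at all. The helper names encodeJob and decodeJob are hypothetical and are not part of php-amqplib.

```php
<?php

# Build the string that would be passed to AMQPMessage as the body.
function encodeJob($id, $sleepPeriod)
{
    $job = array(
        'id' => $id,
        'task' => 'sleep',
        'sleep_period' => $sleepPeriod
    );

    return json_encode($job, JSON_UNESCAPED_SLASHES);
}

# Turn a message body back into an array, rejecting anything that is
# not valid JSON or is missing the fields the worker relies on.
function decodeJob($body)
{
    $job = json_decode($body, true);

    if (!is_array($job) || !isset($job['id'], $job['task'], $job['sleep_period']))
    {
        throw new InvalidArgumentException('Malformed job: ' . $body);
    }

    return $job;
}

$body = encodeJob(7, 2);
print $body . PHP_EOL;

$job = decodeJob($body);
print 'Job ' . $job['id'] . ' sleeps for ' . $job['sleep_period'] . ' seconds' . PHP_EOL;
```

Validating the decoded job in one place means a malformed message fails loudly instead of causing a confusing error halfway through the work.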

You may also notice that I gave each job an id. This lets us confirm later, when several workers are running, that no job is executed by more than one worker.

The Worker

<?php

require_once __DIR__ . '/vendor/autoload.php';

define("RABBITMQ_HOST", "rabbitmq.programster.org");
define("RABBITMQ_PORT", 5672);
define("RABBITMQ_USERNAME", "guest");
define("RABBITMQ_PASSWORD", "guest");
define("RABBITMQ_QUEUE_NAME", "task_queue");

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    RABBITMQ_HOST, 
    RABBITMQ_PORT, 
    RABBITMQ_USERNAME, 
    RABBITMQ_PASSWORD
);


$channel = $connection->channel();

# Create the queue if it does not already exist.
$channel->queue_declare(
    $queue = RABBITMQ_QUEUE_NAME,
    $passive = false,
    $durable = true,
    $exclusive = false,
    $auto_delete = false,
    $nowait = false,
    $arguments = null,
    $ticket = null
);


echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
    echo " [x] Received ", $msg->body, "\n";
    $job = json_decode($msg->body, $assocForm=true);
    sleep($job['sleep_period']);
    echo " [x] Done", "\n";
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

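# Prefetch count of 1: RabbitMQ will not send this worker another
# message until it has acknowledged the one it is working on.
$channel->basic_qos(null, 1, null);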

$channel->basic_consume(
    $queue = RABBITMQ_QUEUE_NAME,
    $consumer_tag = '',
    $no_local = false,
    $no_ack = false,
    $exclusive = false,
    $nowait = false,
    $callback
);

while (count($channel->callbacks)) 
{
    $channel->wait();
}

$channel->close();
$connection->close();

This worker will run forever until you press Ctrl+C to quit. If you want a worker that processes tasks while they are available and then quits gracefully, use the non-blocking-worker example in this tutorial's online source code on GitHub.
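I have not reproduced the non-blocking worker from the repository here, but the general idea can be sketched as follows: keep fetching jobs until the queue reports that it is empty, then stop. To keep the example runnable without a broker, the fetching and handling steps are passed in as callables. The function name drainQueue and the in-memory array standing in for the queue are illustrative assumptions, not the author's published code.

```php
<?php

# Process jobs until $fetch returns null, then return how many were handled.
# $fetch  : callable returning the next message body, or null when empty.
# $handle : callable that performs the work for one message body.
function drainQueue(callable $fetch, callable $handle)
{
    $processed = 0;

    while (($body = $fetch()) !== null)
    {
        $handle($body);
        $processed++;
    }

    return $processed;
}

# Stand-in for a real queue so the sketch runs on its own.
$pending = array(
    '{"id":0,"task":"sleep","sleep_period":0}',
    '{"id":1,"task":"sleep","sleep_period":0}'
);

# array_shift() returns null once the array is empty.
$fetch = function () use (&$pending) {
    return array_shift($pending);
};

$handle = function ($body) {
    $job = json_decode($body, true);
    sleep($job['sleep_period']);
    print ' [x] Done job ' . $job['id'] . PHP_EOL;
};

$count = drainQueue($fetch, $handle);
print 'Queue empty after ' . $count . ' jobs, exiting.' . PHP_EOL;
```

With php-amqplib, $fetch could wrap $channel->basic_get(RABBITMQ_QUEUE_NAME), which returns a message, or null when the queue is empty. Each fetched message would still need to be acknowledged once its work is done, as in the worker above.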

Testing

Now that we have both a task creator (publisher.php) and a task consumer (worker.php), we can see the queue in action. Simply start your publisher process to create jobs, and then start one or more worker processes to consume them, as demonstrated in the video below.
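If you would rather check the output than eyeball it, the job ids make that straightforward. The sketch below assumes you have collected the lines printed by all of your workers into one array (for example, by redirecting each worker's output to a file and reading the files back). The function name findDuplicateJobIds and the sample lines are made up for illustration.

```php
<?php

# Given the combined output lines of all workers, return the job ids that
# were received more than once. An empty result means no job was duplicated.
function findDuplicateJobIds(array $lines)
{
    $prefix = ' [x] Received ';
    $counts = array();

    foreach ($lines as $line)
    {
        if (strpos($line, $prefix) !== 0)
        {
            continue; # skip the "Done" and "Waiting" lines
        }

        $job = json_decode(substr($line, strlen($prefix)), true);

        if (is_array($job) && isset($job['id']))
        {
            $id = $job['id'];
            $counts[$id] = isset($counts[$id]) ? $counts[$id] + 1 : 1;
        }
    }

    $duplicates = array();

    foreach ($counts as $id => $count)
    {
        if ($count > 1)
        {
            $duplicates[] = $id;
        }
    }

    return $duplicates;
}

# Sample lines in the format worker.php prints; the data is invented.
$lines = array(
    ' [x] Received {"id":0,"task":"sleep","sleep_period":1}',
    ' [x] Done',
    ' [x] Received {"id":1,"task":"sleep","sleep_period":3}',
    ' [x] Done'
);

$duplicates = findDuplicateJobIds($lines);

if (count($duplicates) === 0)
{
    print 'No duplicated jobs' . PHP_EOL;
}
else
{
    print 'Duplicated ids: ' . implode(', ', $duplicates) . PHP_EOL;
}
```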

Last updated: 16th September 2021
First published: 16th August 2018