Programster's Blog

Tutorials focusing on Linux, programming, and open source

RabbitMQ Exchanges With PHP

This tutorial will show you how to set up RabbitMQ using an exchange, rather than publishing directly to a queue.

The source code for this tutorial can be downloaded from Github.

What is An Exchange?

An exchange is an endpoint for the Publish-subscribe design pattern. It is where the subscribers register to listen for messages, and where publishers push their messages for others to listen to.

Why Use An Exchange?

In a talk from Bitly on service-oriented architecture, there was a great point (at 18:20) about how it's much better to publish "events" than to push "commands" directly to other services. This decouples services: a publisher does not have to know which other services exist or how they work. New services can also be written later that make use of the same information, with multiple services listening to the same event. When adding a new service, the developer does not have to update the publishing services for it to receive events; the new service can be dropped into the system without touching anything else.

Steps

We will now create a publisher and a subscriber that make use of a RabbitMQ exchange. The publishing service sends each message to the exchange. Every service listening to the exchange at that moment receives its own copy of the message. The exchange itself does not keep the message afterwards, so if nothing was listening, the message is simply dropped. If we were to publish our message to a queue instead, the messages would be stored and would pile up until a service retrieved them, but any service taking a message off the queue would be taking it away from all the other services as well. Thus you may want to use a combination of an exchange and queues, whereby messages published to the exchange are pushed into multiple queues, one per service.
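
The delivery rules above can be sketched with a small in-memory model. This is only an illustration of the behaviour, assuming PHP 7.1 or later; it does not talk to RabbitMQ, and the class name ToyFanoutExchange and the queue names are made up for the example.

```php
<?php
// Toy in-memory model of the delivery rules described above. This is NOT
// RabbitMQ or php-amqplib code; the class and method names are made up.

final class ToyFanoutExchange
{
    /** @var array<string, string[]> Messages held per bound queue. */
    private $queues = [];

    /** Bind a named queue so that it receives future messages. */
    public function bind(string $queueName): void
    {
        if (!isset($this->queues[$queueName])) {
            $this->queues[$queueName] = [];
        }
    }

    /** Copy the message into every bound queue; return the number of copies. */
    public function publish(string $message): int
    {
        foreach (array_keys($this->queues) as $queueName) {
            $this->queues[$queueName][] = $message;
        }

        return count($this->queues);
    }

    /** Take the oldest message from one queue, or null if it is empty. */
    public function consume(string $queueName): ?string
    {
        if (empty($this->queues[$queueName])) {
            return null;
        }

        return array_shift($this->queues[$queueName]);
    }
}

$exchange = new ToyFanoutExchange();

// Nothing is bound yet, so this message is dropped.
$dropped = $exchange->publish('event 1');

$exchange->bind('billing');
$exchange->bind('emails');

// Both queues get their own copy of the second message.
$copies = $exchange->publish('event 2');

echo "Copies of event 1: $dropped" . PHP_EOL;
echo "Copies of event 2: $copies" . PHP_EOL;
echo 'billing read: ' . $exchange->consume('billing') . PHP_EOL;
echo 'emails read: ' . $exchange->consume('emails') . PHP_EOL;
```

Reading from the billing queue does not remove anything from the emails queue, which is the property that a single shared queue cannot give you.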

The code can be downloaded from Github and is largely based on the RabbitMQ Publish/Subscribe page for PHP.

First you will need to deploy a RabbitMQ server.

Then install the necessary packages through composer by executing:

composer require php-amqplib/php-amqplib

If you are running PHP 7 on Ubuntu 16.04, you will need to have the php7.0-mbstring and php7.0-bcmath packages installed before composer can pull down the package.

Publisher

Now copy and paste the following code into a file called Publisher.php to create the publisher.

<?php
require_once __DIR__ . '/vendor/autoload.php';

define('RABBITMQ_HOST', 'rabbitmq.programster.org');
define('RABBITMQ_PORT', '5672');
define('RABBITMQ_USERNAME', 'guest');
define('RABBITMQ_PASSWORD', 'guest');
define('EXCHANGE_NAME', 'logs');


function logEvent()
{
    $connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
        RABBITMQ_HOST, 
        RABBITMQ_PORT, 
        RABBITMQ_USERNAME, 
        RABBITMQ_PASSWORD
    );
    
    $channel = $connection->channel();
    
    # Create the exchange if it doesn't exist already.
    $channel->exchange_declare(
        EXCHANGE_NAME, 
        'fanout', # type
        false,    # passive
        false,    # durable
        false     # auto_delete
    );
    
    $data = "Event created!";    
    $msg = new \PhpAmqpLib\Message\AMQPMessage($data);
    $channel->basic_publish($msg, EXCHANGE_NAME);
    echo "Published: $data" . PHP_EOL;
    
    $channel->close();
    $connection->close();
}

logEvent();
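
The publisher above sends a plain string. Since the point of an exchange is to publish "events" that several services can interpret, it may be more practical to send a structured payload. The following is a minimal sketch of one way to do that with JSON; the helper name buildEventPayload and the field names are hypothetical and not part of the downloadable code.

```php
<?php
// Hypothetical helper: the field names below are an illustration, not a
// RabbitMQ or php-amqplib convention.

/**
 * Build a JSON string describing an event.
 * The timestamp is passed in so that the output is predictable.
 */
function buildEventPayload(string $name, array $data, int $timestamp): string
{
    return json_encode([
        'event' => $name,
        'occurred_at' => $timestamp,
        'data' => $data,
    ]);
}

$payload = buildEventPayload('user.registered', ['user_id' => 42], 1500000000);
echo $payload . PHP_EOL;
// Prints: {"event":"user.registered","occurred_at":1500000000,"data":{"user_id":42}}
```

The resulting string could be used as $data in logEvent(), and a subscriber could read it back with json_decode($msg->body, true).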

Subscriber

Copy the code below into a file called Subscriber.php to create the subscriber.

<?php

require_once __DIR__ . '/vendor/autoload.php';

define('RABBITMQ_HOST', 'rabbitmq.programster.org');
define('RABBITMQ_PORT', '5672');
define('RABBITMQ_USERNAME', 'guest');
define('RABBITMQ_PASSWORD', 'guest');
define('EXCHANGE_NAME', 'logs');

$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
    RABBITMQ_HOST, 
    RABBITMQ_PORT, 
    RABBITMQ_USERNAME, 
    RABBITMQ_PASSWORD
);

$channel = $connection->channel();

# Create the exchange if it doesn't exist already.
$channel->exchange_declare(
    EXCHANGE_NAME, 
    'fanout', # type
    false,    # passive
    false,    # durable
    false     # auto_delete
);

list($queue_name, ,) = $channel->queue_declare(
    "",    # queue
    false, # passive
    false, # durable
    true,  # exclusive
    false  # auto delete
);

# Bind our temporary queue to the exchange so it receives a copy of each message.
$channel->queue_bind($queue_name, EXCHANGE_NAME);
print 'Waiting for logs. To exit press CTRL+C' . PHP_EOL;

$callback = function($msg){
    print "Read: " . $msg->body . PHP_EOL;
};

$channel->basic_consume(
    $queue_name, # queue
    '',          # consumer tag
    false,       # no local
    true,        # no ack
    false,       # exclusive
    false,       # nowait
    $callback
);

while (count($channel->callbacks)) 
{
    $channel->wait();
}

$channel->close();
$connection->close();
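
The subscriber above uses a temporary, exclusive queue, so it only sees messages published while it is running. To get the "exchange plus queues" combination mentioned in the Steps section, each service could bind its own named queue to the exchange instead. The following is a sketch under that assumption; the helper bindServiceQueue and the queue name billing_events are hypothetical, and $channel is expected to be the object returned by $connection->channel() in Subscriber.php.

```php
<?php
// Sketch only: the queue name 'billing_events' and this helper are
// hypothetical and not part of the tutorial's downloadable code.

/**
 * Declare a named, durable queue and bind it to an existing exchange.
 *
 * $channel is expected to be a php-amqplib channel, i.e. the object
 * returned by $connection->channel() in Subscriber.php.
 */
function bindServiceQueue($channel, string $exchangeName, string $queueName): void
{
    $channel->queue_declare(
        $queueName,
        false, # passive
        true,  # durable: the queue definition survives a broker restart
        false, # exclusive: not tied to this one connection
        false  # auto delete: keep the queue when the consumer disconnects
    );

    $channel->queue_bind($queueName, $exchangeName);
}

// Usage inside Subscriber.php, replacing the anonymous queue:
//
//   bindServiceQueue($channel, EXCHANGE_NAME, 'billing_events');
//   $channel->basic_consume('billing_events', '', false, true, false, false, $callback);
```

With a named queue like this, messages published while the service is offline wait in its queue until it reconnects. Note that the exchange in this tutorial is declared as non-durable and the messages are not marked persistent, so this sketch alone does not make messages survive a restart of the RabbitMQ server. If several instances of the same service consume from the same named queue, the messages are shared out between them rather than copied to each.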

Testing

Now that we have our publisher and subscriber, we can test how they work. First, run the publisher without any subscribers running and then launch a subscriber. You will see that the subscriber does not receive the earlier message, whereas a queue would have stored it. Next, launch two subscribers and then run the publisher once. You will see that both subscribers receive the message, unlike with a shared queue, where messages are distributed evenly between the consumers.