RabbitMQ Exchanges With PHP
This tutorial will show you how to setup RabbitMQ in terms of an exchange, rather than for a queu as demonstrated below:
Related Posts
What is An Exchange?
An exchange is an endpoint for the Publish-subscribe design pattern. It is where the subscribers register to listen for messages, and where publishers push their messages for others to listen to.
Why Use An Exchange?
In a talk from Bitly on service-oriented architecture, there was a great point (at 18:20) about how it's much better to publish "events" rather than to push "commands" directly to other services. This allows greater decoupling of services whereby services do not have to know what other services are out there, or how they work. Also, new services can be written later that may utilize the information in some manner, with multiple services listening to the same event. When adding new services, the developer does not have to update the publishing services for the new services to receive events, instead the new service can be thrown into the system without touching anything else.
Steps
We will now create a publisher and subscriber that make use of RabbitMQ exchanges. The publishing service will send the message to the exchange which other services may be listening to. If there are other services listening to this exchange, then each of the services will receive a copy of the message. At this point (even if nothing was listening) the message is dropped from RabbitMQ. If we were to publish our message to a queue instead, then these messages be stored and would pile up until a service retrieved them, but anything taking away from the queue would be taking the message away from all the other services as well. Thus you may want to use a combination of an exchange and queues whereby messages published into an exchange are pushed into multiple queues.
First you will need to deploy a RabbitMQ server.
Then install the necessary packages through composer by executing:
composer require php-amqplib/php-amqplib
php7.0-mbstring
and php7.0-bcmath
packages.
Publisher
Now copy and paste the following code into a file callled Publisher.php
to create the publisher.
<?php
require_once __DIR__ . '/vendor/autoload.php';
define('RABBITMQ_HOST', 'rabbitmq.programster.org');
define('RABBITMQ_PORT', '5672');
define('RABBITMQ_USERNAME', 'guest');
define('RABBITMQ_PASSWORD', 'guest');
define('EXCHANGE_NAME', 'logs');
function logEvent()
{
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the exchange if it doesnt exist already.
$channel->exchange_declare(
EXCHANGE_NAME,
'fanout', # type
false, # passive
false, # durable
false # auto_delete
);
$data = "Event created!";
$msg = new \PhpAmqpLib\Message\AMQPMessage($data);
$channel->basic_publish($msg, EXCHANGE_NAME);
echo "Published: $data" . PHP_EOL;
$channel->close();
$connection->close();
}
logEvent();
Subscriber
Copy the code below into a file called Subscriber.php
to create the subscriber.
<?php
require_once __DIR__ . '/vendor/autoload.php';
define('RABBITMQ_HOST', 'rabbitmq.programster.org');
define('RABBITMQ_PORT', '5672');
define('RABBITMQ_USERNAME', 'guest');
define('RABBITMQ_PASSWORD', 'guest');
define('EXCHANGE_NAME', 'logs');
$connection = new \PhpAmqpLib\Connection\AMQPStreamConnection(
RABBITMQ_HOST,
RABBITMQ_PORT,
RABBITMQ_USERNAME,
RABBITMQ_PASSWORD
);
$channel = $connection->channel();
# Create the exchange if it doesnt exist already.
$channel->exchange_declare(
EXCHANGE_NAME,
'fanout', # type
false, # passive
false, # durable
false # auto_delete
);
list($queue_name, ,) = $channel->queue_declare(
"", # queue
false, # passive
false, # durable
true, # exclusive
false # auto delete
);
$channel->queue_bind($queue_name, 'logs');
print 'Waiting for logs. To exit press CTRL+C' . PHP_EOL;
$callback = function($msg){
print "Read: " . $msg->body . PHP_EOL;
};
$channel->basic_consume(
$queue_name,
'',
false,
true,
false,
false,
$callback
);
while (count($channel->callbacks))
{
$channel->wait();
}
$channel->close();
$connection->close();
Testing
Now that we have our publisher and subscriber we can test how it works. Run the publisher without any subscribers and then launch a subscriber and you will see that the subscriber does not receive previous messages, unlike a queue. Launch two subscribers before running the publisher once, and you will see that both subscribers receive the message, unlike the queue where the messages were evenly distributed.
First published: 16th August 2018