PHP Multithreading - Thread Pool Example
When you start with multithreading in PHP, you probably begin by creating a Thread class and instantiating one for each task that needs doing. This is fine when you have just a few tasks, but with many tasks you can quickly overload your server. Even if you scheduled the threads so that they didn't all run at once, spawning a thread per task is inefficient, because creating and destroying threads has overhead. It is much more efficient to reuse the same threads over and over to complete the jobs. This is where a "pool" comes in, and where one needs to separate the "work" (the job or task) from the "worker" (the thread).
Below is a script that defines a task class (MyTask), which gets instantiated a number of times. Each time, the task is given a list of numbers to perform calculations on ($inputs).
We then instantiate a thread pool and hand the list of tasks to it to work on. The pool of threads will work until all the jobs have been completed, with each thread in the pool working on one job at a time.
When all the tasks have been completed, we get the results out by asking the task objects for them. We don't try to get the data back from the pool or the threads.
<?php
class MyTask extends Threaded
{
    private $m_inputs;
    private $m_outputs;

    public function __construct(array $inputs)
    {
        $this->m_inputs = $inputs;
        $this->m_outputs = new Threaded(); // we will store the results in here.
    }

    public function run()
    {
        foreach ($this->m_inputs as $input)
        {
            // casting the array to an array is not a mistake
            // but actually super important for this to work
            // https://github.com/krakjoe/pthreads/issues/813#issuecomment-361955116
            $this->m_outputs[] = (array)array(
                'property1' => $input * 2,
                'property2' => ($input + 2),
            );
        }
    }

    # Accessors
    public function getResults() { return $this->m_outputs; }
}

function main()
{
    $inputs = range(0,10000);
    $numInputsPerTask = 20;
    $inputGroups = array_chunk($inputs, $numInputsPerTask);
    $numCpus = 4; // I would normally fetch this dynamically; it is sometimes large (e.g. AWS C5 instances)
    $numTasks = count($inputGroups);
    $numThreads = min($numTasks, $numCpus); // don't need to spawn more threads than tasks.
    $pool = new Pool($numThreads);
    $tasks = array(); // collection to hold all the tasks to get the results from afterwards.

    foreach ($inputGroups as $inputsForTask)
    {
        $task = new MyTask($inputsForTask);
        $tasks[] = $task;
        $pool->submit($task);
    }

    while ($pool->collect());

    # We could submit more stuff here, the Pool is still waiting for work to be submitted.
    $pool->shutdown();

    # All tasks should have been completed at this point. Get the results!
    $results = array();

    foreach ($tasks as $task)
    {
        $results[] = $task->getResults();
    }

    print "results: " . print_r($results, true);
}

main();
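The script above hard-codes $numCpus = 4. One way the count might be fetched dynamically is sketched below. The function name detectCpuCount() and its fallback parameter are made up for this illustration and are not part of pthreads. It assumes either a Windows-style NUMBER_OF_PROCESSORS environment variable or a readable Linux /proc/cpuinfo, and falls back to a default when neither is available.

```php
<?php
// Hypothetical helper, not part of pthreads or the original script.
// Returns a best-effort guess at the number of logical CPUs.
function detectCpuCount($fallback = 4)
{
    // Windows exposes the count in an environment variable.
    $fromEnv = getenv('NUMBER_OF_PROCESSORS');
    if ($fromEnv !== false && (int)$fromEnv > 0)
    {
        return (int)$fromEnv;
    }

    // Linux: count the "processor" entries in /proc/cpuinfo.
    if (is_readable('/proc/cpuinfo'))
    {
        $count = preg_match_all('/^processor\s*:/m', file_get_contents('/proc/cpuinfo'));
        if ($count > 0)
        {
            return $count;
        }
    }

    // Nothing worked, so use the caller's default.
    return $fallback;
}

$numCpus = detectCpuCount();
print "detected CPUs: " . $numCpus . "\n";
```

The result could then replace the hard-coded value before the min($numTasks, $numCpus) calculation. Other platforms would need their own detection branch.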
Getting the Answers Out
When I first started working with threads, I found it easy enough to create them and get them to output messages, but I was confused about how the main thread could retrieve the results once the work had been done. It hadn't clicked in my mind that the "job", not the thread, stores both the parameters of the work and the result. The thread just runs the job, with no clue about what the job does. You can think of the thread as a trained monkey that has been taught to turn a crank, and the jobs as containers with cranks on the outside that the monkey can reach. The monkey has no clue what's going on inside a container when he turns the crank, but every container gets its work done and keeps its result inside. One container may have peeled potatoes when the crank was turned, whereas another might have been a manual generator that charged a battery.
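The monkey-and-container picture can be sketched in plain PHP, without pthreads, so that everything runs sequentially in one thread. All the class names here (PeelPotatoesJob, ChargeBatteryJob, Monkey) are invented for this illustration. The point is only to show where the inputs and results live: inside the jobs, never inside the worker.

```php
<?php
// Hypothetical illustration: plain PHP, no pthreads, nothing runs in parallel.

class PeelPotatoesJob
{
    private $m_count;
    private $m_result;

    public function __construct($count) { $this->m_count = $count; }

    // The "crank": does the work and keeps the result inside the job.
    public function run() { $this->m_result = $this->m_count . " potatoes peeled"; }

    public function getResult() { return $this->m_result; }
}

class ChargeBatteryJob
{
    private $m_turns;
    private $m_result;

    public function __construct($turns) { $this->m_turns = $turns; }

    public function run() { $this->m_result = "battery charged with " . $this->m_turns . " crank turns"; }

    public function getResult() { return $this->m_result; }
}

// The "monkey" stands in for a worker thread: it only calls run()
// and stores nothing about the job or its outcome.
class Monkey
{
    public function turnCrank($job)
    {
        $job->run();
    }
}

$jobs = array(new PeelPotatoesJob(3), new ChargeBatteryJob(50));
$monkey = new Monkey();

foreach ($jobs as $job)
{
    $monkey->turnCrank($job);
}

// Results are read back from the jobs, not from the worker.
$results = array();

foreach ($jobs as $job)
{
    $results[] = $job->getResult();
}

print implode("\n", $results) . "\n";
```

In the pthreads script, the Pool's threads play the monkey's role and MyTask plays the container's, which is why the main thread keeps its own $tasks array and calls getResults() on each task.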
First published: 16th August 2018