Send a job to all Gearman workers with PHP

Have you ever wondered how to send a job to every Gearman worker once all of the single tasks are done? Below is an explanation of how I did it.

The simplest solution I could think of was to send one parallel task to each worker after all of the single tasks are done.

The first step is to set a unique ID with the same prefix for each worker, so that later I can get the total number of running workers using gearadmin and this identifier.

Here’s how you can set worker’s unique ID with PHP:

$worker->setId('pushers_' . uniqid('', true)); // $worker is a GearmanWorker instance

I get the total number of running workers by grepping for the prefix above (pushers_) in the gearadmin output:

$exec = exec('gearadmin --workers | grep pushers_', $output, $res);

The total number of workers is count($output). Now we want to create exactly that many tasks to run in parallel – one for each worker:

// $client is a connected GearmanClient instance (see the full client code below)
for ($workers = 0; $workers < count($output); $workers++)
{
  $client->addTask('write_last', "$workers", null, $workers);
}

For the purpose of this article we will make a query processor that groups small queries into bigger ones. A single grouped query may hold at most 10 numbers.

For example, instead of making 15 single queries like this:

INSERT INTO page_ids (id) VALUES (1)
INSERT INTO page_ids (id) VALUES (2)
...
INSERT INTO page_ids (id) VALUES (15)

we will make the following grouped queries:

INSERT INTO page_ids (id) VALUES (1),(2),(3)...(10)
INSERT INTO page_ids (id) VALUES (11)...(15)
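The grouping step can be sketched on its own, outside of Gearman. The helper below (build_grouped_queries is my name for it, not something from the worker code) chunks the collected ids and builds one INSERT per group of at most 10; the worker class further down implements the same idea incrementally:

```php
<?php
// Illustrative sketch of the grouping idea: split the ids into chunks of
// at most $per_query and build one grouped INSERT per chunk.
function build_grouped_queries(array $ids, $per_query = 10)
{
    $queries = array();
    foreach (array_chunk($ids, $per_query) as $chunk)
    {
        $queries[] = 'INSERT INTO page_ids (id) VALUES'
            . ' (' . implode('),(', $chunk) . ')';
    }
    return $queries;
}

print_r(build_grouped_queries(range(1, 15))); // two queries: ids 1-10 and 11-15
```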

I separated the base worker logic into a single class so you can extend it later and make different types of workers. It’s not directly related to this article, so you can skip reviewing it for now. Here’s the source:


<?php
class Worker
{
    public $worker;

    public function __construct($host, $port)
    {
        $this->worker = new GearmanWorker();
        $this->worker->addServer($host, $port);
        echo 'Waiting for work…' . PHP_EOL;
    }

    public function set_id_prefix($worker_id_prefix)
    {
        // uniqid('', true): the second argument is the more-entropy flag
        $this->worker->setId($worker_id_prefix . uniqid('', true));
    }

    public function register_function($function)
    {
        // Maps the Gearman function name to a "<name>_action" method of this class
        $this->worker->addFunction($function, array($this, $function . '_action'));
    }

    public function run()
    {
        while ($this->worker->work())
        {
            if ($this->worker->returnCode() != GEARMAN_SUCCESS)
            {
                echo $this->worker->returnCode() . PHP_EOL;
                break;
            }
        }
    }
}


Here is the worker class that will process the numbers:


<?php
class QueryWorker extends Worker
{
    public $numbers = array();
    public $numbers_per_query = 10;

    public function write_action($job)
    {
        $this->numbers[] = $job->workload();
        if (count($this->numbers) >= $this->numbers_per_query)
        {
            echo 'Writing big block of numbers with just one query' . PHP_EOL;
            print_r($this->numbers);
            $this->process_numbers();
            $this->numbers = array();
        }
    }

    public function write_last_action($job)
    {
        echo 'Numbers left to be processed '
            . '(parallel job #' . $job->workload() . '):' . PHP_EOL;
        if ($this->numbers)
        {
            print_r($this->numbers);
            $this->process_numbers();
            $this->numbers = array();
        }
        echo '-----------------' . PHP_EOL;
    }

    public function process_numbers()
    {
        $sql = 'INSERT INTO page_ids (id) VALUES'
            . ' (' . implode('),(', $this->numbers) . ')';
        echo 'Processing query:' . PHP_EOL;
        echo $sql . PHP_EOL;
        $result = mysql_query($sql) or die(mysql_error());
        echo 'Done' . PHP_EOL;
        return $result;
    }
}


You can run the worker using the following code:

$worker = new QueryWorker('127.0.0.1', 4730);
$worker->set_id_prefix('pushers_');
$worker->register_function('write');
$worker->register_function('write_last');
$worker->run();

Please note that you need an open MySQL connection for the worker to run. You’ll find the schema of the table used in this example at the end of the article.

Now it’s time to run the client, assign single tasks to the workers and run the parallel task for each worker after all of the single tasks are done.

$host = '127.0.0.1';
$port = 4730;

$client = new GearmanClient();
$client->addServer($host, $port);

for ($i = 1; $i <= 15; $i++)
{
  $client->doNormal('write', (string) $i); // the workload must be a string
}

$exec = exec('gearadmin --workers | grep pushers_', $output, $res);

for ($workers = 0; $workers < count($output); $workers++)
{
  $client->addTask('write_last', "$workers", null, $workers);
}

$client->runTasks();

I ran a test with two workers; here is what each of them executed.

The first worker executed the following queries:

INSERT INTO page_ids (id) VALUES (1),(2),(4),(5),(6),(7),(8),(9),(10),(11)
INSERT INTO page_ids (id) VALUES (12),(13),(15)

The second worker executed the following query:

INSERT INTO page_ids (id) VALUES (3),(14)

I found this approach very useful when posting a huge amount of data to Apache Solr.
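As promised, here is the schema of the table used in the example. This is a minimal sketch reconstructed from the queries above; the exact column type and keys in the original source may differ:

```sql
-- Minimal table consistent with the INSERT statements in this article.
-- A primary key on id could be added, but is omitted here so the demo
-- can be re-run without duplicate-key errors.
CREATE TABLE page_ids (
  id INT NOT NULL
);
```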

Download the source of the files used in this article.