Have you ever wondered how to send a job to all Gearman workers after all of the single tasks are done? Below you'll find an explanation of how I've done it.
The simplest solution I could think of was to send one parallel task to each worker once all of the single tasks are done.
The first step is to set a unique ID with a shared prefix for each worker, so that I can later get the total number of running workers using gearadmin and that prefix.
Here’s how you can set worker’s unique ID with PHP:
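The original snippet is no longer embedded here, but the whole trick is one call on the worker: build an ID from the shared prefix plus uniqid(), as the worker class below does. A minimal sketch:

```php
<?php
// A minimal sketch: build a worker ID from a shared prefix plus uniqid(),
// so every worker can later be found by grepping for the prefix.
$worker_id = 'pushers_' . uniqid('', true);

// On a real worker you would then register the ID with the Gearman server:
// $worker = new GearmanWorker();
// $worker->setId($worker_id);

echo $worker_id . PHP_EOL;
```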
I get the total number of running workers by grepping for the prefix above (pushers_) with the following command, run from PHP:
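To make the counting step concrete, here is a sketch in plain PHP that filters sample gearadmin --workers output; the sample lines are made up for illustration, the real exec() call appears in the client code later in the article:

```php
<?php
// Hypothetical sample of `gearadmin --workers` output: one line per
// connection, containing the worker ID when one has been set.
$output = array(
    '33 127.0.0.1 pushers_52a1b2c3d4e5f.61234567 : write write_last',
    '34 127.0.0.1 pushers_52a1b2c3d4f00.69876543 : write write_last',
    '35 127.0.0.1 - : some_other_function',
);

// Keep only the lines that belong to our workers.
$workers = preg_grep('/pushers_/', $output);

echo count($workers) . PHP_EOL;
```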
The total number of the workers is count($output). Now we want to create exactly count($output) tasks to be run in parallel – one for each worker:
for ($workers = 0; $workers < count($output); $workers++)
{
    $client->addTask('write_last', "$workers", null, $workers);
}
For the purpose of this article we will make a query processor that groups small queries into a bigger one. No more than 10 numbers can be processed in a single query.
e.g. Instead of making 15 single queries like this:
INSERT INTO page_ids (id) VALUES (1)
..
INSERT INTO page_ids (id) VALUES (15)
we will make the following grouped queries:
INSERT INTO page_ids (id) VALUES (1),(2),...,(10)
INSERT INTO page_ids (id) VALUES (11),(12),...,(15)
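The grouping itself is just splitting the collected numbers into blocks of at most 10 and joining each block into one statement; a quick standalone sketch with array_chunk(), separate from the worker code below:

```php
<?php
$numbers = range(1, 15);
$numbers_per_query = 10;

// Split into blocks of at most 10 and build one INSERT per block.
$queries = array();
foreach (array_chunk($numbers, $numbers_per_query) as $chunk)
{
    $queries[] = 'INSERT INTO page_ids (id) VALUES ('
        . implode('),(', $chunk) . ')';
}

print_r($queries);
```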
I separated the base worker logic into a single class, so you can extend it later and make different types of workers. You can skip reviewing it for now, because it's not directly related to this article. However, here's the source:
<?php

class Worker
{
    protected $worker;

    public function __construct($host, $port)
    {
        $this->worker = new GearmanWorker();
        $this->worker->addServer($host, $port);
        echo 'Waiting for work...' . PHP_EOL;
    }

    public function set_id_prefix($worker_id_prefix)
    {
        // uniqid() takes a string prefix first; pass '' to get the
        // more-entropy variant.
        $this->worker->setId($worker_id_prefix . uniqid('', true));
    }

    public function register_function($function)
    {
        $this->worker->addFunction($function, array($this, $function . '_action'));
    }

    public function run()
    {
        while ($this->worker->work())
        {
            if ($this->worker->returnCode() != GEARMAN_SUCCESS)
            {
                echo $this->worker->returnCode() . PHP_EOL;
                break;
            }
        }
    }
}
Here is the worker’s class that will process the numbers:
<?php

class QueryWorker extends Worker
{
    public $numbers = array();
    public $numbers_per_query = 10;

    public function write_action($job)
    {
        $this->numbers[] = $job->workload();
        if (count($this->numbers) >= $this->numbers_per_query)
        {
            echo 'Writing big block of numbers with just one query' . PHP_EOL;
            print_r($this->numbers);
            $this->process_numbers();
            $this->numbers = array();
        }
    }

    public function write_last_action($job)
    {
        echo 'Numbers left to be processed '
            . '(parallel job #' . $job->workload() . '):' . PHP_EOL;
        if ($this->numbers)
        {
            print_r($this->numbers);
            $this->process_numbers();
            $this->numbers = array();
        }
        echo '-----------------' . PHP_EOL;
    }

    public function process_numbers()
    {
        $sql = 'INSERT INTO page_ids (id) VALUES'
            . ' (' . implode('),(', $this->numbers) . ')';
        echo 'Processing query:' . PHP_EOL;
        echo $sql . PHP_EOL;
        $result = mysql_query($sql) or die(mysql_error());
        echo 'Done' . PHP_EOL;
        return $result;
    }
}
You can run the worker using the following code:
$host = 'localhost'; // adjust to your Gearman server
$port = 4730;
$worker = new QueryWorker($host, $port);
$worker->set_id_prefix('pushers_');
$worker->register_function('write');
$worker->register_function('write_last');
$worker->run();
Please note that you need a MySQL connection for the worker to run. You'll find the schema of the table used in this example at the end of the article.
Now it’s time to run the client, assign single tasks to the workers and run the parallel task for each worker after all of the single tasks are done.
$host = 'localhost'; // adjust to your Gearman server
$port = 4730;
$client = new GearmanClient();
$client->addServer($host, $port);
for ($i = 1; $i <= 15; $i++)
{
$client->doNormal('write', "$i"); // the workload must be a string
}
$exec = exec('gearadmin --workers | grep pushers_', $output, $res);
for ($workers = 0; $workers < count($output); $workers++)
{
$client->addTask('write_last', "$workers", null, $workers);
}
$client->runTasks();
I made a test with two workers and compared their output.
The first worker executed the following query:
INSERT INTO page_ids (id) VALUES (12),(13),(15)
The second worker executed a similar grouped query with its own remaining numbers.
I found this approach very useful when posting huge amounts of data to Apache Solr.
Download the source of the files used in this article.