Recommend this page to a friend! |
Download |
Info | Example | Files | Install with Composer | Download | Reputation | Support forum | Blog | Links |
Ratings | Unique User Downloads | Download Rankings | ||||
69% | Total: 311 | All time: 7,313 This week: 660 |
Version | License | PHP version | Categories | |||
threadable 1.0.2 | MIT/X Consortium ... | 5 | Algorithms, PHP 5, Unix, Language, Tr... |
Description | Author | |
This package can start and manage parallel threads running classes. Innovation Award
|
#!/usr/bin/env php |
Easy-to-use threading library providing all basic features to perform work in background mode.
All you need to have installed: - _pcntl_ - _posix_ - _sockets_
Worker - is a basic class for any worker. It is composed of two substances (physically, stored in one class, but providing different functionalities):
The all you need it to extend wapmorgan\Threadable\Worker
class and reimplement onPayload($data)
public method.
For example:
use wapmorgan\Threadable\Worker;
class SleepingWorker extends Worker
{
public function onPayload($data)
{
echo 'I have started at '.date('r').PHP_EOL;
sleep(3);
echo 'I have ended at '.date('r').PHP_EOL;
return true;
}
}
WorkersPool (_wapmorgan\Threadable\WorkersPool_) - is a container for Worker
's, intended for handling similar tasks.
It takes care of all maintenance, payload dispatching and life-cycle of workers. Allows you change the size of the pool dynamically and other useful stuff.
For example, you want to just background downloading work. Let's use wapmorgan\Threadable\BackgroundWork
class to background it and show progress for user (or store in DB/...).
Everything you need to do:
1. Prepare payloads for DownloadWorker
2. Launch BackgroundWork::doInBackground()
or BackgroundWork::doInBackgroundParallel()
for one thread or few threads respectively.
DownloadWorker
needs an array with source
and target
elements. Prepare it:
use wapmorgan\Threadable\BackgroundWork;
use wapmorgan\Threadable\DownloadWorker;
use wapmorgan\Threadable\Worker;
$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];
$files = [];
foreach ($file_sources as $file_to_download) {
$files[] = [
'source' => $file_to_download,
'size' => DownloadWorker::getRemoteFileSize($file_to_download),
'target' => tempnam(sys_get_temp_dir(), 'thrd_test'),
];
}
Run it in one thread with doInBackground
function. Signature is following:
`doInBackground(Worker $worker,
array $payloads,
callable $payloadHandlingCallback = null,
callable $onPayloadFinishCallback = null,
$sleepMicroTime = 1000)`
So, collect all information to run it:
$result = BackgroundWork::doInBackground(new DownloadWorker(), $files,
function (Worker $worker, $payloadI, $payloadData) {
clearstatcache(true, $payloadData['target']);
echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' downloading ' . round(filesize($payloadData['target']) * 100 / $payloadData['size'], 2) . '%';
},
function (Worker $worker, $payloadI, $payloadData, $payloadResult) {
echo "\r" . '#' . ($payloadI + 1) . '. ' . basename($payloadData['source']) . ' successfully downloaded' . PHP_EOL;
return true;
}
);
if ($result)
echo 'All files downloaded successfully'.PHP_EOL;
Example is in bin/example_file_downloading_easy
file.
To run it in few threads use doInBackgroundParallel
. It has almost the same signature as one-thread function:
`doInBackgroundParallel(Worker $worker,
array $payloads,
callable $payloadHandlingCallback = null,
callable $onPayloadFinishCallback = null,
$sleepMicroTime = 1000,
$poolSize = self::BY_CPU_NUMBER)`
By adjusting $poolSize
you can select number of workers that should be used.
Example is in bin/example_file_downloading_pool_easy
file.
If you just need to parallel some work and do it in another thread, you can utilize just Worker
class without any other dependencies.
To use it correctly you need to understand the life-cycle of worker:
Background work happens in 2 steps, where worker thread runs onPayload($data)
method of class with actual payload.
To summarize, this is an example of downloading file in another thread with real-time displaying of progress:
Settings and structures
// Implement class-downloader
class DownloadWorker extends Worker
{
public function onPayload($data)
{
echo 'Started '.$data[0].' into '.$data[2].PHP_EOL;
copy($data[0], $data[2]);
}
}
// supplementary function, just to avoid hand-writing of file sizes
function remote_filesize($path)
{
$fp = fopen($path, 'r');
$inf = stream_get_meta_data($fp);
fclose($fp);
foreach($inf["wrapper_data"] as $v) {
if (stristr($v,"content-length")) {
$v = explode(":",$v);
return (int)trim($v[1]);
}
}
}
// our function to print actual status of downloads
function show_status(&$files)
{
foreach ($files as $i => $file) {
if (file_exists($file[2])) {
clearstatcache(true, $file[2]);
$downloaded_size = filesize($file[2]);
if ($downloaded_size == $file[1]) {
echo $file[0].' downloaded'.PHP_EOL;
unset($files[$i]);
unlink($file[2]);
} else if ($downloaded_size === 0) {
// echo $file[0].' in queue'.PHP_EOL;
} else {
echo $file[0].' downloading '.round($downloaded_size * 100 / $file[1], 2).'%'.PHP_EOL;
}
}
}
}
// list of files to be downloaded
$file_sources = ['https://yandex.ru/images/today?size=1920x1080', 'http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip'];
// process of remote file size detection and creation temp local file for this downloading
$files = [];
foreach ($file_sources as $file_to_download) {
$file_size = remote_filesize($file_to_download);
$output = tempnam(sys_get_temp_dir(), 'thrd_test');
$files[] = [$file_to_download, $file_size, $output];
}
Real work
// construct and start new worker
$worker = new DownloadWorker();
$worker->start();
// add files to work queue
foreach ($files as $file) {
echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL;
$worker->sendPayload([$file]);
}
// main worker thread loop
while ($worker->state !== Worker::TERMINATED) {
// Worker::RUNNING state indicates that worker thread is still working over some payload
if ($worker->state == Worker::RUNNING) {
// prints status of all files
show_status($files);
// call check for finishing all tasks
$worker->checkForFinish();
usleep(500000);
}
// Worker::IDLE state indicates that worker thread does not have any work right now
else if ($worker->state == Worker::IDLE) {
echo 'Ended. Stopping worker...'.PHP_EOL;
// we don't need worker anymore, just stop it
$worker->stop();
usleep(500000);
}
// Worker::TERMINATING state indicates that worker thread is going to be stopped and can't be used to process data
else if ($worker->state == Worker::TERMINATING) {
echo 'Wait for terminating ...'.PHP_EOL;
// just to set Worker::TERMINATED state
$worker->checkForTermination();
usleep(500000);
}
}
_Result:_
Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_test0Y3i3k
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testrwwYiE
https://yandex.ru/images/today?size=1920x1080 downloaded
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 28.89%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 66.06%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
Ended. Stopping worker...
Wait for terminating ...
This code equipped with a lot of comments, but you can simplify this example if you don't need to re-use worker when all your work is done. You can replace this huge loop with a smaller one:
// loops works only when worker is running.
// just to show information about downloaded files
while ($worker->state == Worker::RUNNING) {
show_status($files);
$worker->checkForFinish();
usleep(500000);
}
// when thread is in idle state, just stop right now (`true` as 1st argument forces it to send stop command and wait it termination).
$worker->stop(true);
_Result:_
Enqueuing https://yandex.ru/images/today?size=1920x1080 with size 343103
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Started https://yandex.ru/images/today?size=1920x1080 into /tmp/thrd_testbGsRBp
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testv0E5Qy
https://yandex.ru/images/today?size=1920x1080 downloaded
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 17.4%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 36.82%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 55.95%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 76%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 95.05%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
But what if you need do few jobs simultaneously? You can create few instances of your worker, but it will be pain in the a$$ to manipulate and synchronize them.
In this case you can use WorkersPool
, which takes care of following this:
Rich feature-set, right?! Let's rewrite our downloader with 2 threads to speed-up downloading.
The Settings and structures block of code remains the same, but for demonstating purposes let's use two big files:
// ...
$file_sources = ['http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip', 'http://soft.eurodir.ru/test-speed-100Mb.bin'];
// ...
We need to update only code working with threads.
// create pool with downloading-workers
$pool = new WorkersPool('DownloadWorker');
/
* Also, you can create pool out of object:
* $pool = new WorkersPool(new DownloadWorker());
* This is useful, when you open shared sources within worker constructor so all workers can use them.
*/
// use only 2 workers (this is enough for our work)
$pool->setPoolSize(2);
// dispatch payload to workers. Notice! WorkersPool uses sendData() method instead of sendPayload().
foreach ($files as $file) {
echo 'Enqueuing '.$file[0].' with size '.$file[1].PHP_EOL;
$pool->sendData($file);
}
// register tracker, which should be launched every 0.5 seconds.
// This method will hold the execution until all workers finish their work and go in Worker::IDLE state
$pool->waitToFinish([
'0.5' => function ($pool) use (&$files) {
show_status($files);
}]
);
_Result:_
Enqueuing http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip with size 52693477
Enqueuing http://soft.eurodir.ru/test-speed-100Mb.bin with size 102854656
Started http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip into /tmp/thrd_testchcHBK
Started http://soft.eurodir.ru/test-speed-100Mb.bin into /tmp/thrd_testt6dyJa
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 23.26%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 1.3%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 47.08%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 3.08%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 72.62%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 5.66%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloading 98.7%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 8.05%
http://hosting-obzo-ru.1gb.ru/hosting-obzor.ru.zip downloaded
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 19.15%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 31.31%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 43.69%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 56.87%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 71.95%
http://soft.eurodir.ru/test-speed-100Mb.bin downloading 87.56%
As you can see, we got few improvements:
Information:
- isActive(): boolean
- true if worker is in Worker::RUNNING
or Worker::IDLE
states.
- isRunning(): boolean
- true if worker is in Worker::RUNNING
state.
- isIdle(): boolean
- true if worker is in Worker::IDLE
state.
- getPid(): int
- returns process id of worker.
- getCurrentPayload(): int
- returns serial number of last done payload.
Warning about worker re-using! You can't restart a worker that has been terminated (with stop()
or kill()
), you need to create new worker and start it with start()
.
As you've seen in examples, we created a downloading worker. But there is no need for this, we could use predefined DownloadWorker
which does the same.
Examples of programs that can be built with Threadable
:
Files (24) | / | bin |
File | Role | Description |
---|---|---|
example_file_downloading | Class | Class source |
example_file_downloading_easy | Example | Example script |
example_file_downloading_pool | Class | Class source |
example_file_downloading_pool_easy | Example | Example script |
example_file_downloading_smaller | Class | Class source |
test | Example | Example script |
test_highload | Class | Class source |
test_pool | Example | Example script |
test_pool_highload | Class | Class source |
test_pool_object | Class | Class source |
test_pool_overhead | Class | Class source |
test_pool_reuse | Class | Class source |
test_pool_wait | Class | Class source |
test_reuse | Example | Example script |
Files (24) | / | src |
File | Role | Description |
---|---|---|
BackgroundWork.php | Class | Class source |
DownloadWorker.php | Class | Class source |
Threadable.php | Class | Class source |
Worker.php | Class | Class source |
WorkersPool.php | Class | Class source |
Files (24) | / | tests |
File | Role | Description |
---|---|---|
CpuEatingWorkerTest.php | Class | Class source |
WorkerTest.php | Class | Class source |
The PHP Classes site has supported package installation using the Composer tool since 2013, as you may verify by reading this instructions page. |
Install with Composer |
Version Control | Unique User Downloads | Download Rankings | |||||||||||||||
100% |
|
|
User Ratings | ||||||||||||||||||||||||||||||
|
Applications that use this package |
If you know an application of this package, send a message to the author to add a link here.