mirror of
https://github.com/danog/parallel.git
synced 2025-01-22 14:01:14 +01:00
Remove $this magic from Thread callback
Channel is now passed as the first argument.
This commit is contained in:
parent
4d4841f449
commit
852f580915
@ -5,13 +5,14 @@ require dirname(__DIR__).'/vendor/autoload.php';
|
||||
use Amp\Delayed;
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context\Thread;
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
use Amp\Parallel\Sync\Parcel;
|
||||
use Amp\Parallel\Sync\ThreadedParcel;
|
||||
|
||||
Loop::run(function () {
|
||||
$parcel = new ThreadedParcel(1);
|
||||
|
||||
$context = Thread::spawn(function (Parcel $parcel) {
|
||||
$context = Thread::spawn(function (Channel $channel, Parcel $parcel) {
|
||||
$value = yield $parcel->synchronized(function (int $value) {
|
||||
return $value + 1;
|
||||
});
|
||||
|
@ -5,6 +5,7 @@ require dirname(__DIR__).'/vendor/autoload.php';
|
||||
use Amp\Delayed;
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context\Thread;
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
|
||||
Loop::run(function () {
|
||||
$timer = Loop::repeat(1000, function () {
|
||||
@ -15,15 +16,13 @@ Loop::run(function () {
|
||||
|
||||
try {
|
||||
// Create a new child thread that does some blocking stuff.
|
||||
$context = Thread::spawn(function () {
|
||||
printf("\$this: %s\n", get_class($this));
|
||||
|
||||
printf("Received the following from parent: %s\n", yield $this->receive());
|
||||
$context = Thread::spawn(function (Channel $channel): \Generator {
|
||||
printf("Received the following from parent: %s\n", yield $channel->receive());
|
||||
|
||||
print "Sleeping for 3 seconds...\n";
|
||||
sleep(3); // Blocking call in thread.
|
||||
|
||||
yield $this->send("Data sent from child.");
|
||||
yield $channel->send("Data sent from child.");
|
||||
|
||||
print "Sleeping for 2 seconds...\n";
|
||||
sleep(2); // Blocking call in thread.
|
||||
|
@ -118,13 +118,7 @@ class Thread extends \Thread {
|
||||
*/
|
||||
private function execute(Channel $channel): \Generator {
|
||||
try {
|
||||
if ($this->function instanceof \Closure) {
|
||||
$result = call($this->function->bindTo($channel, null), ...$this->args);
|
||||
} else {
|
||||
$result = call($this->function, ...$this->args);
|
||||
}
|
||||
|
||||
$result = new ExitSuccess(yield $result);
|
||||
$result = new ExitSuccess(yield call($this->function, $channel, ...$this->args));
|
||||
} catch (\Throwable $exception) {
|
||||
$result = new ExitFailure($exception);
|
||||
}
|
||||
|
@ -53,7 +53,9 @@ class Thread implements Context {
|
||||
/**
|
||||
* Spawns a new thread and runs it.
|
||||
*
|
||||
* @param callable $function The callable to invoke in the thread.
|
||||
* @param callable $function The callable to invoke in the thread. First argument is an instance of
|
||||
* \Amp\Parallel\Sync\Channel.
|
||||
* @param mixed ...$args Additional arguments to pass to the given callable.
|
||||
*
|
||||
* @return Thread The thread object that was spawned.
|
||||
*/
|
||||
@ -66,7 +68,9 @@ class Thread implements Context {
|
||||
/**
|
||||
* Creates a new thread.
|
||||
*
|
||||
* @param callable $function The callable to invoke in the thread when run.
|
||||
* @param callable $function The callable to invoke in the thread. First argument is an instance of
|
||||
* \Amp\Parallel\Sync\Channel.
|
||||
* @param mixed ...$args Additional arguments to pass to the given callable.
|
||||
*
|
||||
* @throws \Error Thrown if the pthreads extension is not available.
|
||||
*/
|
||||
|
@ -3,6 +3,7 @@
|
||||
namespace Amp\Parallel\Worker;
|
||||
|
||||
use Amp\Parallel\Context\Thread;
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
use Amp\Promise;
|
||||
|
||||
/**
|
||||
@ -14,7 +15,7 @@ class WorkerThread extends AbstractWorker {
|
||||
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
|
||||
*/
|
||||
public function __construct(string $envClassName = BasicEnvironment::class) {
|
||||
parent::__construct(new Thread(function (string $className): Promise {
|
||||
parent::__construct(new Thread(function (Channel $channel, string $className): Promise {
|
||||
if (!\class_exists($className)) {
|
||||
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
|
||||
}
|
||||
@ -29,7 +30,7 @@ class WorkerThread extends AbstractWorker {
|
||||
\define("AMP_WORKER", "amp-worker");
|
||||
}
|
||||
|
||||
$runner = new TaskRunner($this, $environment);
|
||||
$runner = new TaskRunner($channel, $environment);
|
||||
return $runner->run();
|
||||
}, $envClassName));
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ namespace Amp\Parallel\Test\Sync;
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Parallel\Context\Thread;
|
||||
use Amp\Parallel\Sync\Channel;
|
||||
use Amp\Parallel\Sync\ThreadedParcel;
|
||||
|
||||
/**
|
||||
@ -19,7 +20,7 @@ class ThreadedParcelTest extends AbstractParcelTest {
|
||||
$value = 1;
|
||||
$parcel = new ThreadedParcel($value);
|
||||
|
||||
$thread = Thread::spawn(function (ThreadedParcel $parcel) {
|
||||
$thread = Thread::spawn(function (Channel $channel, ThreadedParcel $parcel) {
|
||||
$parcel->synchronized(function (int $value) {
|
||||
return $value + 1;
|
||||
});
|
||||
|
Loading…
x
Reference in New Issue
Block a user