1
0
mirror of https://github.com/danog/file.git synced 2024-11-29 20:09:10 +01:00

Update to amphp/parallel 0.2

This commit is contained in:
Aaron Piotrowski 2017-12-14 22:36:16 -06:00
parent f4c5a623ff
commit fb58fe8dd5
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
6 changed files with 12 additions and 36 deletions

View File

@ -34,7 +34,7 @@
"require": { "require": {
"amphp/amp": "^2", "amphp/amp": "^2",
"amphp/byte-stream": "^1", "amphp/byte-stream": "^1",
"amphp/parallel": "^0.1.8" "amphp/parallel": "^0.2"
}, },
"require-dev": { "require-dev": {
"amphp/phpunit-util": "^1", "amphp/phpunit-util": "^1",

View File

@ -15,7 +15,7 @@ use Amp\Parallel\Worker\Task;
* @internal * @internal
*/ */
class FileTask implements Task { class FileTask implements Task {
const ENV_PREFIX = self::class . '#'; const ENV_PREFIX = "amp/file#";
/** @var string */ /** @var string */
private $operation; private $operation;
@ -112,8 +112,8 @@ class FileTask implements Task {
return ([$file, \substr($this->operation, 1)])(...$this->args); return ([$file, \substr($this->operation, 1)])(...$this->args);
case "fclose": case "fclose":
$environment->delete($id);
$file->close(); $file->close();
$environment->delete($this->id);
return; return;
default: default:

View File

@ -3,7 +3,6 @@
namespace Amp\File; namespace Amp\File;
use Amp\Coroutine; use Amp\Coroutine;
use Amp\Deferred;
use Amp\Parallel\Worker; use Amp\Parallel\Worker;
use Amp\Parallel\Worker\Pool; use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\TaskException; use Amp\Parallel\Worker\TaskException;
@ -23,42 +22,29 @@ class ParallelDriver implements Driver {
*/ */
public function __construct(Pool $pool = null) { public function __construct(Pool $pool = null) {
$this->pool = $pool ?: Worker\pool(); $this->pool = $pool ?: Worker\pool();
if (!$this->pool->isRunning()) {
$this->pool->start();
}
} }
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function open(string $path, string $mode): Promise { public function open(string $path, string $mode): Promise {
$worker = $this->pool->get(); return call(function () use ($path, $mode) {
$worker = $this->pool->get();
$task = new Internal\FileTask("fopen", [$path, $mode]); try {
list($id, $size, $mode) = yield $worker->enqueue(new Internal\FileTask("fopen", [$path, $mode]));
$deferred = new Deferred; } catch (TaskException $exception) {
$promise = $worker->enqueue($task); throw new FilesystemException("Could not open file", $exception);
$promise->onResolve(static function ($exception, array $result = null) use ($worker, $deferred, $path) { } catch (WorkerException $exception) {
if ($exception) { throw new FilesystemException("Could not send open request to worker", $exception);
$deferred->fail($exception);
return;
} }
return new ParallelHandle($worker, $id, $path, $size, $mode);
list($id, $size, $mode) = $result;
$deferred->resolve(new ParallelHandle($worker, $id, $path, $size, $mode));
}); });
return $deferred->promise();
} }
private function runFileTask(Internal\FileTask $task): \Generator { private function runFileTask(Internal\FileTask $task): \Generator {
try { try {
return yield $this->pool->enqueue($task); return yield $this->pool->enqueue($task);
} catch (TaskException $exception) { } catch (TaskException $exception) {
if (\strcasecmp(\substr($exception->getName(), -5), "Error") === 0) {
throw new \Error($exception->getMessage());
}
throw new FilesystemException("The file operation failed", $exception); throw new FilesystemException("The file operation failed", $exception);
} catch (WorkerException $exception) { } catch (WorkerException $exception) {
throw new FilesystemException("Could not send the file task to worker", $exception); throw new FilesystemException("Could not send the file task to worker", $exception);

View File

@ -47,14 +47,6 @@ function driver(): Driver {
return new EioDriver; return new EioDriver;
} }
if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) {
return new BlockingDriver;
}
if (\PHP_SAPI !== "cli" && \PHP_SAPI !== "phpdbg") { // We don't have a binary to launch sub-processes
return new BlockingDriver;
}
if (\defined("AMP_WORKER")) { // Prevent spawning infinite workers. if (\defined("AMP_WORKER")) { // Prevent spawning infinite workers.
return new BlockingDriver; return new BlockingDriver;
} }

View File

@ -11,7 +11,6 @@ class ParallelDriverTest extends DriverTest {
protected function execute(callable $cb) { protected function execute(callable $cb) {
Loop::run(function () use ($cb) { Loop::run(function () use ($cb) {
$pool = new DefaultPool; $pool = new DefaultPool;
$pool->start();
File\filesystem(new File\ParallelDriver($pool)); File\filesystem(new File\ParallelDriver($pool));
yield call($cb); yield call($cb);

View File

@ -11,7 +11,6 @@ class ParallelHandleTest extends AsyncHandleTest {
protected function execute(callable $cb) { protected function execute(callable $cb) {
Loop::run(function () use ($cb) { Loop::run(function () use ($cb) {
$pool = new DefaultPool; $pool = new DefaultPool;
$pool->start();
File\filesystem(new File\ParallelDriver($pool)); File\filesystem(new File\ParallelDriver($pool));
yield call($cb); yield call($cb);