From b8c3321533e430216eac7e43965f3d62b7d4b278 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 12 May 2017 15:38:03 -0500 Subject: [PATCH 1/2] Fix mode to flags parsing --- lib/EioDriver.php | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/lib/EioDriver.php b/lib/EioDriver.php index 9458ae6..3b639ff 100644 --- a/lib/EioDriver.php +++ b/lib/EioDriver.php @@ -54,21 +54,12 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function open(string $path, string $mode): Promise { - switch ($mode) { - case "r": $flags = \EIO_O_RDONLY; break; - case "r+": $flags = \EIO_O_RDWR; break; - case "w": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break; - case "w+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break; - case "a": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_APPEND; break; - case "a+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_APPEND; break; - case "x": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL; break; - case "x+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL; break; - case "c": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break; - case "c+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break; - default: return new Failure(new FilesystemException( - "Invalid open mode" - )); + try { + $flags = $this->parseMode($mode); + } catch (\Throwable $exception) { + return new Failure($exception); } + $chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0; ($this->incrementor)(1); $deferred = new Deferred; @@ -78,6 +69,26 @@ class EioDriver implements Driver { return $deferred->promise(); } + private function parseMode(string $mode): int { + $mode = \str_replace(['b', 't'], '', $mode); + + switch ($mode) { + case 'r': return \EIO_O_RDONLY; + case 'r+': return \EIO_O_RDWR; + case 'w': return \EIO_O_WRONLY | \EIO_O_TRUNC | \EIO_O_CREAT; + case 'w+': return \EIO_O_RDWR | \EIO_O_TRUNC | \EIO_O_CREAT; + case 'a': return \EIO_O_WRONLY | \EIO_O_APPEND | \EIO_O_CREAT; + case 'a+': return \EIO_O_RDWR | \EIO_O_APPEND | \EIO_O_CREAT; + case 'x': return \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL; + case 'x+': return \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL; + case 'c': return \EIO_O_WRONLY | \EIO_O_CREAT; + case 'c+': return \EIO_O_RDWR | \EIO_O_CREAT; + + default: + throw new FilesystemException('Invalid file mode'); + } + } + private function onOpenHandle($openArr, $result, $req) { list($mode, $path, $deferred) = $openArr; if ($result === -1) { From 9b8b1daeb0d83eb6c12d050a003393f9faa901e4 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 12 May 2017 15:43:23 -0500 Subject: [PATCH 2/2] Implement stream interfaces in Handle --- composer.json | 3 ++- lib/BlockingHandle.php | 13 +++++++++++-- lib/EioHandle.php | 16 +++++++++++++--- lib/Handle.php | 19 ++++++++++++++++--- lib/ParallelHandle.php | 11 ++++++++++- lib/UvHandle.php | 16 +++++++++++++--- 6 files changed, 65 insertions(+), 13 deletions(-) diff --git a/composer.json b/composer.json index 332aed6..4c95a2f 100644 --- a/composer.json +++ b/composer.json @@ -26,7 +26,8 @@ } ], "require": { - "amphp/amp": "dev-master as 2.0" + "amphp/amp": "^2.0", + "amphp/byte-stream": "dev-master as 0.1" }, "minimum-stability": "dev", "require-dev": { diff --git a/lib/BlockingHandle.php b/lib/BlockingHandle.php index f73bdc2..e585fd2 100644 --- a/lib/BlockingHandle.php +++ b/lib/BlockingHandle.php @@ -29,14 +29,14 @@ class BlockingHandle implements Handle { /** * {@inheritdoc} */ - public function read(int $length): Promise { + public function read(int $length = self::DEFAULT_READ_LENGTH): Promise { if ($this->fh === null) { throw new \Error("The file has been closed"); } $data = \fread($this->fh, $length); if ($data !== false) { - return new Success($data); + return new Success(\strlen($data) ? $data : null); } else { return new Failure(new FilesystemException( "Failed reading from file handle" @@ -62,6 +62,15 @@ class BlockingHandle implements Handle { } } + /** + * {@inheritdoc} + */ + public function end(string $data = ""): Promise { + $promise = $this->write($data); + $promise->onResolve([$this, "close"]); + return $promise; + } + /** * {@inheritdoc} */ diff --git a/lib/EioHandle.php b/lib/EioHandle.php index c7c201e..a92a925 100644 --- a/lib/EioHandle.php +++ b/lib/EioHandle.php @@ -30,7 +30,7 @@ class EioHandle implements Handle { /** * {@inheritdoc} */ - public function read(int $length): Promise { + public function read(int $length = self::DEFAULT_READ_LENGTH): Promise { $deferred = new Deferred; $op = new \StdClass; $op->type = self::OP_READ; @@ -74,8 +74,9 @@ class EioHandle implements Handle { \eio_get_last_error($req) )); } else { - $this->position = $op->position + \strlen($result); - $op->deferred->resolve($result); + $length = \strlen($result); + $this->position = $op->position + $length; + $op->deferred->resolve($length ? $result : null); } if ($this->queue) { $this->dequeue(); @@ -104,6 +105,15 @@ class EioHandle implements Handle { return $deferred->promise(); } + /** + * {@inheritdoc} + */ + public function end(string $data = ""): Promise { + $promise = $this->write($data); + $promise->onResolve([$this, "close"]); + return $promise; + } + private function onWrite($op, $result, $req) { $this->isActive = false; ($this->incrementor)(-1); diff --git a/lib/Handle.php b/lib/Handle.php index 4220efe..e005c30 100644 --- a/lib/Handle.php +++ b/lib/Handle.php @@ -2,16 +2,20 @@ namespace Amp\File; +use Amp\ByteStream\InputStream; +use Amp\ByteStream\OutputStream; use Amp\Promise; -interface Handle { +interface Handle extends InputStream, OutputStream { + const DEFAULT_READ_LENGTH = 8192; + /** * Read $len bytes from the open file handle starting at $offset * * @param int $length - * @return \Amp\Promise + * @return \Amp\Promise */ - public function read(int $length): Promise; + public function read(int $length = 8192): Promise; /** * Write $data to the open file handle starting at $offset @@ -21,6 +25,15 @@ interface Handle { */ public function write(string $data): Promise; + /** + * Write $data to the open file handle and close the handle once the write completes. + * + * @param string $data + * + * @return \Amp\Promise + */ + public function end(string $data = ""): Promise; + /** * Close the file handle * diff --git a/lib/ParallelHandle.php b/lib/ParallelHandle.php index c60182a..c9b7e36 100644 --- a/lib/ParallelHandle.php +++ b/lib/ParallelHandle.php @@ -80,7 +80,7 @@ class ParallelHandle implements Handle { return ($this->pendingWrites > 0) ? false : ($this->size <= $this->position); } - public function read(int $length): Promise { + public function read(int $length = self::DEFAULT_READ_LENGTH): Promise { if ($this->id === null) { throw new \Error("The file has been closed"); } @@ -113,6 +113,15 @@ class ParallelHandle implements Handle { return new Coroutine($this->doWrite($data)); } + /** + * {@inheritdoc} + */ + public function end(string $data = ""): Promise { + $promise = $this->write($data); + $promise->onResolve([$this, "close"]); + return $promise; + } + private function doWrite(string $data): \Generator { ++$this->pendingWrites; diff --git a/lib/UvHandle.php b/lib/UvHandle.php index 4baa0cd..9771380 100644 --- a/lib/UvHandle.php +++ b/lib/UvHandle.php @@ -40,7 +40,7 @@ class UvHandle implements Handle { /** * {@inheritdoc} */ - public function read(int $readLen): Promise { + public function read(int $readLen = self::DEFAULT_READ_LENGTH): Promise { $deferred = new Deferred; $op = new \StdClass; $op->type = self::OP_READ; @@ -78,6 +78,15 @@ class UvHandle implements Handle { return $deferred->promise(); } + /** + * {@inheritdoc} + */ + public function end(string $data = ""): Promise { + $promise = $this->write($data); + $promise->onResolve([$this, "close"]); + return $promise; + } + private function doRead($op) { $this->driver->reference($this->busy); $onRead = function ($fh, $result, $buffer) use ($op) { @@ -88,8 +97,9 @@ class UvHandle implements Handle { \uv_strerror($result) )); } else { - $this->position = $op->position + strlen($buffer); - $op->promisor->resolve($buffer); + $length = strlen($buffer); + $this->position = $op->position + $length; + $op->promisor->resolve($length ? $buffer : null); } if ($this->queue) { $this->dequeue();