From ab05daa5a5c7623716de2521a4128874593a7c9a Mon Sep 17 00:00:00 2001 From: Daniel Lowrey Date: Wed, 12 Aug 2015 19:02:41 -0400 Subject: [PATCH] Stream file handle support --- lib/BlockingDriver.php | 13 +++ lib/BlockingHandle.php | 108 +++++++++++++++++++ lib/Driver.php | 9 ++ lib/EioDriver.php | 135 ++++++++++++++++++------ lib/EioHandle.php | 201 ++++++++++++++++++++++++++++++++++++ lib/Handle.php | 76 ++++++++++++++ lib/UvDriver.php | 71 +++++++++++++ lib/UvHandle.php | 201 ++++++++++++++++++++++++++++++++++++ lib/functions.php | 11 ++ test/BlockingHandleTest.php | 16 +++ test/EioHandleTest.php | 42 ++++++++ test/Fixture.php | 47 +++++++++ test/HandleTest.php | 142 +++++++++++++++++++++++++ test/UvDriverTest.php | 1 + test/UvHandleTest.php | 44 ++++++++ 15 files changed, 1088 insertions(+), 29 deletions(-) create mode 100644 lib/BlockingHandle.php create mode 100644 lib/EioHandle.php create mode 100644 lib/Handle.php create mode 100644 lib/UvHandle.php create mode 100644 test/BlockingHandleTest.php create mode 100644 test/EioHandleTest.php create mode 100644 test/Fixture.php create mode 100644 test/HandleTest.php create mode 100644 test/UvHandleTest.php diff --git a/lib/BlockingDriver.php b/lib/BlockingDriver.php index ec75a70..2aa7fc4 100644 --- a/lib/BlockingDriver.php +++ b/lib/BlockingDriver.php @@ -7,6 +7,19 @@ use Amp\Success; use Amp\Failure; class BlockingDriver implements Driver { + /** + * {@inheritdoc} + */ + public function open($path, $mode) { + if (!$fh = \fopen($path, $mode)) { + return new Failure(new FilesystemException( + "Failed opening file handle" + )); + } + + return new Success(new BlockingHandle($fh, $path, $mode)); + } + /** * {@inheritdoc} */ diff --git a/lib/BlockingHandle.php b/lib/BlockingHandle.php new file mode 100644 index 0000000..d2abc16 --- /dev/null +++ b/lib/BlockingHandle.php @@ -0,0 +1,108 @@ +fh = $fh; + $this->path = $path; + $this->mode = $mode; + } + + /** + * {@inheritdoc} + */ + public function read($len) { + $data = \fread($this->fh, $len); + if ($data !== false) { + return new Success($data); + } else { + return new Failure(new \RuntimeException( + "Failed reading from file handle" + )); + } + } + + /** + * {@inheritdoc} + */ + public function write($data) { + $len = \fwrite($this->fh, $data); + if ($len !== false) { + return new Success($data); + } else { + return new Failure(new \RuntimeException( + "Failed writing to file handle" + )); + } + } + + /** + * {@inheritdoc} + */ + public function close() { + if (\fclose($this->fh)) { + return new Success; + } else { + return new Failure(new \RuntimeException( + "Failed closing file handle" + )); + } + } + + /** + * {@inheritdoc} + */ + public function seek($position, $whence = \SEEK_SET) { + switch ($whence) { + case \SEEK_SET: + case \SEEK_CUR: + case \SEEK_END: + \fseek($this->fh, $position, $whence); + return; + default: + throw new FilesystemException( + "Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected" + ); + } + } + + /** + * {@inheritdoc} + */ + public function tell() { + return \ftell($this->fh); + } + + /** + * {@inheritdoc} + */ + public function eof() { + return \feof($this->fh); + } + + /** + * {@inheritdoc} + */ + public function path() { + return $this->path; + } + + /** + * {@inheritdoc} + */ + public function mode() { + return $this->mode; + } +} \ No newline at end of file diff --git a/lib/Driver.php b/lib/Driver.php index 0265fca..d2c57c1 100644 --- a/lib/Driver.php +++ b/lib/Driver.php @@ -3,6 +3,15 @@ namespace Amp\File; interface Driver { + /** + * Open a handle for the specified path + * + * @param string $path + * @param string $mode + * @return \Amp\File\Handle + */ + public function open($path, $mode); + /** * Execute a file stat operation * diff --git a/lib/EioDriver.php b/lib/EioDriver.php index eff529a..4425c15 100644 --- a/lib/EioDriver.php +++ b/lib/EioDriver.php @@ -9,8 +9,9 @@ use Amp\Deferred; class EioDriver implements Driver { private $watcher; - private $callableDelReq; private $pending = 0; + private $incrementor; + private $callableDecrementor; private static $stream; /** @@ -24,8 +25,25 @@ class EioDriver implements Driver { \eio_init(); self::$stream = \eio_get_event_stream(); } - $this->callableDelReq = function() { - $this->decrementPending(); + $this->callableDecrementor = function() { + \call_user_func($this->incrementor, -1); + }; + $this->incrementor = function ($increment) { + switch ($increment) { + case 1: + case -1: + $this->pending += $increment; + break; + default: + throw new FilesystemException( + "Invalid pending event increment; 1 or -1 required" + ); + } + if ($this->pending === 0) { + \Amp\disable($this->watcher); + } elseif ($this->pending === 1) { + \Amp\enable($this->watcher); + } }; $this->watcher = \Amp\onReadable(self::$stream, function() { while (\eio_npending()) { @@ -34,15 +52,74 @@ class EioDriver implements Driver { }, $options = ["enable" => false]); } - private function incrementPending() { - if ($this->pending++ === 0) { - \Amp\enable($this->watcher); + /** + * {@inheritdoc} + */ + public function open($path, $mode) { + switch ($mode) { + case "r": $flags = \EIO_O_RDONLY; break; + case "r+": $flags = \EIO_O_RDWR; break; + case "w": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break; + case "w+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break; + case "a": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_APPEND; break; + case "a+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_APPEND; break; + case "x": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL; break; + case "x+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL; break; + case "c": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break; + case "c+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break; + default: return new Failure(new FilesystemException( + "Invalid open mode" + )); + } + $chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0; + \call_user_func($this->incrementor, 1); + $promisor = new Deferred; + $openArr = [$mode, $path, $promisor]; + \eio_open($path, $flags, $chmod, $priority = null, [$this, "onOpenHandle"], $openArr); + + return $promisor->promise(); + } + + private function onOpenHandle($openArr, $result, $req) { + list($mode, $path, $promisor) = $openArr; + if ($result === -1) { + \call_user_func($this->incrementor, -1); + $promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } elseif ($mode[0] === "a") { + \array_unshift($openArr, $result); + \eio_ftruncate($result, $offset = 0, $priority = null, [$this, "onOpenFtruncate"], $openArr); + } else { + \array_unshift($openArr, $result); + \eio_fstat($result, $priority = null, [$this, "onOpenFstat"], $openArr); } } - private function decrementPending() { - if ($this->pending-- === 1) { - \Amp\disable($this->watcher); + private function onOpenFtruncate($openArr, $result, $req) { + \call_user_func($this->incrementor, -1); + list($fh, $mode, $path, $promisor) = $openArr; + if ($result === -1) { + $promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } else { + $handle = new EioHandle($this->incrementor, $fh, $path, $mode, $size = 0); + $promisor->succeed($handle); + } + } + + private function onOpenFstat($openArr, $result, $req) { + \call_user_func($this->incrementor, -1); + list($fh, $mode, $path, $promisor) = $openArr; + if ($result === -1) { + $promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } else { + StatCache::set($path, $result); + $handle = new EioHandle($this->incrementor, $fh, $path, $mode, $result["size"]); + $promisor->succeed($handle); } } @@ -54,7 +131,7 @@ class EioDriver implements Driver { return new Success($stat); } - $this->incrementPending(); + \call_user_func($this->incrementor, 1); $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; $data = [$promisor, $path]; @@ -65,7 +142,7 @@ class EioDriver implements Driver { private function onStat($data, $result, $req) { list($promisor, $path) = $data; - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->succeed(null); } else { @@ -198,7 +275,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function lstat($path) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_lstat($path, $priority, [$this, "onLstat"], $promisor); @@ -207,7 +284,7 @@ class EioDriver implements Driver { } private function onLstat($promisor, $result, $req) { - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->succeed(null); } else { @@ -219,7 +296,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function symlink($target, $link) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_symlink($target, $link, $priority, [$this, "onGenericResult"], $promisor); @@ -228,7 +305,7 @@ class EioDriver implements Driver { } private function onGenericResult($promisor, $result, $req) { - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) @@ -242,7 +319,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function rename($from, $to) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_rename($from, $to, $priority, [$this, "onGenericResult"], $promisor); @@ -254,7 +331,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function unlink($path) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; $data = [$promisor, $path]; @@ -265,7 +342,7 @@ class EioDriver implements Driver { private function onUnlink($data, $result, $req) { list($promisor, $path) = $data; - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) @@ -280,7 +357,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function mkdir($path, $mode = 0644) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_mkdir($path, $mode, $priority, [$this, "onGenericResult"], $promisor); @@ -292,7 +369,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function rmdir($path) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; $data = [$promisor, $path]; @@ -303,7 +380,7 @@ class EioDriver implements Driver { private function onRmdir($data, $result, $req) { list($promisor, $path) = $data; - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) @@ -318,7 +395,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function scandir($path) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $flags = \EIO_READDIR_STAT_ORDER | \EIO_READDIR_DIRS_FIRST; $priority = \EIO_PRI_DEFAULT; @@ -328,7 +405,7 @@ class EioDriver implements Driver { } private function onScandir($promisor, $result, $req) { - $this->decrementPending(); + \call_user_func($this->incrementor, -1); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) @@ -342,7 +419,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function chmod($path, $mode) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_chmod($path, $mode, $priority, [$this, "onGenericResult"], $promisor); @@ -354,7 +431,7 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function chown($path, $uid, $gid) { - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $priority = \EIO_PRI_DEFAULT; \eio_chown($path, $uid, $gid, $priority, [$this, "onGenericResult"], $promisor); @@ -382,7 +459,7 @@ class EioDriver implements Driver { $mode = 0; $priority = \EIO_PRI_DEFAULT; - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; \eio_open($path, $flags, $mode, $priority, [$this, "onGetOpen"], $promisor); @@ -418,7 +495,7 @@ class EioDriver implements Driver { private function onGetRead($fhAndPromisor, $result, $req) { list($fh, $promisor) = $fhAndPromisor; $priority = \EIO_PRI_DEFAULT; - \eio_close($fh, $priority, $this->callableDelReq); + \eio_close($fh, $priority, $this->callableDecrementor); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) @@ -436,7 +513,7 @@ class EioDriver implements Driver { $mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR; $priority = \EIO_PRI_DEFAULT; - $this->incrementPending(); + \call_user_func($this->incrementor, 1);; $promisor = new Deferred; $data = [$contents, $promisor]; \eio_open($path, $flags, $mode, $priority, [$this, "onPutOpen"], $data); @@ -464,7 +541,7 @@ class EioDriver implements Driver { list($fh, $promisor) = $fhAndPromisor; \eio_close($fh); $priority = \EIO_PRI_DEFAULT; - \eio_close($fh, $priority, $this->callableDelReq); + \eio_close($fh, $priority, $this->callableDecrementor); if ($result === -1) { $promisor->fail(new FilesystemException( \eio_get_last_error($req) diff --git a/lib/EioHandle.php b/lib/EioHandle.php new file mode 100644 index 0000000..5d91e30 --- /dev/null +++ b/lib/EioHandle.php @@ -0,0 +1,201 @@ +incrementor = $incrementor; + $this->fh = $fh; + $this->path = $path; + $this->mode = $mode; + $this->size = $size; + $this->position = ($mode[0] === "a") ? $size : 0; + } + + /** + * {@inheritdoc} + */ + public function read($len) { + $promisor = new Deferred; + $op = new \StdClass; + $op->type = self::OP_READ; + $op->position = $this->position; + $op->promisor = $promisor; + $remaining = $this->size - $this->position; + $op->readLen = ($len > $remaining) ? $remaining : $len; + if ($this->isActive) { + $this->queue[] = $op; + } else { + \call_user_func($this->incrementor, 1); + $this->isActive = true; + \eio_read($this->fh, $op->readLen, $op->position, $priority = null, [$this, "onRead"], $op); + } + + return $promisor->promise(); + } + + private function dequeue() { + $this->isActive = true; + $op = \array_shift($this->queue); + switch ($op->type) { + case self::OP_READ: + \call_user_func($this->incrementor, 1); + $this->isActive = true; + \eio_read($this->fh, $op->readLen, $op->position, $priority = null, [$this, "onRead"], $op); + break; + case self::OP_WRITE: + \call_user_func($this->incrementor, 1); + $this->isActive = true; + \eio_write($this->fh, $op->writeData, \strlen($op->writeData), $op->position, $priority = null, [$this, "onWrite"], $op); + break; + } + } + + private function onRead($op, $result, $req) { + $this->isActive = false; + \call_user_func($this->incrementor, -1); + if ($result === -1) { + $op->promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } else { + $this->position = $op->position + \strlen($result); + $op->promisor->succeed($result); + } + if ($this->queue) { + $this->dequeue(); + } + } + + /** + * {@inheritdoc} + */ + public function write($data) { + $promisor = new Deferred; + $op = new \StdClass; + $op->type = self::OP_WRITE; + $op->position = $this->position; + $op->promisor = $promisor; + $op->writeData = $data; + $this->pendingWriteOps++; + if ($this->isActive) { + $this->queue[] = $op; + } else { + \call_user_func($this->incrementor, 1); + $this->isActive = true; + \eio_write($this->fh, $data, strlen($data), $op->position, $priority = null, [$this, "onWrite"], $op); + } + + return $promisor->promise(); + } + + private function onWrite($op, $result, $req) { + $this->isActive = false; + \call_user_func($this->incrementor, -1); + if ($result === -1) { + $op->promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } else { + StatCache::clear($this->path); + $bytesWritten = \strlen($op->writeData); + $this->pendingWriteOps--; + $newPosition = $op->position + $bytesWritten; + $delta = $newPosition - $this->position; + $this->position = ($this->mode[0] === "a") ? $this->position : $newPosition; + $this->size += $delta; + $op->promisor->succeed($result); + } + if ($this->queue) { + $this->dequeue(); + } + } + + /** + * {@inheritdoc} + */ + public function close() { + \call_user_func($this->incrementor, 1); + $promisor = new Deferred; + \eio_close($this->fh, $priority = null, [$this, "onClose"], $promisor); + + return $promisor->promise(); + } + + private function onClose($promisor, $result, $req) { + \call_user_func($this->incrementor, -1); + if ($result === -1) { + $promisor->fail(new FilesystemException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed(); + } + } + + /** + * {@inheritdoc} + */ + public function seek($offset, $whence = \SEEK_SET) { + $offset = (int) $offset; + switch ($whence) { + case \SEEK_SET: + $this->position = $offset; + break; + case \SEEK_CUR: + $this->position = $this->position + $offset; + break; + case \SEEK_END: + $this->position = $this->size + $offset; + break; + default: + throw new FilesystemException( + "Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected" + ); + } + } + + /** + * {@inheritdoc} + */ + public function tell() { + return $this->position; + } + + /** + * {@inheritdoc} + */ + public function eof() { + return ($this->pendingWriteOps > 0) ? false : ($this->size <= $this->position); + } + + /** + * {@inheritdoc} + */ + public function path() { + return $this->path; + } + + /** + * {@inheritdoc} + */ + public function mode() { + return $this->mode; + } +} diff --git a/lib/Handle.php b/lib/Handle.php new file mode 100644 index 0000000..de84078 --- /dev/null +++ b/lib/Handle.php @@ -0,0 +1,76 @@ +loop = $this->reactor->getLoop(); } + /** + * {@inheritdoc} + */ + public function open($path, $mode) { + switch ($mode) { + case "r": $flags = \UV::O_RDONLY; break; + case "r+": $flags = \UV::O_RDWR; break; + case "w": $flags = \UV::O_WRONLY | \UV::O_CREAT; break; + case "w+": $flags = \UV::O_RDWR | \UV::O_CREAT; break; + case "a": $flags = \UV::O_WRONLY | \UV::O_CREAT | \UV::O_APPEND; break; + case "a+": $flags = \UV::O_RDWR | \UV::O_CREAT | \UV::O_APPEND; break; + case "x": $flags = \UV::O_WRONLY | \UV::O_CREAT | \UV::O_EXCL; break; + case "x+": $flags = \UV::O_RDWR | \UV::O_CREAT | \UV::O_EXCL; break; + case "c": $flags = \UV::O_WRONLY | \UV::O_CREAT; break; + case "c+": $flags = \UV::O_RDWR | \UV::O_CREAT; break; + default: return new Failure(new FilesystemException( + "Invalid open mode" + )); + } + $chmod = ($flags & \UV::O_CREAT) ? 0644 : 0; + $this->reactor->addRef(); + $promisor = new Deferred; + $openArr = [$mode, $path, $promisor]; + \uv_fs_open($this->loop, $path, $flags, $chmod, function($fh) use ($openArr) { + if ($fh) { + $this->onOpenHandle($fh, $openArr); + } else { + $this->reactor->delRef(); + $promisor->fail(new \RuntimeException( + "Failed opening file handle" + )); + } + }); + + return $promisor->promise(); + } + + private function onOpenHandle($fh, array $openArr) { + list($mode) = $openArr; + if ($mode[0] === "w") { + \uv_fs_ftruncate($this->loop, $fh, $length = 0, function($fh) use ($openArr) { + $this->reactor->delRef(); + if ($fh) { + $this->finalizeHandle($fh, $size = 0, $openArr); + } else { + $promisor->fail(new FilesystemException( + "Failed truncating file" + )); + } + }); + } else { + \uv_fs_fstat($this->loop, $fh, function($fh, $stat) use ($openArr) { + $this->reactor->delRef(); + if ($fh) { + StatCache::set($openArr[1], $stat); + $this->finalizeHandle($fh, $stat["size"], $openArr); + } else { + $promisor->fail(new FilesystemException( + "Failed reading file size from open handle" + )); + } + }); + } + } + + private function finalizeHandle($fh, $size, array $openArr) { + list($mode, $path, $promisor) = $openArr; + $handle = new UvHandle($this->reactor, $fh, $path, $mode, $size); + $promisor->succeed($handle); + } + /** * {@inheritdoc} */ diff --git a/lib/UvHandle.php b/lib/UvHandle.php new file mode 100644 index 0000000..9c44f88 --- /dev/null +++ b/lib/UvHandle.php @@ -0,0 +1,201 @@ +reactor = $reactor; + $this->fh = $fh; + $this->path = $path; + $this->mode = $mode; + $this->size = $size; + $this->loop = $reactor->getLoop(); + $this->position = ($mode[0] === "a") ? $size : 0; + } + + /** + * {@inheritdoc} + */ + public function read($readLen) { + $promisor = new Deferred; + $op = new \StdClass; + $op->type = self::OP_READ; + $op->position = $this->position; + $op->promisor = $promisor; + $op->readLen = $readLen - 1; + if ($this->isActive) { + $this->queue[] = $op; + } else { + $this->isActive = true; + $this->doRead($op); + } + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function write($writeData) { + $this->pendingWriteOps++; + $promisor = new Deferred; + $op = new \StdClass; + $op->type = self::OP_WRITE; + $op->position = $this->position; + $op->promisor = $promisor; + $op->writeData = $writeData; + if ($this->isActive) { + $this->queue[] = $op; + } else { + $this->isActive = true; + $this->doWrite($op); + } + + return $promisor->promise(); + } + + private function doRead($op) { + $this->reactor->addRef(); + $onRead = function ($fh, $result, $buffer) use ($op) { + $this->isActive = false; + $this->reactor->delRef(); + if ($result < 0) { + $op->promisor->fail(new FilesystemException( + \uv_strerror($result) + )); + } else { + $this->position = $op->position + strlen($buffer); + $op->promisor->succeed($buffer); + } + if ($this->queue) { + $this->dequeue(); + } + }; + \uv_fs_read($this->loop, $this->fh, $op->position, $op->readLen, $onRead); + } + + private function doWrite($op) { + $this->reactor->addRef(); + $onWrite = function ($fh, $result) use ($op) { + $this->isActive = false; + $this->reactor->delRef(); + if ($result < 0) { + $op->promisor->fail(new FilesystemException( + \uv_strerror($result) + )); + } else { + StatCache::clear($this->path); + $bytesWritten = \strlen($op->writeData); + $this->pendingWriteOps--; + $newPosition = $op->position + $bytesWritten; + $delta = $newPosition - $this->position; + $this->position = ($this->mode[0] === "a") ? $this->position : $newPosition; + $this->size += $delta; + $op->promisor->succeed($result); + } + if ($this->queue) { + $this->dequeue(); + } + }; + \uv_fs_write($this->loop, $this->fh, $op->writeData, $op->position, $onWrite); + } + + private function dequeue() { + $this->isActive = true; + $op = \array_shift($this->queue); + switch ($op->type) { + case self::OP_READ: $this->doRead($op); break; + case self::OP_WRITE: $this->doWrite($op); break; + } + } + + /** + * {@inheritdoc} + */ + public function seek($offset, $whence = \SEEK_SET) { + $offset = (int) $offset; + switch ($whence) { + case \SEEK_SET: + $this->position = $offset; + break; + case \SEEK_CUR: + $this->position = $this->position + $offset; + break; + case \SEEK_END: + $this->position = $this->size + $offset; + break; + default: + throw new FilesystemException( + "Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected" + ); + } + } + + /** + * {@inheritdoc} + */ + public function tell() { + return $this->position; + } + + /** + * {@inheritdoc} + */ + public function eof() { + return ($this->pendingWriteOps > 0) ? false : ($this->size <= $this->position); + } + + /** + * {@inheritdoc} + */ + public function path() { + return $this->path; + } + + /** + * {@inheritdoc} + */ + public function mode() { + return $this->mode; + } + + /** + * {@inheritdoc} + */ + public function close() { + $this->isCloseInitialized = true; + $this->reactor->addRef(); + $promisor = new Deferred; + \uv_fs_close($this->loop, $this->fh, function($fh) use ($promisor) { + $this->reactor->delRef(); + $promisor->succeed(); + }); + + return $promisor->promise(); + } + + public function __destruct() { + if (empty($this->isCloseInitialized)) { + $this->close(); + } + } +} diff --git a/lib/functions.php b/lib/functions.php index 037ae8f..2aa8d59 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -35,6 +35,17 @@ function driver() { } } +/** + * Open a handle for the specified path + * + * @param string $path + * @param string $mode + * @return \Amp\File\Handle + */ +function open($path, $mode) { + return filesystem()->open($path, $mode); +} + /** * Execute a file stat operation * diff --git a/test/BlockingHandleTest.php b/test/BlockingHandleTest.php new file mode 100644 index 0000000..aefff6f --- /dev/null +++ b/test/BlockingHandleTest.php @@ -0,0 +1,16 @@ +markTestSkipped( + "eio extension not loaded" + ); + } + } + + public function testQueuedWritesOverrideEachOtherIfNotWaitedUpon() { + amp\run(function () { + $path = Fixture::path() . "/write"; + $handle = (yield file\open($path, "c+")); + $this->assertSame(0, $handle->tell()); + + $write1 = $handle->write("foo"); + $write2 = $handle->write("bar"); + + yield amp\all([$write1, $write2]); + + $handle->seek(0); + $contents = (yield $handle->read(8192)); + $this->assertSame(3, $handle->tell()); + $this->assertTrue($handle->eof()); + $this->assertSame("bar", $contents); + + yield $handle->close(); + yield file\unlink($path); + }); + } +} diff --git a/test/Fixture.php b/test/Fixture.php new file mode 100644 index 0000000..bb9b16c --- /dev/null +++ b/test/Fixture.php @@ -0,0 +1,47 @@ +assertSame(0, $handle->tell()); + + yield $handle->write("foo"); + yield $handle->write("bar"); + $handle->seek(0); + $contents = (yield $handle->read(8192)); + $this->assertSame(6, $handle->tell()); + $this->assertTrue($handle->eof()); + $this->assertSame("foobar", $contents); + + yield $handle->close(); + yield file\unlink($path); + }); + } + + public function testReadingToEof() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $contents = ""; + $position = 0; + + $stat = (yield file\stat(__FILE__)); + $chunkSize = \floor(($stat["size"] / 5)); + + while (!$handle->eof()) { + $chunk = (yield $handle->read($chunkSize)); + $contents .= $chunk; + $position += \strlen($chunk); + $this->assertSame($position, $handle->tell()); + } + + $this->assertSame((yield file\get(__FILE__)), $contents); + }); + } + + public function testQueuedReads() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + + $contents = ""; + $read1 = $handle->read(10); + $handle->seek(10); + $read2 = $handle->read(10); + + $contents .= (yield $read1); + $contents .= (yield $read2); + + $expected = \substr((yield file\get(__FILE__)), 0, 20); + $this->assertSame($expected, $contents); + }); + } + + public function testReadingFromOffset() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $this->assertSame(0, $handle->tell()); + $handle->seek(10); + $this->assertSame(10, $handle->tell()); + $chunk = (yield $handle->read(90)); + $this->assertSame(100, $handle->tell()); + $expected = \substr((yield file\get(__FILE__)), 10, 90); + $this->assertSame($expected, $chunk); + }); + } + + /** + * @expectedException Amp\File\FilesystemException + * @expectedExceptionMessage Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected + */ + public function testSeekThrowsOnInvalidWhence() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $handle->seek(0, 99999); + }); + } + + public function testSeekSetCur() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $this->assertSame(0, $handle->tell()); + $handle->seek(10); + $this->assertSame(10, $handle->tell()); + $handle->seek(-10, \SEEK_CUR); + $this->assertSame(0, $handle->tell()); + }); + } + + public function testSeekSetEnd() { + amp\run(function () { + $size = (yield file\size(__FILE__)); + $handle = (yield file\open(__FILE__, "r")); + $this->assertSame(0, $handle->tell()); + $handle->seek(-10, \SEEK_END); + $this->assertSame($size - 10, $handle->tell()); + }); + } + + public function testPath() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $this->assertSame(__FILE__, $handle->path()); + }); + } + + public function testMode() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + $this->assertSame("r", $handle->mode()); + }); + } + + public function testClose() { + amp\run(function () { + $handle = (yield file\open(__FILE__, "r")); + yield $handle->close(); + }); + } +} diff --git a/test/UvDriverTest.php b/test/UvDriverTest.php index 7c0ff58..76d26e3 100644 --- a/test/UvDriverTest.php +++ b/test/UvDriverTest.php @@ -9,6 +9,7 @@ class UvDriverTest extends DriverTest { \Amp\reactor($reactor); $driver = new \Amp\File\UvDriver($reactor); \Amp\File\filesystem($driver); + parent::setUp(); } else { $this->markTestSkipped( "php-uv extension not loaded" diff --git a/test/UvHandleTest.php b/test/UvHandleTest.php new file mode 100644 index 0000000..25dbe7c --- /dev/null +++ b/test/UvHandleTest.php @@ -0,0 +1,44 @@ +markTestSkipped( + "php-uv extension not loaded" + ); + } + } + + public function testQueuedWritesOverrideEachOtherIfNotWaitedUpon() { + amp\run(function () { + $path = Fixture::path() . "/write"; + $handle = (yield file\open($path, "c+")); + $this->assertSame(0, $handle->tell()); + + $write1 = $handle->write("foo"); + $write2 = $handle->write("bar"); + + yield amp\all([$write1, $write2]); + + $handle->seek(0); + $contents = (yield $handle->read(8192)); + $this->assertSame(3, $handle->tell()); + $this->assertTrue($handle->eof()); + $this->assertSame("bar", $contents); + + yield $handle->close(); + yield file\unlink($path); + }); + } +}