diff --git a/lib/EioDescriptor.php b/lib/EioDescriptor.php new file mode 100644 index 0000000..4d124e6 --- /dev/null +++ b/lib/EioDescriptor.php @@ -0,0 +1,154 @@ +reactor = $reactor; + $this->fh = $fh; + $this->increment = $inc; + $this->decrement = $dec; + } + + public function __destruct() { + if (empty($this->isCloseInitialized)) { + $this->close(); + } + } + + /** + * {@inheritdoc} + */ + public function read($offset, $len) { + \call_user_func($this->increment); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_read($this->fh, $offset, $len, $priority, [$this, "onRead"], $promisor); + + return $promisor->promise(); + } + + private function onRead($promisor, $result, $req) { + \call_user_func($this->decrement); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed($result); + } + } + + /** + * {@inheritdoc} + */ + public function write($offset, $data) { + \call_user_func($this->increment); + $promisor = new Deferred; + $length = 0; + $priority = \EIO_PRI_DEFAULT; + \eio_write($this->fh, $data, $length, $offset, $priority, [$this, "onWrite"], $promisor); + + return $promisor->promise(); + } + + private function onWrite($promisor, $result, $req) { + \call_user_func($this->decrement); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed($result); + } + } + + /** + * {@inheritdoc} + */ + public function truncate($length = 0) { + \call_user_func($this->increment); + $promisor = new Deferred; + $length = 0; + $priority = \EIO_PRI_DEFAULT; + \eio_truncate($this->fh, $length, $priority, [$this, "onTruncate"], $promisor); + + return $promisor->promise(); + } + + private function onTruncate($promisor, $result, $req) { + \call_user_func($this->decrement); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed(); + } + } + + /** + * {@inheritdoc} + */ + public function stat() { + \call_user_func($this->increment); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_fstat($this->fh, $priority, [$this, "onStat"], $promisor); + + return $promisor->promise(); + } + + private function onStat($promisor, $result, $req) { + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $stat["isdir"] = (bool) ($stat["mode"] & Filesystem::S_IFDIR); + $stat["isfile"] = (bool) ($stat["mode"] & Filesystem::S_IFREG); + } + \call_user_func($this->decrement); + $promisor->succeed($result); + } + + /** + * {@inheritdoc} + */ + public function close() { + $this->isCloseInitialized = true; + \call_user_func($this->increment); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_close($this->fh, $priority, [$this, "onClose"], $promisor); + + return $promisor->promise(); + } + + private function onClose($promisor, $result, $req) { + \call_user_func($this->decrement); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed(); + } + } +} diff --git a/lib/EioFilesystem.php b/lib/EioFilesystem.php new file mode 100644 index 0000000..bfb0ca2 --- /dev/null +++ b/lib/EioFilesystem.php @@ -0,0 +1,368 @@ +reactor = $reactor ?: \Amp\reactor(); + $this->stream = \eio_get_event_stream(); + $this->callableDelReq = function() { + $this->decrementPending(); + }; + $this->internalIncrement = function() { + $this->incrementPending(); + }; + $this->internalDecrement = function() { + $this->decrementPending(); + }; + $this->watcher = $this->reactor->onReadable($this->stream, function() { + while (\eio_npending()) { + \eio_poll(); + } + }, $options = ["enable" => false]); + } + + private function incrementPending() { + if ($this->pending++ === 0) { + $this->reactor->enable($this->watcher); + } + } + + private function decrementPending() { + if ($this->pending-- === 1) { + $this->reactor->disable($this->watcher); + } + } + + /** + * {@inheritdoc} + */ + public function open($path, $mode = self::READ) { + $flags = 0; + + if (($mode & self::READ) && ($mode & self::WRITE)) { + $flags = \EIO_O_RDWR | \EIO_O_CREAT; + } elseif ($mode & self::READ) { + $flags = \EIO_O_RDONLY; + } elseif ($mode & self::WRITE) { + $flags = \EIO_O_WRONLY | \EIO_O_CREAT; + } else { + return new Failure(new \InvalidArgumentException( + "Invalid file open mode: Filesystem::READ or Filesystem::WRITE or both required" + )); + } + + $mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR; + $priority = \EIO_PRI_DEFAULT; + + $this->incrementPending(); + $promisor = new Deferred; + \eio_open($path, $flags, $mode, $priority, [$this, "onOpen"], $promisor); + + return $promisor->promise(); + } + + private function onOpen($promisor, $result, $req) { + $this->decrementPending(); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $descriptor = new EioDescriptor( + $this->reactor, + $result, + $this->internalIncrement, + $this->internalDecrement + ); + $promisor->succeed($descriptor); + } + } + + /** + * {@inheritdoc} + */ + public function stat($path) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_stat($path, $priority, [$this, "onStat"], $promisor); + + return $promisor->promise(); + } + + private function onStat($promisor, $result, $req) { + if ($result === -1) { + $stat = null; + } else { + $stat = $result; + $stat["isdir"] = (bool) ($stat["mode"] & self::S_IFDIR); + $stat["isfile"] = (bool) ($stat["mode"] & self::S_IFREG); + } + $this->decrementPending(); + $promisor->succeed($stat); + } + + /** + * {@inheritdoc} + */ + public function lstat($path) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_lstat($path, $priority, [$this, "onStat"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function symlink($target, $link) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_symlink($target, $link, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + private function onGenericResult($promisor, $result, $req) { + $this->decrementPending(); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed(true); + } + } + + /** + * {@inheritdoc} + */ + public function rename($from, $to) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_rename($from, $to, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function unlink($path) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_unlink($path, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function mkdir($path, $mode = 0644) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_mkdir($path, $mode, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function rmdir($path) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_rmdir($path, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function scandir($path) { + $this->incrementPending(); + $promisor = new Deferred; + $flags = \EIO_READDIR_STAT_ORDER | \EIO_READDIR_DIRS_FIRST; + $priority = \EIO_PRI_DEFAULT; + \eio_readdir($path, $flags, $priority, [$this, "onScandir"], $promisor); + + return $promisor->promise(); + } + + private function onScandir($promisor, $result, $req) { + $this->decrementPending(); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed($result["names"]); + } + } + + /** + * {@inheritdoc} + */ + public function chmod($path, $mode) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_chmod($path, $mode, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function chown($path, $uid, $gid) { + $this->incrementPending(); + $promisor = new Deferred; + $priority = \EIO_PRI_DEFAULT; + \eio_chown($path, $uid, $gid, $priority, [$this, "onGenericResult"], $promisor); + + return $promisor->promise(); + } + + /** + * {@inheritdoc} + */ + public function get($path) { + $flags = $flags = \EIO_O_RDONLY; + $mode = 0; + $priority = \EIO_PRI_DEFAULT; + + $this->incrementPending(); + $promisor = new Deferred; + \eio_open($path, $flags, $mode, $priority, [$this, "onGetOpen"], $promisor); + + return $promisor->promise(); + } + + private function onGetOpen($promisor, $result, $req) { + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $priority = \EIO_PRI_DEFAULT; + \eio_fstat($result, $priority, [$this, "onGetFstat"], [$result, $promisor]); + } + } + + private function onGetFstat($fhAndPromisor, $result, $req) { + list($fh, $promisor) = $fhAndPromisor; + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + return; + } + + $offset = 0; + $length = $result["size"]; + $priority = \EIO_PRI_DEFAULT; + \eio_read($fh, $length, $offset, $priority, [$this, "onGetRead"], $fhAndPromisor); + } + + private function onGetRead($fhAndPromisor, $result, $req) { + list($fh, $promisor) = $fhAndPromisor; + $priority = \EIO_PRI_DEFAULT; + \eio_close($fh, $priority, $this->callableDelReq); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed($result); + } + } + + /** + * {@inheritdoc} + */ + public function put($path, $contents) { + $flags = \EIO_O_RDWR | \EIO_O_CREAT; + $mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR; + $priority = \EIO_PRI_DEFAULT; + + $this->incrementPending(); + $promisor = new Deferred; + $data = [$contents, $promisor]; + \eio_open($path, $flags, $mode, $priority, [$this, "onPutOpen"], $data); + + return $promisor->promise(); + } + + private function onPutOpen($data, $result, $req) { + list($contents, $promisor) = $data; + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $length = strlen($contents); + $offset = 0; + $priority = \EIO_PRI_DEFAULT; + $callback = [$this, "onPutWrite"]; + $fhAndPromisor = [$result, $promisor]; + \eio_write($result, $contents, $length, $offset, $priority, $callback, $fhAndPromisor); + } + } + + private function onPutWrite($fhAndPromisor, $result, $req) { + list($fh, $promisor) = $fhAndPromisor; + \eio_close($fh); + $priority = \EIO_PRI_DEFAULT; + \eio_close($fh, $priority, $this->callableDelReq); + if ($result === -1) { + $promisor->fail(new \RuntimeException( + \eio_get_last_error($req) + )); + } else { + $promisor->succeed($result); + } + } + + public function __destruct() { + $this->stream = null; + $this->reactor->cancel($this->watcher); + + /** + * pecl/eio has a race condition issue when freeing threaded + * resources and we can get intermittent segfaults at script + * shutdown in certain cases if we don't wait for a moment. + * + * @TODO see if we can PR a fix for this problem in pecl/eio + */ + usleep(1000); + } +} diff --git a/lib/functions.php b/lib/functions.php index d74be2d..f1a1c60 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -16,12 +16,8 @@ function fs(Filesystem $assign = null) { return $filesystem; } elseif (\extension_loaded("uv")) { return ($filesystem = new UvFilesystem(\Amp\reactor())); - /* - // @TODO - } elseif (\extension_loaded("eio") { - return ($filesystem = new EioFilesystem); - } - */ + } elseif (\extension_loaded("eio")) { + return ($filesystem = new EioFilesystem(\Amp\reactor())); } else { return ($filesystem = new BlockingFilesystem(\Amp\reactor())); } diff --git a/test/EioFilesystemTest.php b/test/EioFilesystemTest.php new file mode 100644 index 0000000..6222a93 --- /dev/null +++ b/test/EioFilesystemTest.php @@ -0,0 +1,23 @@ +markTestSkipped( + "php-uv extension not loaded" + ); + } + } + + protected function getFilesystem(Reactor $reactor) { + return new EioFilesystem($reactor); + } +} diff --git a/test/FilesystemTest.php b/test/FilesystemTest.php index d324065..8a4e25b 100644 --- a/test/FilesystemTest.php +++ b/test/FilesystemTest.php @@ -104,7 +104,7 @@ abstract class FilesystemTest extends \PHPUnit_Framework_TestCase { $toUnlink = __DIR__ . "/fixture/unlink"; - yield $fs->put($toUnlink, ""); + yield $fs->put($toUnlink, "unlink me"); $this->assertTrue((bool) (yield $fs->stat($toUnlink))); yield $fs->unlink($toUnlink); $this->assertNull(yield $fs->stat($toUnlink)); @@ -125,5 +125,4 @@ abstract class FilesystemTest extends \PHPUnit_Framework_TestCase { $this->assertNull(yield $fs->stat($dir)); }); } - }