From 6d2ca5e2b26b2e190424aae9207646d7d2f89ec0 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Tue, 20 Jun 2017 22:59:23 +0200 Subject: [PATCH] Fix EioPoll busy watcher Previously we used listen() + done(). This API design is quite prone to errors, as it's easy to miss a done() call in some branch. Additionally this commit ensures that pending operations are always completed before the EioPoll is completely destructed. Previously unfinished operations leaked into new EioPoll instances. --- lib/EioDriver.php | 129 +++++++++++++++++++-------------------- lib/EioHandle.php | 17 +++--- lib/Internal/EioPoll.php | 38 ++++++++++-- lib/StatCache.php | 15 ++++- 4 files changed, 117 insertions(+), 82 deletions(-) diff --git a/lib/EioDriver.php b/lib/EioDriver.php index 7a1ee74..e5389ac 100644 --- a/lib/EioDriver.php +++ b/lib/EioDriver.php @@ -19,10 +19,11 @@ class EioDriver implements Driver { */ public function open(string $path, string $mode): Promise { $flags = \EIO_O_NONBLOCK | \EIO_O_FSYNC | $this->parseMode($mode); - $chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0; - $this->poll->listen(); + $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $openArr = [$mode, $path, $deferred]; \eio_open($path, $flags, $chmod, \EIO_PRI_DEFAULT, [$this, "onOpenHandle"], $openArr); @@ -51,11 +52,9 @@ class EioDriver implements Driver { private function onOpenHandle($openArr, $result, $req) { list($mode, $path, $deferred) = $openArr; + if ($result === -1) { - $this->poll->done(); - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } elseif ($mode[0] === "a") { \array_unshift($openArr, $result); \eio_ftruncate($result, $offset = 0, \EIO_PRI_DEFAULT, [$this, "onOpenFtruncate"], $openArr); @@ -66,12 +65,10 @@ class EioDriver implements Driver { } private function onOpenFtruncate($openArr, $result, $req) { - $this->poll->done(); list($fh, $mode, $path, $deferred) = $openArr; + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $handle = new EioHandle($this->poll, $fh, $path, $mode, $size = 0); $deferred->resolve($handle); @@ -79,12 +76,9 @@ class EioDriver implements Driver { } private function onOpenFstat($openArr, $result, $req) { - $this->poll->done(); list($fh, $mode, $path, $deferred) = $openArr; if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { StatCache::set($path, $result); $handle = new EioHandle($this->poll, $fh, $path, $mode, $result["size"]); @@ -100,8 +94,9 @@ class EioDriver implements Driver { return new Success($stat); } - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; $data = [$deferred, $path]; \eio_stat($path, $priority, [$this, "onStat"], $data); @@ -111,7 +106,6 @@ class EioDriver implements Driver { private function onStat($data, $result, $req) { list($deferred, $path) = $data; - $this->poll->done(); if ($result === -1) { $deferred->resolve(null); } else { @@ -125,6 +119,7 @@ class EioDriver implements Driver { */ public function exists(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { $deferred->resolve((bool) $result); }); @@ -137,6 +132,7 @@ class EioDriver implements Driver { */ public function isdir(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if ($result) { $deferred->resolve(!($result["mode"] & \EIO_S_IFREG)); @@ -153,6 +149,7 @@ class EioDriver implements Driver { */ public function isfile(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if ($result) { $deferred->resolve((bool) ($result["mode"] & \EIO_S_IFREG)); @@ -169,6 +166,7 @@ class EioDriver implements Driver { */ public function size(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if (empty($result)) { $deferred->fail(new FilesystemException( @@ -191,6 +189,7 @@ class EioDriver implements Driver { */ public function mtime(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if ($result) { $deferred->resolve($result["mtime"]); @@ -209,6 +208,7 @@ class EioDriver implements Driver { */ public function atime(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if ($result) { $deferred->resolve($result["atime"]); @@ -227,6 +227,7 @@ class EioDriver implements Driver { */ public function ctime(string $path): Promise { $deferred = new Deferred; + $this->stat($path)->onResolve(function ($error, $result) use ($deferred) { if ($result) { $deferred->resolve($result["ctime"]); @@ -244,8 +245,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function lstat(string $path): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_lstat($path, $priority, [$this, "onLstat"], $deferred); @@ -253,7 +255,6 @@ class EioDriver implements Driver { } private function onLstat($deferred, $result, $req) { - $this->poll->done(); if ($result === -1) { $deferred->resolve(null); } else { @@ -265,8 +266,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function symlink(string $target, string $link): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_symlink($target, $link, $priority, [$this, "onGenericResult"], $deferred); @@ -277,8 +279,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function link(string $target, string $link): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_link($target, $link, $priority, [$this, "onGenericResult"], $deferred); @@ -289,19 +292,17 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function readlink(string $path): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_readlink($path, $priority, [$this, "onGenericResult"], $deferred); return $deferred->promise(); } private function onGenericResult($deferred, $result, $req) { - $this->poll->done(); if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $deferred->resolve(true); } @@ -311,8 +312,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function rename(string $from, string $to): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_rename($from, $to, $priority, [$this, "onGenericResult"], $deferred); @@ -323,8 +325,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function unlink(string $path): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; $data = [$deferred, $path]; \eio_unlink($path, $priority, [$this, "onUnlink"], $data); @@ -334,11 +337,9 @@ class EioDriver implements Driver { private function onUnlink($data, $result, $req) { list($deferred, $path) = $data; - $this->poll->done(); + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { StatCache::clear($path); $deferred->resolve(true); @@ -349,8 +350,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function mkdir(string $path, int $mode = 0644, bool $recursive = false): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; if ($recursive) { @@ -390,8 +392,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function rmdir(string $path): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; $data = [$deferred, $path]; \eio_rmdir($path, $priority, [$this, "onRmdir"], $data); @@ -401,11 +404,9 @@ class EioDriver implements Driver { private function onRmdir($data, $result, $req) { list($deferred, $path) = $data; - $this->poll->done(); + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { StatCache::clear($path); $deferred->resolve(true); @@ -416,8 +417,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function scandir(string $path): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $flags = \EIO_READDIR_STAT_ORDER | \EIO_READDIR_DIRS_FIRST; $priority = \EIO_PRI_DEFAULT; \eio_readdir($path, $flags, $priority, [$this, "onScandir"], $deferred); @@ -426,11 +428,8 @@ class EioDriver implements Driver { } private function onScandir($deferred, $result, $req) { - $this->poll->done(); if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $deferred->resolve($result["names"]); } @@ -440,8 +439,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function chmod(string $path, int $mode): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_chmod($path, $mode, $priority, [$this, "onGenericResult"], $deferred); @@ -452,8 +452,9 @@ class EioDriver implements Driver { * {@inheritdoc} */ public function chown(string $path, int $uid, int $gid): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_chown($path, $uid, $gid, $priority, [$this, "onGenericResult"], $deferred); @@ -466,8 +467,9 @@ class EioDriver implements Driver { public function touch(string $path): Promise { $atime = $mtime = \time(); - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $priority = \EIO_PRI_DEFAULT; \eio_utime($path, $atime, $mtime, $priority, [$this, "onGenericResult"], $deferred); @@ -482,8 +484,9 @@ class EioDriver implements Driver { $mode = 0; $priority = \EIO_PRI_DEFAULT; - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + \eio_open($path, $flags, $mode, $priority, [$this, "onGetOpen"], $deferred); return $deferred->promise(); @@ -491,9 +494,7 @@ class EioDriver implements Driver { private function onGetOpen($deferred, $result, $req) { if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $priority = \EIO_PRI_DEFAULT; \eio_fstat($result, $priority, [$this, "onGetFstat"], [$result, $deferred]); @@ -502,10 +503,9 @@ class EioDriver implements Driver { private function onGetFstat($fhAndPromisor, $result, $req) { list($fh, $deferred) = $fhAndPromisor; + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); return; } @@ -517,12 +517,11 @@ class EioDriver implements Driver { private function onGetRead($fhAndPromisor, $result, $req) { list($fh, $deferred) = $fhAndPromisor; - $priority = \EIO_PRI_DEFAULT; - \eio_close($fh, $priority, [$this->poll, "done"]); + + \eio_close($fh); + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $deferred->resolve($result); } @@ -536,8 +535,9 @@ class EioDriver implements Driver { $mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR; $priority = \EIO_PRI_DEFAULT; - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + $data = [$contents, $deferred]; \eio_open($path, $flags, $mode, $priority, [$this, "onPutOpen"], $data); @@ -546,10 +546,9 @@ class EioDriver implements Driver { private function onPutOpen($data, $result, $req) { list($contents, $deferred) = $data; + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $length = strlen($contents); $offset = 0; @@ -562,13 +561,11 @@ class EioDriver implements Driver { private function onPutWrite($fhAndPromisor, $result, $req) { list($fh, $deferred) = $fhAndPromisor; + \eio_close($fh); - $priority = \EIO_PRI_DEFAULT; - \eio_close($fh, $priority, [$this->poll, "done"]); + if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $deferred->resolve($result); } diff --git a/lib/EioHandle.php b/lib/EioHandle.php index ddd07ec..94b2ffe 100644 --- a/lib/EioHandle.php +++ b/lib/EioHandle.php @@ -43,7 +43,7 @@ class EioHandle implements Handle { $length = $length > $remaining ? $remaining : $length; $deferred = new Deferred; - $this->poll->listen(); + $this->poll->listen($deferred->promise()); \eio_read( $this->fh, @@ -59,7 +59,7 @@ class EioHandle implements Handle { private function onRead(Deferred $deferred, $result, $req) { $this->isActive = false; - $this->poll->done(); + if ($result === -1) { $deferred->fail(new FilesystemException( sprintf('Reading from file failed: %s.', \eio_get_last_error($req)) @@ -101,8 +101,9 @@ class EioHandle implements Handle { private function push(string $data): Promise { $length = \strlen($data); + $deferred = new Deferred; - $this->poll->listen(); + $this->poll->listen($deferred->promise()); \eio_write( $this->fh, @@ -128,8 +129,6 @@ class EioHandle implements Handle { } private function onWrite(Deferred $deferred, $result, $req) { - $this->poll->done(); - if ($this->queue->isEmpty()) { $deferred->fail(new FilesystemException('No pending write, the file may have been closed')); } @@ -157,19 +156,17 @@ class EioHandle implements Handle { * {@inheritdoc} */ public function close(): Promise { - $this->poll->listen(); $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + \eio_close($this->fh, \EIO_PRI_DEFAULT, [$this, "onClose"], $deferred); return $deferred->promise(); } private function onClose($deferred, $result, $req) { - $this->poll->done(); if ($result === -1) { - $deferred->fail(new FilesystemException( - \eio_get_last_error($req) - )); + $deferred->fail(new FilesystemException(\eio_get_last_error($req))); } else { $deferred->resolve(); } diff --git a/lib/Internal/EioPoll.php b/lib/Internal/EioPoll.php index 8e6a41e..f68d934 100644 --- a/lib/Internal/EioPoll.php +++ b/lib/Internal/EioPoll.php @@ -2,9 +2,13 @@ namespace Amp\File\Internal; +use Amp\CallableMaker; use Amp\Loop; +use Amp\Promise; class EioPoll { + use CallableMaker; + /** @var resource */ private static $stream; @@ -14,7 +18,12 @@ class EioPoll { /** @var int */ private $requests = 0; + /** @var callable */ + private $onDone; + public function __construct() { + $this->onDone = $this->callableFromInstanceMethod("done"); + if (!self::$stream) { \eio_init(); self::$stream = \eio_get_event_stream(); @@ -27,21 +36,40 @@ class EioPoll { }); Loop::disable($this->watcher); + + Loop::setState(self::class, new class ($this->watcher) { + private $watcher; + private $driver; + + public function __construct(string $watcher) { + $this->watcher = $watcher; + $this->driver = Loop::get(); + } + + public function __destruct() { + $this->driver->cancel($this->watcher); + + // Ensure there are no active operations anymore. This is a safe-guard as some operations might not be + // finished on loop exit due to not being yielded. This also ensures a clean shutdown for these if PHP + // exists. + \eio_event_loop(); + } + }); } - public function listen() { + public function listen(Promise $promise) { if ($this->requests++ === 0) { Loop::enable($this->watcher); } + + $promise->onResolve($this->onDone); } - public function done() { + private function done() { if (--$this->requests === 0) { Loop::disable($this->watcher); } - } - public function __destruct() { - Loop::cancel($this->watcher); + \assert($this->requests >= 0); } } diff --git a/lib/StatCache.php b/lib/StatCache.php index 596f4b3..335a53d 100644 --- a/lib/StatCache.php +++ b/lib/StatCache.php @@ -11,7 +11,6 @@ class StatCache { private static $now = null; private static function init() { - Loop::setState(self::class, true); self::$now = \time(); $watcher = Loop::repeat(1000, function () { @@ -29,6 +28,20 @@ class StatCache { }); Loop::unreference($watcher); + + Loop::setState(self::class, new class ($watcher) { + private $watcher; + private $driver; + + public function __construct(string $watcher) { + $this->watcher = $watcher; + $this->driver = Loop::get(); + } + + public function __destruct() { + $this->driver->cancel($this->watcher); + } + }); } public static function get(string $path) {