1
0
mirror of https://github.com/danog/file.git synced 2024-11-30 04:19:39 +01:00

Fix EioPoll busy watcher

Previously we used listen() + done(). This API design is quite prone to
errors, as it's easy to miss a done() call in some branch. Additionally
this commit ensures that pending operations are always completed before
the EioPoll is completely destructed. Previously unfinished operations
leaked into new EioPoll instances.
This commit is contained in:
Niklas Keller 2017-06-20 22:59:23 +02:00
parent 8cd21e4994
commit 6d2ca5e2b2
4 changed files with 117 additions and 82 deletions

View File

@ -19,10 +19,11 @@ class EioDriver implements Driver {
*/
public function open(string $path, string $mode): Promise {
$flags = \EIO_O_NONBLOCK | \EIO_O_FSYNC | $this->parseMode($mode);
$chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0;
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$openArr = [$mode, $path, $deferred];
\eio_open($path, $flags, $chmod, \EIO_PRI_DEFAULT, [$this, "onOpenHandle"], $openArr);
@ -51,11 +52,9 @@ class EioDriver implements Driver {
private function onOpenHandle($openArr, $result, $req) {
list($mode, $path, $deferred) = $openArr;
if ($result === -1) {
$this->poll->done();
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} elseif ($mode[0] === "a") {
\array_unshift($openArr, $result);
\eio_ftruncate($result, $offset = 0, \EIO_PRI_DEFAULT, [$this, "onOpenFtruncate"], $openArr);
@ -66,12 +65,10 @@ class EioDriver implements Driver {
}
private function onOpenFtruncate($openArr, $result, $req) {
$this->poll->done();
list($fh, $mode, $path, $deferred) = $openArr;
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$handle = new EioHandle($this->poll, $fh, $path, $mode, $size = 0);
$deferred->resolve($handle);
@ -79,12 +76,9 @@ class EioDriver implements Driver {
}
private function onOpenFstat($openArr, $result, $req) {
$this->poll->done();
list($fh, $mode, $path, $deferred) = $openArr;
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
StatCache::set($path, $result);
$handle = new EioHandle($this->poll, $fh, $path, $mode, $result["size"]);
@ -100,8 +94,9 @@ class EioDriver implements Driver {
return new Success($stat);
}
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
$data = [$deferred, $path];
\eio_stat($path, $priority, [$this, "onStat"], $data);
@ -111,7 +106,6 @@ class EioDriver implements Driver {
private function onStat($data, $result, $req) {
list($deferred, $path) = $data;
$this->poll->done();
if ($result === -1) {
$deferred->resolve(null);
} else {
@ -125,6 +119,7 @@ class EioDriver implements Driver {
*/
public function exists(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
$deferred->resolve((bool) $result);
});
@ -137,6 +132,7 @@ class EioDriver implements Driver {
*/
public function isdir(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if ($result) {
$deferred->resolve(!($result["mode"] & \EIO_S_IFREG));
@ -153,6 +149,7 @@ class EioDriver implements Driver {
*/
public function isfile(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if ($result) {
$deferred->resolve((bool) ($result["mode"] & \EIO_S_IFREG));
@ -169,6 +166,7 @@ class EioDriver implements Driver {
*/
public function size(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if (empty($result)) {
$deferred->fail(new FilesystemException(
@ -191,6 +189,7 @@ class EioDriver implements Driver {
*/
public function mtime(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if ($result) {
$deferred->resolve($result["mtime"]);
@ -209,6 +208,7 @@ class EioDriver implements Driver {
*/
public function atime(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if ($result) {
$deferred->resolve($result["atime"]);
@ -227,6 +227,7 @@ class EioDriver implements Driver {
*/
public function ctime(string $path): Promise {
$deferred = new Deferred;
$this->stat($path)->onResolve(function ($error, $result) use ($deferred) {
if ($result) {
$deferred->resolve($result["ctime"]);
@ -244,8 +245,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function lstat(string $path): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_lstat($path, $priority, [$this, "onLstat"], $deferred);
@ -253,7 +255,6 @@ class EioDriver implements Driver {
}
private function onLstat($deferred, $result, $req) {
$this->poll->done();
if ($result === -1) {
$deferred->resolve(null);
} else {
@ -265,8 +266,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function symlink(string $target, string $link): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_symlink($target, $link, $priority, [$this, "onGenericResult"], $deferred);
@ -277,8 +279,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function link(string $target, string $link): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_link($target, $link, $priority, [$this, "onGenericResult"], $deferred);
@ -289,19 +292,17 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function readlink(string $path): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_readlink($path, $priority, [$this, "onGenericResult"], $deferred);
return $deferred->promise();
}
private function onGenericResult($deferred, $result, $req) {
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$deferred->resolve(true);
}
@ -311,8 +312,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function rename(string $from, string $to): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_rename($from, $to, $priority, [$this, "onGenericResult"], $deferred);
@ -323,8 +325,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function unlink(string $path): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
$data = [$deferred, $path];
\eio_unlink($path, $priority, [$this, "onUnlink"], $data);
@ -334,11 +337,9 @@ class EioDriver implements Driver {
private function onUnlink($data, $result, $req) {
list($deferred, $path) = $data;
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
StatCache::clear($path);
$deferred->resolve(true);
@ -349,8 +350,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function mkdir(string $path, int $mode = 0644, bool $recursive = false): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
if ($recursive) {
@ -390,8 +392,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function rmdir(string $path): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
$data = [$deferred, $path];
\eio_rmdir($path, $priority, [$this, "onRmdir"], $data);
@ -401,11 +404,9 @@ class EioDriver implements Driver {
private function onRmdir($data, $result, $req) {
list($deferred, $path) = $data;
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
StatCache::clear($path);
$deferred->resolve(true);
@ -416,8 +417,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function scandir(string $path): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$flags = \EIO_READDIR_STAT_ORDER | \EIO_READDIR_DIRS_FIRST;
$priority = \EIO_PRI_DEFAULT;
\eio_readdir($path, $flags, $priority, [$this, "onScandir"], $deferred);
@ -426,11 +428,8 @@ class EioDriver implements Driver {
}
private function onScandir($deferred, $result, $req) {
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$deferred->resolve($result["names"]);
}
@ -440,8 +439,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function chmod(string $path, int $mode): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_chmod($path, $mode, $priority, [$this, "onGenericResult"], $deferred);
@ -452,8 +452,9 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function chown(string $path, int $uid, int $gid): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_chown($path, $uid, $gid, $priority, [$this, "onGenericResult"], $deferred);
@ -466,8 +467,9 @@ class EioDriver implements Driver {
public function touch(string $path): Promise {
$atime = $mtime = \time();
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$priority = \EIO_PRI_DEFAULT;
\eio_utime($path, $atime, $mtime, $priority, [$this, "onGenericResult"], $deferred);
@ -482,8 +484,9 @@ class EioDriver implements Driver {
$mode = 0;
$priority = \EIO_PRI_DEFAULT;
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
\eio_open($path, $flags, $mode, $priority, [$this, "onGetOpen"], $deferred);
return $deferred->promise();
@ -491,9 +494,7 @@ class EioDriver implements Driver {
private function onGetOpen($deferred, $result, $req) {
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$priority = \EIO_PRI_DEFAULT;
\eio_fstat($result, $priority, [$this, "onGetFstat"], [$result, $deferred]);
@ -502,10 +503,9 @@ class EioDriver implements Driver {
private function onGetFstat($fhAndPromisor, $result, $req) {
list($fh, $deferred) = $fhAndPromisor;
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
return;
}
@ -517,12 +517,11 @@ class EioDriver implements Driver {
private function onGetRead($fhAndPromisor, $result, $req) {
list($fh, $deferred) = $fhAndPromisor;
$priority = \EIO_PRI_DEFAULT;
\eio_close($fh, $priority, [$this->poll, "done"]);
\eio_close($fh);
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$deferred->resolve($result);
}
@ -536,8 +535,9 @@ class EioDriver implements Driver {
$mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR;
$priority = \EIO_PRI_DEFAULT;
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$data = [$contents, $deferred];
\eio_open($path, $flags, $mode, $priority, [$this, "onPutOpen"], $data);
@ -546,10 +546,9 @@ class EioDriver implements Driver {
private function onPutOpen($data, $result, $req) {
list($contents, $deferred) = $data;
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$length = strlen($contents);
$offset = 0;
@ -562,13 +561,11 @@ class EioDriver implements Driver {
private function onPutWrite($fhAndPromisor, $result, $req) {
list($fh, $deferred) = $fhAndPromisor;
\eio_close($fh);
$priority = \EIO_PRI_DEFAULT;
\eio_close($fh, $priority, [$this->poll, "done"]);
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$deferred->resolve($result);
}

View File

@ -43,7 +43,7 @@ class EioHandle implements Handle {
$length = $length > $remaining ? $remaining : $length;
$deferred = new Deferred;
$this->poll->listen();
$this->poll->listen($deferred->promise());
\eio_read(
$this->fh,
@ -59,7 +59,7 @@ class EioHandle implements Handle {
private function onRead(Deferred $deferred, $result, $req) {
$this->isActive = false;
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
sprintf('Reading from file failed: %s.', \eio_get_last_error($req))
@ -101,8 +101,9 @@ class EioHandle implements Handle {
private function push(string $data): Promise {
$length = \strlen($data);
$deferred = new Deferred;
$this->poll->listen();
$this->poll->listen($deferred->promise());
\eio_write(
$this->fh,
@ -128,8 +129,6 @@ class EioHandle implements Handle {
}
private function onWrite(Deferred $deferred, $result, $req) {
$this->poll->done();
if ($this->queue->isEmpty()) {
$deferred->fail(new FilesystemException('No pending write, the file may have been closed'));
}
@ -157,19 +156,17 @@ class EioHandle implements Handle {
* {@inheritdoc}
*/
public function close(): Promise {
$this->poll->listen();
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
\eio_close($this->fh, \EIO_PRI_DEFAULT, [$this, "onClose"], $deferred);
return $deferred->promise();
}
private function onClose($deferred, $result, $req) {
$this->poll->done();
if ($result === -1) {
$deferred->fail(new FilesystemException(
\eio_get_last_error($req)
));
$deferred->fail(new FilesystemException(\eio_get_last_error($req)));
} else {
$deferred->resolve();
}

View File

@ -2,9 +2,13 @@
namespace Amp\File\Internal;
use Amp\CallableMaker;
use Amp\Loop;
use Amp\Promise;
class EioPoll {
use CallableMaker;
/** @var resource */
private static $stream;
@ -14,7 +18,12 @@ class EioPoll {
/** @var int */
private $requests = 0;
/** @var callable */
private $onDone;
public function __construct() {
$this->onDone = $this->callableFromInstanceMethod("done");
if (!self::$stream) {
\eio_init();
self::$stream = \eio_get_event_stream();
@ -27,21 +36,40 @@ class EioPoll {
});
Loop::disable($this->watcher);
Loop::setState(self::class, new class ($this->watcher) {
private $watcher;
private $driver;
public function __construct(string $watcher) {
$this->watcher = $watcher;
$this->driver = Loop::get();
}
public function __destruct() {
$this->driver->cancel($this->watcher);
// Ensure there are no active operations anymore. This is a safe-guard as some operations might not be
// finished on loop exit due to not being yielded. This also ensures a clean shutdown for these if PHP
// exists.
\eio_event_loop();
}
});
}
public function listen() {
public function listen(Promise $promise) {
if ($this->requests++ === 0) {
Loop::enable($this->watcher);
}
$promise->onResolve($this->onDone);
}
public function done() {
private function done() {
if (--$this->requests === 0) {
Loop::disable($this->watcher);
}
}
public function __destruct() {
Loop::cancel($this->watcher);
\assert($this->requests >= 0);
}
}

View File

@ -11,7 +11,6 @@ class StatCache {
private static $now = null;
private static function init() {
Loop::setState(self::class, true);
self::$now = \time();
$watcher = Loop::repeat(1000, function () {
@ -29,6 +28,20 @@ class StatCache {
});
Loop::unreference($watcher);
Loop::setState(self::class, new class ($watcher) {
private $watcher;
private $driver;
public function __construct(string $watcher) {
$this->watcher = $watcher;
$this->driver = Loop::get();
}
public function __destruct() {
$this->driver->cancel($this->watcher);
}
});
}
public static function get(string $path) {