diff --git a/src/Driver/BlockingFile.php b/src/Driver/BlockingFile.php index fa785bc..c7a35d3 100644 --- a/src/Driver/BlockingFile.php +++ b/src/Driver/BlockingFile.php @@ -4,6 +4,7 @@ namespace Amp\File\Driver; use Amp\ByteStream\ClosedException; use Amp\ByteStream\StreamException; +use Amp\CancellationToken; use Amp\File\File; use Amp\Future; @@ -33,7 +34,7 @@ final class BlockingFile implements File } } - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->handle === null) { throw new ClosedException("The file '{$this->path}' has been closed"); @@ -170,13 +171,13 @@ final class BlockingFile implements File return \ftell($this->handle); } - public function eof(): bool + public function atEnd(): bool { if ($this->handle === null) { throw new ClosedException("The file '{$this->path}' has been closed"); } - return \feof($this->handle); + return \fatEnd($this->handle); } public function getPath(): string diff --git a/src/Driver/EioDriver.php b/src/Driver/EioDriver.php index 6ebf4c4..7221c4d 100644 --- a/src/Driver/EioDriver.php +++ b/src/Driver/EioDriver.php @@ -7,7 +7,6 @@ use Amp\File\Driver; use Amp\File\File; use Amp\File\FilesystemException; use Amp\File\Internal; -use Revolt\EventLoop; use Revolt\EventLoop\Driver as EventLoopDriver; final class EioDriver implements Driver diff --git a/src/Driver/EioFile.php b/src/Driver/EioFile.php index 1f7624a..9c7f193 100644 --- a/src/Driver/EioFile.php +++ b/src/Driver/EioFile.php @@ -4,12 +4,13 @@ namespace Amp\File\Driver; use Amp\ByteStream\ClosedException; use Amp\ByteStream\StreamException; +use Amp\CancellationToken; use Amp\Deferred; use Amp\File\File; use Amp\File\Internal; use Amp\File\PendingOperationError; use Amp\Future; -use function Amp\coroutine; +use function Amp\launch; final class EioFile implements File { @@ -46,7 +47,7 @@ final class EioFile implements File $this->queue = new \SplQueue; } - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->isActive) { throw new PendingOperationError; @@ -63,6 +64,10 @@ final class EioFile implements File $onRead = function (Deferred $deferred, $result, $req): void { $this->isActive = false; + if ($deferred->isComplete()) { + return; + } + if ($result === -1) { $error = \eio_get_last_error($req); if ($error === "Bad file descriptor") { @@ -76,7 +81,7 @@ final class EioFile implements File } }; - \eio_read( + $request = \eio_read( $this->fh, $length, $this->position, @@ -85,9 +90,16 @@ final class EioFile implements File $deferred ); + $id = $token?->subscribe(function (\Throwable $exception) use ($request, $deferred): void { + $this->isActive = false; + $deferred->error($exception); + \eio_cancel($request); + }); + try { return $deferred->getFuture()->await(); } finally { + $token?->unsubscribe($id); $this->poll->done(); } } @@ -108,7 +120,7 @@ final class EioFile implements File $future = $this->push($data); } else { $future = $this->queue->top(); - $future = coroutine(function () use ($future, $data): void { + $future = launch(function () use ($future, $data): void { $future->await(); $this->push($data)->await(); }); @@ -121,7 +133,7 @@ final class EioFile implements File public function end(string $data = ""): Future { - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $future = $this->write($data); $this->writable = false; @@ -171,7 +183,7 @@ final class EioFile implements File $future = $this->trim($size); } else { $future = $this->queue->top(); - $future = coroutine(function () use ($future, $size): void { + $future = launch(function () use ($future, $size): void { $future->await(); $this->trim($size)->await(); }); @@ -210,7 +222,7 @@ final class EioFile implements File return $this->position; } - public function eof(): bool + public function atEnd(): bool { return $this->queue->isEmpty() && $this->size <= $this->position; } diff --git a/src/Driver/ParallelDriver.php b/src/Driver/ParallelDriver.php index 55809c0..2acfcfa 100644 --- a/src/Driver/ParallelDriver.php +++ b/src/Driver/ParallelDriver.php @@ -2,16 +2,17 @@ namespace Amp\File\Driver; +use Amp\CancellationToken; use Amp\File\Driver; use Amp\File\File; use Amp\File\FilesystemException; use Amp\File\Internal; use Amp\Future; use Amp\Parallel\Worker\Pool; -use Amp\Parallel\Worker\Worker; use Amp\Parallel\Worker\TaskFailureThrowable; +use Amp\Parallel\Worker\Worker; use Amp\Parallel\Worker\WorkerException; -use function Amp\coroutine; +use function Amp\launch; use function Amp\Parallel\Worker\pool; final class ParallelDriver implements Driver @@ -69,7 +70,7 @@ final class ParallelDriver implements Driver $this->pendingWorker->await(); // Wait for any currently pending request for a worker. if ($this->workerStorage->count() < $this->workerLimit) { - $this->pendingWorker = coroutine(fn() => $this->pool->getWorker()); + $this->pendingWorker = launch(fn() => $this->pool->getWorker()); $worker = $this->pendingWorker->await(); if ($this->workerStorage->contains($worker)) { diff --git a/src/Driver/ParallelFile.php b/src/Driver/ParallelFile.php index dfa5c17..f11031c 100644 --- a/src/Driver/ParallelFile.php +++ b/src/Driver/ParallelFile.php @@ -4,15 +4,17 @@ namespace Amp\File\Driver; use Amp\ByteStream\ClosedException; use Amp\ByteStream\StreamException; +use Amp\CancellationToken; use Amp\File\File; use Amp\File\Internal; use Amp\File\PendingOperationError; use Amp\Future; use Amp\Parallel\Worker\TaskException; +use Amp\Parallel\Worker\TaskFailureException; use Amp\Parallel\Worker\Worker; use Amp\Parallel\Worker\WorkerException; use Revolt\EventLoop; -use function Amp\coroutine; +use function Amp\launch; final class ParallelFile implements File { @@ -77,7 +79,7 @@ final class ParallelFile implements File $this->writable = false; - $this->closing = coroutine(function (): void { + $this->closing = launch(function (): void { $id = $this->id; $this->id = null; $this->worker->enqueue(new Internal\FileTask('fclose', [], $id)); @@ -116,12 +118,12 @@ final class ParallelFile implements File } } - public function eof(): bool + public function atEnd(): bool { return $this->pendingWrites === 0 && $this->size <= $this->position; } - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->id === null) { throw new ClosedException("The file has been closed"); @@ -134,12 +136,12 @@ final class ParallelFile implements File $this->busy = true; try { - $data = $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id)); + $data = $this->worker->enqueue(new Internal\FileTask('fread', [null, $length], $this->id), $token); if ($data !== null) { $this->position += \strlen($data); } - } catch (TaskException $exception) { + } catch (TaskFailureException $exception) { throw new StreamException("Reading from the file failed", 0, $exception); } catch (WorkerException $exception) { throw new StreamException("Sending the task to the worker failed", 0, $exception); @@ -167,7 +169,7 @@ final class ParallelFile implements File ++$this->pendingWrites; $this->busy = true; - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id)); $this->position += \strlen($data); @@ -185,7 +187,7 @@ final class ParallelFile implements File public function end(string $data = ""): Future { - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $future = $this->write($data); $this->writable = false; diff --git a/src/Driver/StatusCachingFile.php b/src/Driver/StatusCachingFile.php index 5afea35..4bb6dc9 100644 --- a/src/Driver/StatusCachingFile.php +++ b/src/Driver/StatusCachingFile.php @@ -2,9 +2,10 @@ namespace Amp\File\Driver; +use Amp\CancellationToken; use Amp\File\File; use Amp\Future; -use function Amp\coroutine; +use function Amp\launch; final class StatusCachingFile implements File { @@ -25,14 +26,14 @@ final class StatusCachingFile implements File $this->invalidateCallback = $invalidateCallback; } - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string { - return $this->file->read($length); + return $this->file->read($token, $length); } public function write(string $data): Future { - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $this->file->write($data)->await(); } finally { @@ -43,7 +44,7 @@ final class StatusCachingFile implements File public function end(string $data = ""): Future { - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $this->file->end($data)->await(); } finally { @@ -67,9 +68,9 @@ final class StatusCachingFile implements File return $this->file->tell(); } - public function eof(): bool + public function atEnd(): bool { - return $this->file->eof(); + return $this->file->atEnd(); } public function getPath(): string diff --git a/src/Driver/UvDriver.php b/src/Driver/UvDriver.php index cd18ba3..7605f54 100644 --- a/src/Driver/UvDriver.php +++ b/src/Driver/UvDriver.php @@ -7,7 +7,6 @@ use Amp\File\Driver; use Amp\File\File; use Amp\File\FilesystemException; use Amp\File\Internal; -use Amp\Future; use Revolt\EventLoop\Driver as EventLoopDriver; use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver; diff --git a/src/Driver/UvFile.php b/src/Driver/UvFile.php index c7f1a1f..3c67cac 100644 --- a/src/Driver/UvFile.php +++ b/src/Driver/UvFile.php @@ -4,13 +4,14 @@ namespace Amp\File\Driver; use Amp\ByteStream\ClosedException; use Amp\ByteStream\StreamException; +use Amp\CancellationToken; use Amp\Deferred; use Amp\File\File; use Amp\File\Internal; use Amp\File\PendingOperationError; use Amp\Future; use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver; -use function Amp\coroutine; +use function Amp\launch; final class UvFile implements File { @@ -70,7 +71,7 @@ final class UvFile implements File $this->priorVersion = \version_compare(\phpversion('uv'), '0.3.0', '<'); } - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string { if ($this->isActive) { throw new PendingOperationError; @@ -84,6 +85,10 @@ final class UvFile implements File $onRead = function ($result, $buffer) use ($deferred): void { $this->isActive = false; + if ($deferred->isComplete()) { + return; + } + if (\is_int($buffer)) { $error = \uv_strerror($buffer); if ($error === "bad file descriptor") { @@ -111,9 +116,15 @@ final class UvFile implements File \uv_fs_read($this->loop, $this->fh, $this->position, $length, $onRead); + $id = $token?->subscribe(function (\Throwable $exception) use ($deferred): void { + $this->isActive = false; + $deferred->error($exception); + }); + try { return $deferred->getFuture()->await(); } finally { + $token?->unsubscribe($id); $this->poll->done(); } } @@ -134,7 +145,7 @@ final class UvFile implements File $future = $this->push($data); } else { $future = $this->queue->top(); - $future = coroutine(function () use ($future, $data): void { + $future = launch(function () use ($future, $data): void { $future->await(); $this->push($data)->await(); }); @@ -147,7 +158,7 @@ final class UvFile implements File public function end(string $data = ""): Future { - return coroutine(function () use ($data): void { + return launch(function () use ($data): void { try { $future = $this->write($data); $this->writable = false; @@ -174,7 +185,7 @@ final class UvFile implements File $future = $this->trim($size); } else { $future = $this->queue->top(); - $future = coroutine(function () use ($future, $size): void { + $future = launch(function () use ($future, $size): void { $future->await(); $this->trim($size)->await(); }); @@ -214,7 +225,7 @@ final class UvFile implements File return $this->position; } - public function eof(): bool + public function atEnd(): bool { return $this->queue->isEmpty() && $this->size <= $this->position; } diff --git a/src/File.php b/src/File.php index f6be395..83e2d52 100644 --- a/src/File.php +++ b/src/File.php @@ -2,78 +2,25 @@ namespace Amp\File; +use Amp\ByteStream\ClosableStream; use Amp\ByteStream\InputStream; use Amp\ByteStream\OutputStream; -use Amp\Future; +use Amp\ByteStream\SeekableStream; +use Amp\CancellationToken; -interface File extends InputStream, OutputStream +interface File extends InputStream, OutputStream, ClosableStream, SeekableStream { public const DEFAULT_READ_LENGTH = 8192; - public const SEEK_SET = \SEEK_SET; - public const SEEK_CUR = \SEEK_CUR; - public const SEEK_END = \SEEK_END; - /** * Read $length bytes from the open file handle. * + * @param CancellationToken|null $token * @param int $length * * @return string|null */ - public function read(int $length = self::DEFAULT_READ_LENGTH): ?string; - - /** - * Write $data to the open file handle. - * - * @param string $data - */ - public function write(string $data): Future; - - /** - * Write $data to the open file handle and close the handle once the write completes. - * - * @param string $data - */ - public function end(string $data = ""): Future; - - /** - * Close the file handle. - * - * Applications are not required to manually close handles -- they will - * be unloaded automatically when the object is garbage collected. - */ - public function close(): void; - - /** - * Set the handle's internal pointer position. - * - * $whence values: - * - * SEEK_SET - Set position equal to offset bytes. - * SEEK_CUR - Set position to current location plus offset. - * SEEK_END - Set position to end-of-file plus offset. - * - * @param int $position - * @param int $whence - * - * @return int New offset position. - */ - public function seek(int $position, int $whence = self::SEEK_SET): int; - - /** - * Return the current internal offset position of the file handle. - * - * @return int - */ - public function tell(): int; - - /** - * Test for "end-of-file" on the file handle. - * - * @return bool - */ - public function eof(): bool; + public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string; /** * Retrieve the path used when opening the file handle. diff --git a/src/Internal/FileTask.php b/src/Internal/FileTask.php index cd8e63f..c53c440 100644 --- a/src/Internal/FileTask.php +++ b/src/Internal/FileTask.php @@ -5,6 +5,7 @@ namespace Amp\File\Internal; use Amp\CancellationToken; use Amp\File\Driver\BlockingDriver; use Amp\File\Driver\BlockingFile; +use Amp\File\File; use Amp\File\FilesystemException; use Amp\Parallel\Worker\Environment; use Amp\Parallel\Worker\Task; @@ -109,10 +110,14 @@ final class FileTask implements Task switch ($this->operation) { case "fread": + \array_shift($this->args); + return $file->read($token, ...$this->args); case "fwrite": + return $file->write(...$this->args)->await(); case "fseek": + return $file->seek(...$this->args); case "ftruncate": - return ([$file, \substr($this->operation, 1)])(...$this->args); + return $file->truncate(...$this->args); case "fclose": $environment->delete($id); diff --git a/src/PendingOperationError.php b/src/PendingOperationError.php index e6e00c2..9a04683 100644 --- a/src/PendingOperationError.php +++ b/src/PendingOperationError.php @@ -6,9 +6,8 @@ final class PendingOperationError extends \Error { public function __construct( string $message = "The previous file operation must complete before another can be started", - int $code = 0, - \Throwable $previous = null + ?\Throwable $previous = null ) { - parent::__construct($message, $code, $previous); + parent::__construct($message, $previous); } } diff --git a/test/AsyncFileTest.php b/test/AsyncFileTest.php index 97327ff..eb3ac01 100644 --- a/test/AsyncFileTest.php +++ b/test/AsyncFileTest.php @@ -2,9 +2,12 @@ namespace Amp\File\Test; +use Amp\CancellationTokenSource; +use Amp\CancelledException; use Amp\File; use Amp\File\PendingOperationError; -use function Amp\coroutine; +use Amp\TimeoutCancellationToken; +use function Amp\launch; abstract class AsyncFileTest extends FileTest { @@ -14,8 +17,8 @@ abstract class AsyncFileTest extends FileTest $handle = $this->driver->openFile(__FILE__, "r"); - $promise1 = coroutine(fn() => $handle->read(20)); - $promise2 = coroutine(fn() => $handle->read(20)); + $promise1 = launch(fn() => $handle->read(length: 20)); + $promise2 = launch(fn() => $handle->read(length: 20)); $expected = \substr(File\read(__FILE__), 0, 20); $this->assertSame($expected, $promise1->await()); @@ -29,8 +32,8 @@ abstract class AsyncFileTest extends FileTest $handle = $this->driver->openFile(__FILE__, "r"); - $promise1 = coroutine(fn() => $handle->read(10)); - $promise2 = coroutine(fn() => $handle->read(0)); + $promise1 = launch(fn() => $handle->read(length: 10)); + $promise2 = launch(fn() => $handle->read(length: 0)); $expected = \substr(File\read(__FILE__), 0, 10); $this->assertSame($expected, $promise1->await()); @@ -49,7 +52,7 @@ abstract class AsyncFileTest extends FileTest $data = "test"; $promise1 = $handle->write($data); - $promise2 = coroutine(fn() => $handle->read(10)); + $promise2 = launch(fn() => $handle->read(length: 10)); $this->assertNull($promise1->await()); @@ -64,11 +67,32 @@ abstract class AsyncFileTest extends FileTest $handle = $this->driver->openFile($path, "c+"); - $promise1 = coroutine(fn() => $handle->read(10)); + $promise1 = launch(fn() => $handle->read(length: 10)); $promise2 = $handle->write("test"); $this->assertNull($promise1->await()); $promise2->await(); } + + public function testCancelReadThenReadAgain() + { + $path = Fixture::path() . "/temp"; + + $handle = $this->driver->openFile($path, "c+"); + + $tokenSource = new CancellationTokenSource(); + $tokenSource->cancel(); + + $handle->write("test")->await(); + $handle->seek(0); + + try { + $handle->read(token: $tokenSource->getToken(), length: 2); + $handle->seek(0); // If the read succeeds (e.g.: ParallelFile), we need to seek back to 0. + } catch (CancelledException) { + } + + $this->assertSame("test", $handle->read()); + } } diff --git a/test/FileTest.php b/test/FileTest.php index 46c8703..73bdc07 100644 --- a/test/FileTest.php +++ b/test/FileTest.php @@ -20,7 +20,7 @@ abstract class FileTest extends FilesystemTest $handle->seek(0); $contents = $handle->read(); $this->assertSame(6, $handle->tell()); - $this->assertTrue($handle->eof()); + $this->assertTrue($handle->atEnd()); $this->assertSame("foobar", $contents); $handle->close(); @@ -85,7 +85,7 @@ abstract class FileTest extends FilesystemTest $this->assertSame("barfoobaz", $handle->read()); } - public function testReadingToEof() + public function testReadingToatEnd() { $handle = $this->driver->openFile(__FILE__, "r"); $contents = ""; @@ -94,8 +94,8 @@ abstract class FileTest extends FilesystemTest $stat = $this->driver->getStatus(__FILE__); $chunkSize = (int) \floor(($stat["size"] / 5)); - while (!$handle->eof()) { - $chunk = $handle->read($chunkSize); + while (!$handle->atEnd()) { + $chunk = $handle->read(length: $chunkSize); $contents .= $chunk; $position += \strlen($chunk ?? ''); $this->assertSame($position, $handle->tell()); @@ -112,8 +112,8 @@ abstract class FileTest extends FilesystemTest $handle = $this->driver->openFile(__FILE__, "r"); $contents = ""; - $contents .= $handle->read(10); - $contents .= $handle->read(10); + $contents .= $handle->read(length: 10); + $contents .= $handle->read(length: 10); $expected = \substr($this->driver->read(__FILE__), 0, 20); $this->assertSame($expected, $contents); @@ -127,7 +127,7 @@ abstract class FileTest extends FilesystemTest $this->assertSame(0, $handle->tell()); $handle->seek(10); $this->assertSame(10, $handle->tell()); - $chunk = $handle->read(90); + $chunk = $handle->read(length: 90); $this->assertSame(100, $handle->tell()); $expected = \substr($this->driver->read(__FILE__), 10, 90); $this->assertSame($expected, $chunk); @@ -139,8 +139,9 @@ abstract class FileTest extends FilesystemTest { $this->expectException(\Error::class); + $handle = $this->driver->openFile(__FILE__, "r"); + try { - $handle = $this->driver->openFile(__FILE__, "r"); $handle->seek(0, 99999); } finally { $handle->close(); @@ -204,7 +205,7 @@ abstract class FileTest extends FilesystemTest $handle->truncate(4); $handle->seek(0); $contents = $handle->read(); - $this->assertTrue($handle->eof()); + $this->assertTrue($handle->atEnd()); $this->assertSame("foob", $contents); $handle->write("bar")->await(); @@ -229,7 +230,7 @@ abstract class FileTest extends FilesystemTest $this->assertSame(3, $handle->tell()); $handle->seek(0); $contents = $handle->read(); - $this->assertTrue($handle->eof()); + $this->assertTrue($handle->atEnd()); $this->assertSame("foo\0\0\0", $contents); $handle->write("bar")->await();