1
0
mirror of https://github.com/danog/file.git synced 2025-01-22 21:31:15 +01:00

Update for stream changes and coroutine to launch rename

This commit is contained in:
Aaron Piotrowski 2021-11-21 13:12:23 -06:00
parent 7d31b42f82
commit b2b263092e
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
13 changed files with 118 additions and 116 deletions

View File

@ -4,6 +4,7 @@ namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\File\File;
use Amp\Future;
@ -33,7 +34,7 @@ final class BlockingFile implements File
}
}
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
@ -170,13 +171,13 @@ final class BlockingFile implements File
return \ftell($this->handle);
}
public function eof(): bool
public function atEnd(): bool
{
if ($this->handle === null) {
throw new ClosedException("The file '{$this->path}' has been closed");
}
return \feof($this->handle);
return \fatEnd($this->handle);
}
public function getPath(): string

View File

@ -7,7 +7,6 @@ use Amp\File\Driver;
use Amp\File\File;
use Amp\File\FilesystemException;
use Amp\File\Internal;
use Revolt\EventLoop;
use Revolt\EventLoop\Driver as EventLoopDriver;
final class EioDriver implements Driver

View File

@ -4,12 +4,13 @@ namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\PendingOperationError;
use Amp\Future;
use function Amp\coroutine;
use function Amp\launch;
final class EioFile implements File
{
@ -46,7 +47,7 @@ final class EioFile implements File
$this->queue = new \SplQueue;
}
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isActive) {
throw new PendingOperationError;
@ -63,6 +64,10 @@ final class EioFile implements File
$onRead = function (Deferred $deferred, $result, $req): void {
$this->isActive = false;
if ($deferred->isComplete()) {
return;
}
if ($result === -1) {
$error = \eio_get_last_error($req);
if ($error === "Bad file descriptor") {
@ -76,7 +81,7 @@ final class EioFile implements File
}
};
\eio_read(
$request = \eio_read(
$this->fh,
$length,
$this->position,
@ -85,9 +90,16 @@ final class EioFile implements File
$deferred
);
$id = $token?->subscribe(function (\Throwable $exception) use ($request, $deferred): void {
$this->isActive = false;
$deferred->error($exception);
\eio_cancel($request);
});
try {
return $deferred->getFuture()->await();
} finally {
$token?->unsubscribe($id);
$this->poll->done();
}
}
@ -108,7 +120,7 @@ final class EioFile implements File
$future = $this->push($data);
} else {
$future = $this->queue->top();
$future = coroutine(function () use ($future, $data): void {
$future = launch(function () use ($future, $data): void {
$future->await();
$this->push($data)->await();
});
@ -121,7 +133,7 @@ final class EioFile implements File
public function end(string $data = ""): Future
{
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$future = $this->write($data);
$this->writable = false;
@ -171,7 +183,7 @@ final class EioFile implements File
$future = $this->trim($size);
} else {
$future = $this->queue->top();
$future = coroutine(function () use ($future, $size): void {
$future = launch(function () use ($future, $size): void {
$future->await();
$this->trim($size)->await();
});
@ -210,7 +222,7 @@ final class EioFile implements File
return $this->position;
}
public function eof(): bool
public function atEnd(): bool
{
return $this->queue->isEmpty() && $this->size <= $this->position;
}

View File

@ -2,16 +2,17 @@
namespace Amp\File\Driver;
use Amp\CancellationToken;
use Amp\File\Driver;
use Amp\File\File;
use Amp\File\FilesystemException;
use Amp\File\Internal;
use Amp\Future;
use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\Worker;
use Amp\Parallel\Worker\TaskFailureThrowable;
use Amp\Parallel\Worker\Worker;
use Amp\Parallel\Worker\WorkerException;
use function Amp\coroutine;
use function Amp\launch;
use function Amp\Parallel\Worker\pool;
final class ParallelDriver implements Driver
@ -69,7 +70,7 @@ final class ParallelDriver implements Driver
$this->pendingWorker->await(); // Wait for any currently pending request for a worker.
if ($this->workerStorage->count() < $this->workerLimit) {
$this->pendingWorker = coroutine(fn() => $this->pool->getWorker());
$this->pendingWorker = launch(fn() => $this->pool->getWorker());
$worker = $this->pendingWorker->await();
if ($this->workerStorage->contains($worker)) {

View File

@ -4,15 +4,17 @@ namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\PendingOperationError;
use Amp\Future;
use Amp\Parallel\Worker\TaskException;
use Amp\Parallel\Worker\TaskFailureException;
use Amp\Parallel\Worker\Worker;
use Amp\Parallel\Worker\WorkerException;
use Revolt\EventLoop;
use function Amp\coroutine;
use function Amp\launch;
final class ParallelFile implements File
{
@ -77,7 +79,7 @@ final class ParallelFile implements File
$this->writable = false;
$this->closing = coroutine(function (): void {
$this->closing = launch(function (): void {
$id = $this->id;
$this->id = null;
$this->worker->enqueue(new Internal\FileTask('fclose', [], $id));
@ -116,12 +118,12 @@ final class ParallelFile implements File
}
}
public function eof(): bool
public function atEnd(): bool
{
return $this->pendingWrites === 0 && $this->size <= $this->position;
}
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->id === null) {
throw new ClosedException("The file has been closed");
@ -134,12 +136,12 @@ final class ParallelFile implements File
$this->busy = true;
try {
$data = $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id));
$data = $this->worker->enqueue(new Internal\FileTask('fread', [null, $length], $this->id), $token);
if ($data !== null) {
$this->position += \strlen($data);
}
} catch (TaskException $exception) {
} catch (TaskFailureException $exception) {
throw new StreamException("Reading from the file failed", 0, $exception);
} catch (WorkerException $exception) {
throw new StreamException("Sending the task to the worker failed", 0, $exception);
@ -167,7 +169,7 @@ final class ParallelFile implements File
++$this->pendingWrites;
$this->busy = true;
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id));
$this->position += \strlen($data);
@ -185,7 +187,7 @@ final class ParallelFile implements File
public function end(string $data = ""): Future
{
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$future = $this->write($data);
$this->writable = false;

View File

@ -2,9 +2,10 @@
namespace Amp\File\Driver;
use Amp\CancellationToken;
use Amp\File\File;
use Amp\Future;
use function Amp\coroutine;
use function Amp\launch;
final class StatusCachingFile implements File
{
@ -25,14 +26,14 @@ final class StatusCachingFile implements File
$this->invalidateCallback = $invalidateCallback;
}
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
return $this->file->read($length);
return $this->file->read($token, $length);
}
public function write(string $data): Future
{
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$this->file->write($data)->await();
} finally {
@ -43,7 +44,7 @@ final class StatusCachingFile implements File
public function end(string $data = ""): Future
{
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$this->file->end($data)->await();
} finally {
@ -67,9 +68,9 @@ final class StatusCachingFile implements File
return $this->file->tell();
}
public function eof(): bool
public function atEnd(): bool
{
return $this->file->eof();
return $this->file->atEnd();
}
public function getPath(): string

View File

@ -7,7 +7,6 @@ use Amp\File\Driver;
use Amp\File\File;
use Amp\File\FilesystemException;
use Amp\File\Internal;
use Amp\Future;
use Revolt\EventLoop\Driver as EventLoopDriver;
use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver;

View File

@ -4,13 +4,14 @@ namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\File\File;
use Amp\File\Internal;
use Amp\File\PendingOperationError;
use Amp\Future;
use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver;
use function Amp\coroutine;
use function Amp\launch;
final class UvFile implements File
{
@ -70,7 +71,7 @@ final class UvFile implements File
$this->priorVersion = \version_compare(\phpversion('uv'), '0.3.0', '<');
}
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isActive) {
throw new PendingOperationError;
@ -84,6 +85,10 @@ final class UvFile implements File
$onRead = function ($result, $buffer) use ($deferred): void {
$this->isActive = false;
if ($deferred->isComplete()) {
return;
}
if (\is_int($buffer)) {
$error = \uv_strerror($buffer);
if ($error === "bad file descriptor") {
@ -111,9 +116,15 @@ final class UvFile implements File
\uv_fs_read($this->loop, $this->fh, $this->position, $length, $onRead);
$id = $token?->subscribe(function (\Throwable $exception) use ($deferred): void {
$this->isActive = false;
$deferred->error($exception);
});
try {
return $deferred->getFuture()->await();
} finally {
$token?->unsubscribe($id);
$this->poll->done();
}
}
@ -134,7 +145,7 @@ final class UvFile implements File
$future = $this->push($data);
} else {
$future = $this->queue->top();
$future = coroutine(function () use ($future, $data): void {
$future = launch(function () use ($future, $data): void {
$future->await();
$this->push($data)->await();
});
@ -147,7 +158,7 @@ final class UvFile implements File
public function end(string $data = ""): Future
{
return coroutine(function () use ($data): void {
return launch(function () use ($data): void {
try {
$future = $this->write($data);
$this->writable = false;
@ -174,7 +185,7 @@ final class UvFile implements File
$future = $this->trim($size);
} else {
$future = $this->queue->top();
$future = coroutine(function () use ($future, $size): void {
$future = launch(function () use ($future, $size): void {
$future->await();
$this->trim($size)->await();
});
@ -214,7 +225,7 @@ final class UvFile implements File
return $this->position;
}
public function eof(): bool
public function atEnd(): bool
{
return $this->queue->isEmpty() && $this->size <= $this->position;
}

View File

@ -2,78 +2,25 @@
namespace Amp\File;
use Amp\ByteStream\ClosableStream;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Future;
use Amp\ByteStream\SeekableStream;
use Amp\CancellationToken;
interface File extends InputStream, OutputStream
interface File extends InputStream, OutputStream, ClosableStream, SeekableStream
{
public const DEFAULT_READ_LENGTH = 8192;
public const SEEK_SET = \SEEK_SET;
public const SEEK_CUR = \SEEK_CUR;
public const SEEK_END = \SEEK_END;
/**
* Read $length bytes from the open file handle.
*
* @param CancellationToken|null $token
* @param int $length
*
* @return string|null
*/
public function read(int $length = self::DEFAULT_READ_LENGTH): ?string;
/**
* Write $data to the open file handle.
*
* @param string $data
*/
public function write(string $data): Future;
/**
* Write $data to the open file handle and close the handle once the write completes.
*
* @param string $data
*/
public function end(string $data = ""): Future;
/**
* Close the file handle.
*
* Applications are not required to manually close handles -- they will
* be unloaded automatically when the object is garbage collected.
*/
public function close(): void;
/**
* Set the handle's internal pointer position.
*
* $whence values:
*
* SEEK_SET - Set position equal to offset bytes.
* SEEK_CUR - Set position to current location plus offset.
* SEEK_END - Set position to end-of-file plus offset.
*
* @param int $position
* @param int $whence
*
* @return int New offset position.
*/
public function seek(int $position, int $whence = self::SEEK_SET): int;
/**
* Return the current internal offset position of the file handle.
*
* @return int
*/
public function tell(): int;
/**
* Test for "end-of-file" on the file handle.
*
* @return bool
*/
public function eof(): bool;
public function read(?CancellationToken $token = null, int $length = self::DEFAULT_READ_LENGTH): ?string;
/**
* Retrieve the path used when opening the file handle.

View File

@ -5,6 +5,7 @@ namespace Amp\File\Internal;
use Amp\CancellationToken;
use Amp\File\Driver\BlockingDriver;
use Amp\File\Driver\BlockingFile;
use Amp\File\File;
use Amp\File\FilesystemException;
use Amp\Parallel\Worker\Environment;
use Amp\Parallel\Worker\Task;
@ -109,10 +110,14 @@ final class FileTask implements Task
switch ($this->operation) {
case "fread":
\array_shift($this->args);
return $file->read($token, ...$this->args);
case "fwrite":
return $file->write(...$this->args)->await();
case "fseek":
return $file->seek(...$this->args);
case "ftruncate":
return ([$file, \substr($this->operation, 1)])(...$this->args);
return $file->truncate(...$this->args);
case "fclose":
$environment->delete($id);

View File

@ -6,9 +6,8 @@ final class PendingOperationError extends \Error
{
public function __construct(
string $message = "The previous file operation must complete before another can be started",
int $code = 0,
\Throwable $previous = null
?\Throwable $previous = null
) {
parent::__construct($message, $code, $previous);
parent::__construct($message, $previous);
}
}

View File

@ -2,9 +2,12 @@
namespace Amp\File\Test;
use Amp\CancellationTokenSource;
use Amp\CancelledException;
use Amp\File;
use Amp\File\PendingOperationError;
use function Amp\coroutine;
use Amp\TimeoutCancellationToken;
use function Amp\launch;
abstract class AsyncFileTest extends FileTest
{
@ -14,8 +17,8 @@ abstract class AsyncFileTest extends FileTest
$handle = $this->driver->openFile(__FILE__, "r");
$promise1 = coroutine(fn() => $handle->read(20));
$promise2 = coroutine(fn() => $handle->read(20));
$promise1 = launch(fn() => $handle->read(length: 20));
$promise2 = launch(fn() => $handle->read(length: 20));
$expected = \substr(File\read(__FILE__), 0, 20);
$this->assertSame($expected, $promise1->await());
@ -29,8 +32,8 @@ abstract class AsyncFileTest extends FileTest
$handle = $this->driver->openFile(__FILE__, "r");
$promise1 = coroutine(fn() => $handle->read(10));
$promise2 = coroutine(fn() => $handle->read(0));
$promise1 = launch(fn() => $handle->read(length: 10));
$promise2 = launch(fn() => $handle->read(length: 0));
$expected = \substr(File\read(__FILE__), 0, 10);
$this->assertSame($expected, $promise1->await());
@ -49,7 +52,7 @@ abstract class AsyncFileTest extends FileTest
$data = "test";
$promise1 = $handle->write($data);
$promise2 = coroutine(fn() => $handle->read(10));
$promise2 = launch(fn() => $handle->read(length: 10));
$this->assertNull($promise1->await());
@ -64,11 +67,32 @@ abstract class AsyncFileTest extends FileTest
$handle = $this->driver->openFile($path, "c+");
$promise1 = coroutine(fn() => $handle->read(10));
$promise1 = launch(fn() => $handle->read(length: 10));
$promise2 = $handle->write("test");
$this->assertNull($promise1->await());
$promise2->await();
}
public function testCancelReadThenReadAgain()
{
$path = Fixture::path() . "/temp";
$handle = $this->driver->openFile($path, "c+");
$tokenSource = new CancellationTokenSource();
$tokenSource->cancel();
$handle->write("test")->await();
$handle->seek(0);
try {
$handle->read(token: $tokenSource->getToken(), length: 2);
$handle->seek(0); // If the read succeeds (e.g.: ParallelFile), we need to seek back to 0.
} catch (CancelledException) {
}
$this->assertSame("test", $handle->read());
}
}

View File

@ -20,7 +20,7 @@ abstract class FileTest extends FilesystemTest
$handle->seek(0);
$contents = $handle->read();
$this->assertSame(6, $handle->tell());
$this->assertTrue($handle->eof());
$this->assertTrue($handle->atEnd());
$this->assertSame("foobar", $contents);
$handle->close();
@ -85,7 +85,7 @@ abstract class FileTest extends FilesystemTest
$this->assertSame("barfoobaz", $handle->read());
}
public function testReadingToEof()
public function testReadingToatEnd()
{
$handle = $this->driver->openFile(__FILE__, "r");
$contents = "";
@ -94,8 +94,8 @@ abstract class FileTest extends FilesystemTest
$stat = $this->driver->getStatus(__FILE__);
$chunkSize = (int) \floor(($stat["size"] / 5));
while (!$handle->eof()) {
$chunk = $handle->read($chunkSize);
while (!$handle->atEnd()) {
$chunk = $handle->read(length: $chunkSize);
$contents .= $chunk;
$position += \strlen($chunk ?? '');
$this->assertSame($position, $handle->tell());
@ -112,8 +112,8 @@ abstract class FileTest extends FilesystemTest
$handle = $this->driver->openFile(__FILE__, "r");
$contents = "";
$contents .= $handle->read(10);
$contents .= $handle->read(10);
$contents .= $handle->read(length: 10);
$contents .= $handle->read(length: 10);
$expected = \substr($this->driver->read(__FILE__), 0, 20);
$this->assertSame($expected, $contents);
@ -127,7 +127,7 @@ abstract class FileTest extends FilesystemTest
$this->assertSame(0, $handle->tell());
$handle->seek(10);
$this->assertSame(10, $handle->tell());
$chunk = $handle->read(90);
$chunk = $handle->read(length: 90);
$this->assertSame(100, $handle->tell());
$expected = \substr($this->driver->read(__FILE__), 10, 90);
$this->assertSame($expected, $chunk);
@ -139,8 +139,9 @@ abstract class FileTest extends FilesystemTest
{
$this->expectException(\Error::class);
$handle = $this->driver->openFile(__FILE__, "r");
try {
$handle = $this->driver->openFile(__FILE__, "r");
$handle->seek(0, 99999);
} finally {
$handle->close();
@ -204,7 +205,7 @@ abstract class FileTest extends FilesystemTest
$handle->truncate(4);
$handle->seek(0);
$contents = $handle->read();
$this->assertTrue($handle->eof());
$this->assertTrue($handle->atEnd());
$this->assertSame("foob", $contents);
$handle->write("bar")->await();
@ -229,7 +230,7 @@ abstract class FileTest extends FilesystemTest
$this->assertSame(3, $handle->tell());
$handle->seek(0);
$contents = $handle->read();
$this->assertTrue($handle->eof());
$this->assertTrue($handle->atEnd());
$this->assertSame("foo\0\0\0", $contents);
$handle->write("bar")->await();