1
0
mirror of https://github.com/danog/file.git synced 2024-11-26 20:04:51 +01:00

Compatibility with iterable readable streams

This commit is contained in:
Aaron Piotrowski 2023-01-08 00:07:00 -06:00
parent f4bd0dff96
commit db002ce874
No known key found for this signature in database
GPG Key ID: 5B456E6AABA44A63
4 changed files with 41 additions and 20 deletions

View File

@ -3,13 +3,19 @@
namespace Amp\File\Driver; namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\ByteStream\StreamException; use Amp\ByteStream\StreamException;
use Amp\Cancellation; use Amp\Cancellation;
use Amp\DeferredFuture; use Amp\DeferredFuture;
use Amp\File\File; use Amp\File\File;
final class BlockingFile implements File /**
* @implements \IteratorAggregate<int, string>
*/
final class BlockingFile implements File, \IteratorAggregate
{ {
use ReadableStreamIteratorAggregate;
/** @var resource|null */ /** @var resource|null */
private $handle; private $handle;
private string $path; private string $path;

View File

@ -3,6 +3,7 @@
namespace Amp\File\Driver; namespace Amp\File\Driver;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\ByteStream\StreamException; use Amp\ByteStream\StreamException;
use Amp\Cancellation; use Amp\Cancellation;
use Amp\DeferredFuture; use Amp\DeferredFuture;
@ -15,8 +16,13 @@ use Amp\Parallel\Worker\WorkerException;
use Revolt\EventLoop; use Revolt\EventLoop;
use function Amp\async; use function Amp\async;
final class ParallelFile implements File /**
* @implements \IteratorAggregate<int, string>
*/
final class ParallelFile implements File, \IteratorAggregate
{ {
use ReadableStreamIteratorAggregate;
private readonly Internal\FileWorker $worker; private readonly Internal\FileWorker $worker;
private ?int $id; private ?int $id;

View File

@ -2,11 +2,17 @@
namespace Amp\File\Driver; namespace Amp\File\Driver;
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\Cancellation; use Amp\Cancellation;
use Amp\File\File; use Amp\File\File;
final class StatusCachingFile implements File /**
* @implements \IteratorAggregate<int, string>
*/
final class StatusCachingFile implements File, \IteratorAggregate
{ {
use ReadableStreamIteratorAggregate;
private readonly File $file; private readonly File $file;
private readonly \Closure $invalidateCallback; private readonly \Closure $invalidateCallback;

View File

@ -3,14 +3,21 @@
namespace Amp\File\Internal; namespace Amp\File\Internal;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\ByteStream\ReadableStreamIteratorAggregate;
use Amp\Cancellation;
use Amp\File\File; use Amp\File\File;
use Amp\File\PendingOperationError; use Amp\File\PendingOperationError;
use Amp\Future; use Amp\Future;
use function Amp\async; use function Amp\async;
/** @internal */ /**
abstract class QueuedWritesFile implements File * @internal
* @implements \IteratorAggregate<int, string>
*/
abstract class QueuedWritesFile implements File, \IteratorAggregate
{ {
use ReadableStreamIteratorAggregate;
/** @var \SplQueue<Future<null>> */ /** @var \SplQueue<Future<null>> */
protected readonly \SplQueue $queue; protected readonly \SplQueue $queue;
@ -39,6 +46,11 @@ abstract class QueuedWritesFile implements File
async($this->close(...)); async($this->close(...));
} }
abstract public function read(
?Cancellation $cancellation = null,
int $length = self::DEFAULT_READ_LENGTH,
): ?string;
/** /**
* @return Future<null> * @return Future<null>
*/ */
@ -114,21 +126,12 @@ abstract class QueuedWritesFile implements File
throw new PendingOperationError; throw new PendingOperationError;
} }
switch ($whence) { return match ($whence) {
case self::SEEK_SET: self::SEEK_SET => $this->position = $position,
$this->position = $position; self::SEEK_CUR => $this->position += $position,
break; self::SEEK_END => $this->position = $this->size + $position,
case self::SEEK_CUR: default => throw new \Error("Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"),
$this->position += $position; };
break;
case self::SEEK_END:
$this->position = $this->size + $position;
break;
default:
throw new \Error("Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected");
}
return $this->position;
} }
public function tell(): int public function tell(): int