1
0
mirror of https://github.com/danog/file.git synced 2024-11-30 04:19:39 +01:00

Improve pending op handling in UvFile and EioFile

This commit is contained in:
Aaron Piotrowski 2022-11-17 18:54:46 -06:00
parent c920711bc3
commit cce4422d0d
No known key found for this signature in database
GPG Key ID: 5B456E6AABA44A63
2 changed files with 50 additions and 77 deletions

View File

@ -29,7 +29,7 @@ final class EioFile implements File
private \SplQueue $queue;
private bool $isActive = false;
private bool $isReading = false;
private bool $writable = true;
@ -55,18 +55,16 @@ final class EioFile implements File
public function __destruct()
{
if (!$this->onClose->isComplete()) {
$this->onClose->complete();
}
async($this->close(...));
}
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isActive) {
if ($this->isReading || !$this->queue->isEmpty()) {
throw new PendingOperationError;
}
$this->isActive = true;
$this->isReading = true;
$remaining = $this->size - $this->position;
$length = \min($length, $remaining);
@ -75,7 +73,7 @@ final class EioFile implements File
$this->poll->listen();
$onRead = function (DeferredFuture $deferred, $result, $req): void {
$this->isActive = false;
$this->isReading = false;
if ($deferred->isComplete()) {
return;
@ -104,7 +102,7 @@ final class EioFile implements File
);
$id = $cancellation?->subscribe(function (\Throwable $exception) use ($request, $deferred): void {
$this->isActive = false;
$this->isReading = false;
$deferred->error($exception);
\eio_cancel($request);
});
@ -120,7 +118,7 @@ final class EioFile implements File
public function write(string $bytes): void
{
if ($this->isActive && $this->queue->isEmpty()) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -128,16 +126,12 @@ final class EioFile implements File
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$future = $this->push($bytes);
$future = $this->push($bytes, $this->position);
} else {
$future = $this->queue->top();
$future = async(function () use ($future, $bytes): void {
$future->await();
$this->push($bytes)->await();
});
$position = $this->position;
/** @var Future $future */
$future = $this->queue->top()->map(fn () => $this->push($bytes, $position)->await());
}
$this->queue->push($future);
@ -185,7 +179,7 @@ final class EioFile implements File
public function truncate(int $size): void
{
if ($this->isActive && $this->queue->isEmpty()) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -193,16 +187,10 @@ final class EioFile implements File
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$future = $this->trim($size);
} else {
$future = $this->queue->top();
$future = async(function () use ($future, $size): void {
$future->await();
$this->trim($size)->await();
});
$future = $this->queue->top()->map(fn () => $this->trim($size)->await());
}
$this->queue->push($future);
@ -212,7 +200,7 @@ final class EioFile implements File
public function seek(int $position, int $whence = \SEEK_SET): int
{
if ($this->isActive) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -253,26 +241,23 @@ final class EioFile implements File
return $this->mode;
}
private function push(string $data): Future
private function push(string $data, int $position): Future
{
$length = \strlen($data);
if ($length === 0) {
return Future::complete(null);
return Future::complete();
}
$deferred = new DeferredFuture;
$this->poll->listen();
$onWrite = function (DeferredFuture $deferred, $result, $req): void {
$onWrite = function (DeferredFuture $deferred, $result, $req) use ($position): void {
if ($this->queue->isEmpty()) {
$deferred->error(new ClosedException('No pending write, the file may have been closed'));
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
}
if ($result === -1) {
$error = \eio_get_last_error($req);
@ -282,12 +267,13 @@ final class EioFile implements File
$deferred->error(new StreamException("Writing to the file failed: " . $error));
}
} else {
$this->position += $result;
if ($this->position > $this->size) {
$this->size = $this->position;
if ($this->position === $position) {
$this->position += $result;
}
$deferred->complete(null);
$this->size = \max($this->size, $position + $result);
$deferred->complete();
}
$this->poll->done();
@ -318,7 +304,7 @@ final class EioFile implements File
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
$this->isReading = false;
}
if ($result === -1) {
@ -331,7 +317,7 @@ final class EioFile implements File
} else {
$this->size = $size;
$this->poll->done();
$deferred->complete(null);
$deferred->complete();
}
};

View File

@ -33,7 +33,7 @@ final class UvFile implements File
private \SplQueue $queue;
private bool $isActive = false;
private bool $isReading = false;
private bool $writable = true;
@ -79,17 +79,17 @@ final class UvFile implements File
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
{
if ($this->isActive) {
if ($this->isReading || !$this->queue->isEmpty()) {
throw new PendingOperationError;
}
$deferred = new DeferredFuture;
$this->poll->listen();
$this->isActive = true;
$this->isReading = true;
$onRead = function ($result, $buffer) use ($deferred): void {
$this->isActive = false;
$this->isReading = false;
if ($deferred->isComplete()) {
return;
@ -123,7 +123,7 @@ final class UvFile implements File
\uv_fs_read($this->eventLoopHandle, $this->fh, $this->position, $length, $onRead);
$id = $cancellation?->subscribe(function (\Throwable $exception) use ($deferred): void {
$this->isActive = false;
$this->isReading = false;
$deferred->error($exception);
});
@ -138,7 +138,7 @@ final class UvFile implements File
public function write(string $bytes): void
{
if ($this->isActive && $this->queue->isEmpty()) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -146,16 +146,12 @@ final class UvFile implements File
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$future = $this->push($bytes);
$future = $this->push($bytes, $this->position);
} else {
$future = $this->queue->top();
$future = async(function () use ($future, $bytes): void {
$future->await();
$this->push($bytes)->await();
});
$position = $this->position;
/** @var Future $future */
$future = $this->queue->top()->map(fn () => $this->push($bytes, $position)->await());
}
$this->queue->push($future);
@ -171,7 +167,7 @@ final class UvFile implements File
public function truncate(int $size): void
{
if ($this->isActive && $this->queue->isEmpty()) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -179,16 +175,10 @@ final class UvFile implements File
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$future = $this->trim($size);
} else {
$future = $this->queue->top();
$future = async(function () use ($future, $size): void {
$future->await();
$this->trim($size)->await();
});
$future = $this->queue->top()->map(fn () => $this->trim($size)->await());
}
$this->queue->push($future);
@ -198,7 +188,7 @@ final class UvFile implements File
public function seek(int $position, int $whence = \SEEK_SET): int
{
if ($this->isActive) {
if ($this->isReading) {
throw new PendingOperationError;
}
@ -272,26 +262,23 @@ final class UvFile implements File
$this->onClose->getFuture()->finally($onClose);
}
private function push(string $data): Future
private function push(string $data, int $position): Future
{
$length = \strlen($data);
if ($length === 0) {
return Future::complete(null);
return Future::complete();
}
$deferred = new DeferredFuture;
$this->poll->listen();
$onWrite = function ($fh, $result) use ($deferred, $length): void {
$onWrite = function ($fh, $result) use ($deferred, $length, $position): void {
if ($this->queue->isEmpty()) {
$deferred->error(new ClosedException('No pending write, the file may have been closed'));
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
}
if ($result < 0) {
$error = \uv_strerror($result);
@ -301,16 +288,19 @@ final class UvFile implements File
$deferred->error(new StreamException("Writing to the file failed: " . $error));
}
} else {
$this->position += $length;
if ($this->position > $this->size) {
$this->size = $this->position;
if ($this->position === $position) {
$this->position += $length;
}
$deferred->complete(null);
$this->poll->done();
$this->size = \max($this->size, $position + $length);
$deferred->complete();
}
$this->poll->done();
};
\uv_fs_write($this->eventLoopHandle, $this->fh, $data, $this->position, $onWrite);
\uv_fs_write($this->eventLoopHandle, $this->fh, $data, $position, $onWrite);
return $deferred->getFuture();
}
@ -326,12 +316,9 @@ final class UvFile implements File
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
}
$this->size = $size;
$deferred->complete(null);
$deferred->complete();
$this->poll->done();
};