From 8cfe851cd2ee55ecd2cfc16710ad883808a4f5ac Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 28 Oct 2018 23:55:11 -0500 Subject: [PATCH] Add truncate() to Handle Addresses #32. --- lib/BlockingHandle.php | 16 ++++ lib/EioHandle.php | 163 ++++++++++++++++++++++++++------------ lib/Handle.php | 10 +++ lib/Internal/FileTask.php | 1 + lib/ParallelHandle.php | 150 ++++++++++++++++++++--------------- lib/UvHandle.php | 57 +++++++++++++ test/HandleTest.php | 57 +++++++++++++ 7 files changed, 341 insertions(+), 113 deletions(-) diff --git a/lib/BlockingHandle.php b/lib/BlockingHandle.php index b8743a4..ba72e39 100644 --- a/lib/BlockingHandle.php +++ b/lib/BlockingHandle.php @@ -110,6 +110,22 @@ class BlockingHandle implements Handle )); } + /** + * {@inheritdoc} + */ + public function truncate(int $size): Promise + { + if ($this->fh === null) { + throw new ClosedException("The file has been closed"); + } + + if (!@\ftruncate($this->fh, $size)) { + return new Failure(new StreamException("Could not truncate file")); + } + + return new Success; + } + /** * {@inheritdoc} */ diff --git a/lib/EioHandle.php b/lib/EioHandle.php index 144bfa8..a4e848a 100644 --- a/lib/EioHandle.php +++ b/lib/EioHandle.php @@ -70,35 +70,34 @@ class EioHandle implements Handle $deferred = new Deferred; $this->poll->listen($deferred->promise()); + $onRead = function (Deferred $deferred, $result, $req) { + $this->isActive = false; + + if ($result === -1) { + $error = \eio_get_last_error($req); + if ($error === "Bad file descriptor") { + $deferred->fail(new ClosedException("Reading from the file failed due to a closed handle")); + } else { + $deferred->fail(new StreamException("Reading from the file failed:" . $error)); + } + } else { + $this->position += \strlen($result); + $deferred->resolve(\strlen($result) ? $result : null); + } + }; + \eio_read( $this->fh, $length, $this->position, \EIO_PRI_DEFAULT, - [$this, "onRead"], + $onRead, $deferred ); return $deferred->promise(); } - private function onRead(Deferred $deferred, $result, $req) - { - $this->isActive = false; - - if ($result === -1) { - $error = \eio_get_last_error($req); - if ($error === "Bad file descriptor") { - $deferred->fail(new ClosedException("Reading from the file failed due to a closed handle")); - } else { - $deferred->fail(new StreamException("Reading from the file failed:" . $error)); - } - } else { - $this->position += \strlen($result); - $deferred->resolve(\strlen($result) ? $result : null); - } - } - /** * {@inheritdoc} */ @@ -136,13 +135,40 @@ class EioHandle implements Handle $deferred = new Deferred; $this->poll->listen($deferred->promise()); + $onWrite = function (Deferred $deferred, $result, $req) { + if ($this->queue->isEmpty()) { + $deferred->fail(new ClosedException('No pending write, the file may have been closed')); + } + + $this->queue->shift(); + if ($this->queue->isEmpty()) { + $this->isActive = false; + } + + if ($result === -1) { + $error = \eio_get_last_error($req); + if ($error === "Bad file descriptor") { + $deferred->fail(new ClosedException("Writing to the file failed due to a closed handle")); + } else { + $deferred->fail(new StreamException("Writing to the file failed: " . $error)); + } + } else { + $this->position += $result; + if ($this->position > $this->size) { + $this->size = $this->position; + } + + $deferred->resolve($result); + } + }; + \eio_write( $this->fh, $data, $length, $this->position, \EIO_PRI_DEFAULT, - [$this, "onWrite"], + $onWrite, $deferred ); @@ -165,34 +191,6 @@ class EioHandle implements Handle }); } - private function onWrite(Deferred $deferred, $result, $req) - { - if ($this->queue->isEmpty()) { - $deferred->fail(new ClosedException('No pending write, the file may have been closed')); - } - - $this->queue->shift(); - if ($this->queue->isEmpty()) { - $this->isActive = false; - } - - if ($result === -1) { - $error = \eio_get_last_error($req); - if ($error === "Bad file descriptor") { - $deferred->fail(new ClosedException("Writing to the file failed due to a closed handle")); - } else { - $deferred->fail(new StreamException("Writing to the file failed: " . $error)); - } - } else { - $this->position += $result; - if ($this->position > $this->size) { - $this->size = $this->position; - } - - $deferred->resolve($result); - } - } - /** * {@inheritdoc} */ @@ -205,15 +203,78 @@ class EioHandle implements Handle $deferred = new Deferred; $this->poll->listen($this->closing = $deferred->promise()); - \eio_close($this->fh, \EIO_PRI_DEFAULT, [$this, "onClose"], $deferred); + \eio_close($this->fh, \EIO_PRI_DEFAULT, function (Deferred $deferred) { + // Ignore errors when closing file, as the handle will become invalid anyway. + $deferred->resolve(); + }, $deferred); return $deferred->promise(); } - private function onClose(Deferred $deferred, $result, $req) + public function truncate(int $size): Promise { - // Ignore errors when closing file, as the handle will become invalid anyway. - $deferred->resolve(); + if ($this->isActive && $this->queue->isEmpty()) { + throw new PendingOperationError; + } + + if (!$this->writable) { + throw new ClosedException("The file is no longer writable"); + } + + $this->isActive = true; + + if ($this->queue->isEmpty()) { + $promise = $this->trim($size); + } else { + $promise = $this->queue->top(); + $promise = call(function () use ($promise, $size) { + yield $promise; + return yield $this->trim($size); + }); + } + + $this->queue->push($promise); + + return $promise; + } + + private function trim(int $size): Promise + { + $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + + $onTruncate = function (Deferred $deferred, $result, $req) use ($size) { + if ($this->queue->isEmpty()) { + $deferred->fail(new ClosedException('No pending write, the file may have been closed')); + } + + $this->queue->shift(); + if ($this->queue->isEmpty()) { + $this->isActive = false; + } + + if ($result === -1) { + $error = \eio_get_last_error($req); + if ($error === "Bad file descriptor") { + $deferred->fail(new ClosedException("Truncating the file failed due to a closed handle")); + } else { + $deferred->fail(new StreamException("Truncating the file failed: " . $error)); + } + } else { + $this->size = $size; + $deferred->resolve(); + } + }; + + \eio_ftruncate( + $this->fh, + $size, + \EIO_PRI_DEFAULT, + $onTruncate, + $deferred + ); + + return $deferred->promise(); } /** diff --git a/lib/Handle.php b/lib/Handle.php index 68ba9ed..d0f9986 100644 --- a/lib/Handle.php +++ b/lib/Handle.php @@ -45,6 +45,16 @@ interface Handle extends InputStream, OutputStream */ public function close(): Promise; + /** + * Truncates the file to the given length. If $size is larger than the current file size, the file is extended + * with null bytes. + * + * @param int $size New file size. + * + * @return \Amp\Promise + */ + public function truncate(int $size): Promise; + /** * Set the handle's internal pointer position. * diff --git a/lib/Internal/FileTask.php b/lib/Internal/FileTask.php index 477a3fc..70607da 100644 --- a/lib/Internal/FileTask.php +++ b/lib/Internal/FileTask.php @@ -112,6 +112,7 @@ class FileTask implements Task case "fread": case "fwrite": case "fseek": + case "ftruncate": return ([$file, \substr($this->operation, 1)])(...$this->args); case "fclose": diff --git a/lib/ParallelHandle.php b/lib/ParallelHandle.php index 594f075..f0cb214 100644 --- a/lib/ParallelHandle.php +++ b/lib/ParallelHandle.php @@ -4,7 +4,6 @@ namespace Amp\File; use Amp\ByteStream\ClosedException; use Amp\ByteStream\StreamException; -use Amp\Coroutine; use Amp\Parallel\Worker\TaskException; use Amp\Parallel\Worker\Worker; use Amp\Parallel\Worker\WorkerException; @@ -97,6 +96,41 @@ class ParallelHandle implements Handle return $this->closing; } + /** + * {@inheritdoc} + */ + public function truncate(int $size): Promise + { + if ($this->id === null) { + throw new ClosedException("The file has been closed"); + } + + if ($this->busy) { + throw new PendingOperationError; + } + + if (!$this->writable) { + throw new ClosedException("The file is no longer writable"); + } + + return call(function () use ($size) { + ++$this->pendingWrites; + $this->busy = true; + + try { + yield $this->worker->enqueue(new Internal\FileTask('ftruncate', [$size], $this->id)); + } catch (TaskException $exception) { + throw new StreamException("Reading from the file failed", 0, $exception); + } catch (WorkerException $exception) { + throw new StreamException("Sending the task to the worker failed", 0, $exception); + } finally { + if (--$this->pendingWrites === 0) { + $this->busy = false; + } + } + }); + } + /** * {@inheritdoc} */ @@ -115,24 +149,22 @@ class ParallelHandle implements Handle throw new PendingOperationError; } - return new Coroutine($this->doRead($length)); - } + return call(function () use ($length) { + $this->busy = true; - private function doRead(int $length): \Generator - { - $this->busy = true; + try { + $data = yield $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id)); + $this->position += \strlen($data); + } catch (TaskException $exception) { + throw new StreamException("Reading from the file failed", 0, $exception); + } catch (WorkerException $exception) { + throw new StreamException("Sending the task to the worker failed", 0, $exception); + } finally { + $this->busy = false; + } - try { - $data = yield $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id)); - $this->position += \strlen($data); return $data; - } catch (TaskException $exception) { - throw new StreamException("Reading from the file failed", 0, $exception); - } catch (WorkerException $exception) { - throw new StreamException("Sending the task to the worker failed", 0, $exception); - } finally { - $this->busy = false; - } + }); } /** @@ -152,7 +184,25 @@ class ParallelHandle implements Handle throw new ClosedException("The file is no longer writable"); } - return new Coroutine($this->doWrite($data)); + return call(function () use ($data) { + ++$this->pendingWrites; + $this->busy = true; + + try { + $length = yield $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id)); + } catch (TaskException $exception) { + throw new StreamException("Writing to the file failed", 0, $exception); + } catch (WorkerException $exception) { + throw new StreamException("Sending the task to the worker failed", 0, $exception); + } finally { + if (--$this->pendingWrites === 0) { + $this->busy = false; + } + } + + $this->position += $length; + return $length; + }); } /** @@ -171,27 +221,6 @@ class ParallelHandle implements Handle }); } - private function doWrite(string $data): \Generator - { - ++$this->pendingWrites; - $this->busy = true; - - try { - $length = yield $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id)); - } catch (TaskException $exception) { - throw new StreamException("Writing to the file failed", 0, $exception); - } catch (WorkerException $exception) { - throw new StreamException("Sending the task to the worker failed", 0, $exception); - } finally { - if (--$this->pendingWrites === 0) { - $this->busy = false; - } - } - - $this->position += $length; - return $length; - } - /** * {@inheritdoc} */ @@ -205,34 +234,31 @@ class ParallelHandle implements Handle throw new PendingOperationError; } - return new Coroutine($this->doSeek($offset, $whence)); - } + return call(function () use ($offset, $whence) { + switch ($whence) { + case \SEEK_SET: + case \SEEK_CUR: + case \SEEK_END: + try { + $this->position = yield $this->worker->enqueue( + new Internal\FileTask('fseek', [$offset, $whence], $this->id) + ); - private function doSeek(int $offset, int $whence) - { - switch ($whence) { - case \SEEK_SET: - case \SEEK_CUR: - case \SEEK_END: - try { - $this->position = yield $this->worker->enqueue( - new Internal\FileTask('fseek', [$offset, $whence], $this->id) - ); + if ($this->position > $this->size) { + $this->size = $this->position; + } - if ($this->position > $this->size) { - $this->size = $this->position; + return $this->position; + } catch (TaskException $exception) { + throw new StreamException('Seeking in the file failed.', 0, $exception); + } catch (WorkerException $exception) { + throw new StreamException("Sending the task to the worker failed", 0, $exception); } - return $this->position; - } catch (TaskException $exception) { - throw new StreamException('Seeking in the file failed.', 0, $exception); - } catch (WorkerException $exception) { - throw new StreamException("Sending the task to the worker failed", 0, $exception); - } - - default: - throw new \Error('Invalid whence value. Use SEEK_SET, SEEK_CUR, or SEEK_END.'); - } + default: + throw new \Error('Invalid whence value. Use SEEK_SET, SEEK_CUR, or SEEK_END.'); + } + }); } /** diff --git a/lib/UvHandle.php b/lib/UvHandle.php index 25fe11f..b12b23e 100644 --- a/lib/UvHandle.php +++ b/lib/UvHandle.php @@ -185,6 +185,63 @@ class UvHandle implements Handle return $deferred->promise(); } + public function truncate(int $size): Promise + { + if ($this->isActive && $this->queue->isEmpty()) { + throw new PendingOperationError; + } + + if (!$this->writable) { + throw new ClosedException("The file is no longer writable"); + } + + $this->isActive = true; + + if ($this->queue->isEmpty()) { + $promise = $this->trim($size); + } else { + $promise = $this->queue->top(); + $promise = call(function () use ($promise, $size) { + yield $promise; + return yield $this->trim($size); + }); + } + + $this->queue->push($promise); + + return $promise; + } + + private function trim(int $size): Promise + { + $deferred = new Deferred; + $this->poll->listen($deferred->promise()); + + $onTruncate = function ($fh) use ($deferred, $size) { + if ($this->queue->isEmpty()) { + $deferred->fail(new ClosedException('No pending write, the file may have been closed')); + } + + $this->queue->shift(); + if ($this->queue->isEmpty()) { + $this->isActive = false; + } + + StatCache::clear($this->path); + $this->size = $size; + $deferred->resolve(); + }; + + \uv_fs_ftruncate( + $this->loop, + $this->fh, + $size, + $onTruncate + ); + + return $deferred->promise(); + } + /** * {@inheritdoc} */ diff --git a/test/HandleTest.php b/test/HandleTest.php index 8edd1fb..6f2f4e7 100644 --- a/test/HandleTest.php +++ b/test/HandleTest.php @@ -212,4 +212,61 @@ abstract class HandleTest extends TestCase yield $handle->read(); }); } + + /** + * @depends testWrite + */ + public function testTruncateToSmallerSize() + { + $this->execute(function () { + $path = Fixture::path() . "/write"; + /** @var \Amp\File\Handle $handle */ + $handle = yield File\open($path, "c+"); + + $handle->write("foo"); + yield $handle->write("bar"); + yield $handle->truncate(4); + yield $handle->seek(0); + $contents = yield $handle->read(); + $this->assertTrue($handle->eof()); + $this->assertSame("foob", $contents); + + yield $handle->write("bar"); + $this->assertSame(7, $handle->tell()); + yield $handle->seek(0); + $contents = yield $handle->read(); + $this->assertSame("foobbar", $contents); + + yield $handle->close(); + }); + } + + /** + * @depends testWrite + */ + public function testTruncateToLargerSize() + { + $this->execute(function () { + $path = Fixture::path() . "/write"; + /** @var \Amp\File\Handle $handle */ + $handle = yield File\open($path, "c+"); + + yield $handle->write("foo"); + yield $handle->truncate(6); + $this->assertSame(3, $handle->tell()); + yield $handle->seek(0); + $contents = yield $handle->read(); + $this->assertTrue($handle->eof()); + $this->assertSame("foo\0\0\0", $contents); + + yield $handle->write("bar"); + $this->assertSame(9, $handle->tell()); + yield $handle->seek(0); + $contents = yield $handle->read(); + $this->assertSame("foo\0\0\0bar", $contents); + + yield $handle->close(); + }); + } + }