mirror of
https://github.com/danog/file.git
synced 2024-11-30 04:19:39 +01:00
parent
25d8ef6e67
commit
8cfe851cd2
@ -110,6 +110,22 @@ class BlockingHandle implements Handle
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function truncate(int $size): Promise
|
||||||
|
{
|
||||||
|
if ($this->fh === null) {
|
||||||
|
throw new ClosedException("The file has been closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!@\ftruncate($this->fh, $size)) {
|
||||||
|
return new Failure(new StreamException("Could not truncate file"));
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Success;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
|
@ -70,20 +70,7 @@ class EioHandle implements Handle
|
|||||||
$deferred = new Deferred;
|
$deferred = new Deferred;
|
||||||
$this->poll->listen($deferred->promise());
|
$this->poll->listen($deferred->promise());
|
||||||
|
|
||||||
\eio_read(
|
$onRead = function (Deferred $deferred, $result, $req) {
|
||||||
$this->fh,
|
|
||||||
$length,
|
|
||||||
$this->position,
|
|
||||||
\EIO_PRI_DEFAULT,
|
|
||||||
[$this, "onRead"],
|
|
||||||
$deferred
|
|
||||||
);
|
|
||||||
|
|
||||||
return $deferred->promise();
|
|
||||||
}
|
|
||||||
|
|
||||||
private function onRead(Deferred $deferred, $result, $req)
|
|
||||||
{
|
|
||||||
$this->isActive = false;
|
$this->isActive = false;
|
||||||
|
|
||||||
if ($result === -1) {
|
if ($result === -1) {
|
||||||
@ -97,6 +84,18 @@ class EioHandle implements Handle
|
|||||||
$this->position += \strlen($result);
|
$this->position += \strlen($result);
|
||||||
$deferred->resolve(\strlen($result) ? $result : null);
|
$deferred->resolve(\strlen($result) ? $result : null);
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
\eio_read(
|
||||||
|
$this->fh,
|
||||||
|
$length,
|
||||||
|
$this->position,
|
||||||
|
\EIO_PRI_DEFAULT,
|
||||||
|
$onRead,
|
||||||
|
$deferred
|
||||||
|
);
|
||||||
|
|
||||||
|
return $deferred->promise();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -136,37 +135,7 @@ class EioHandle implements Handle
|
|||||||
$deferred = new Deferred;
|
$deferred = new Deferred;
|
||||||
$this->poll->listen($deferred->promise());
|
$this->poll->listen($deferred->promise());
|
||||||
|
|
||||||
\eio_write(
|
$onWrite = function (Deferred $deferred, $result, $req) {
|
||||||
$this->fh,
|
|
||||||
$data,
|
|
||||||
$length,
|
|
||||||
$this->position,
|
|
||||||
\EIO_PRI_DEFAULT,
|
|
||||||
[$this, "onWrite"],
|
|
||||||
$deferred
|
|
||||||
);
|
|
||||||
|
|
||||||
return $deferred->promise();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* {@inheritdoc}
|
|
||||||
*/
|
|
||||||
public function end(string $data = ""): Promise
|
|
||||||
{
|
|
||||||
return call(function () use ($data) {
|
|
||||||
$promise = $this->write($data);
|
|
||||||
$this->writable = false;
|
|
||||||
|
|
||||||
// ignore any errors
|
|
||||||
yield Promise\any([$this->close()]);
|
|
||||||
|
|
||||||
return $promise;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private function onWrite(Deferred $deferred, $result, $req)
|
|
||||||
{
|
|
||||||
if ($this->queue->isEmpty()) {
|
if ($this->queue->isEmpty()) {
|
||||||
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
|
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
|
||||||
}
|
}
|
||||||
@ -191,6 +160,35 @@ class EioHandle implements Handle
|
|||||||
|
|
||||||
$deferred->resolve($result);
|
$deferred->resolve($result);
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
\eio_write(
|
||||||
|
$this->fh,
|
||||||
|
$data,
|
||||||
|
$length,
|
||||||
|
$this->position,
|
||||||
|
\EIO_PRI_DEFAULT,
|
||||||
|
$onWrite,
|
||||||
|
$deferred
|
||||||
|
);
|
||||||
|
|
||||||
|
return $deferred->promise();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function end(string $data = ""): Promise
|
||||||
|
{
|
||||||
|
return call(function () use ($data) {
|
||||||
|
$promise = $this->write($data);
|
||||||
|
$this->writable = false;
|
||||||
|
|
||||||
|
// ignore any errors
|
||||||
|
yield Promise\any([$this->close()]);
|
||||||
|
|
||||||
|
return $promise;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -205,16 +203,79 @@ class EioHandle implements Handle
|
|||||||
$deferred = new Deferred;
|
$deferred = new Deferred;
|
||||||
$this->poll->listen($this->closing = $deferred->promise());
|
$this->poll->listen($this->closing = $deferred->promise());
|
||||||
|
|
||||||
\eio_close($this->fh, \EIO_PRI_DEFAULT, [$this, "onClose"], $deferred);
|
\eio_close($this->fh, \EIO_PRI_DEFAULT, function (Deferred $deferred) {
|
||||||
|
// Ignore errors when closing file, as the handle will become invalid anyway.
|
||||||
|
$deferred->resolve();
|
||||||
|
}, $deferred);
|
||||||
|
|
||||||
return $deferred->promise();
|
return $deferred->promise();
|
||||||
}
|
}
|
||||||
|
|
||||||
private function onClose(Deferred $deferred, $result, $req)
|
public function truncate(int $size): Promise
|
||||||
{
|
{
|
||||||
// Ignore errors when closing file, as the handle will become invalid anyway.
|
if ($this->isActive && $this->queue->isEmpty()) {
|
||||||
|
throw new PendingOperationError;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->writable) {
|
||||||
|
throw new ClosedException("The file is no longer writable");
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->isActive = true;
|
||||||
|
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$promise = $this->trim($size);
|
||||||
|
} else {
|
||||||
|
$promise = $this->queue->top();
|
||||||
|
$promise = call(function () use ($promise, $size) {
|
||||||
|
yield $promise;
|
||||||
|
return yield $this->trim($size);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->queue->push($promise);
|
||||||
|
|
||||||
|
return $promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function trim(int $size): Promise
|
||||||
|
{
|
||||||
|
$deferred = new Deferred;
|
||||||
|
$this->poll->listen($deferred->promise());
|
||||||
|
|
||||||
|
$onTruncate = function (Deferred $deferred, $result, $req) use ($size) {
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->queue->shift();
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$this->isActive = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($result === -1) {
|
||||||
|
$error = \eio_get_last_error($req);
|
||||||
|
if ($error === "Bad file descriptor") {
|
||||||
|
$deferred->fail(new ClosedException("Truncating the file failed due to a closed handle"));
|
||||||
|
} else {
|
||||||
|
$deferred->fail(new StreamException("Truncating the file failed: " . $error));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
$this->size = $size;
|
||||||
$deferred->resolve();
|
$deferred->resolve();
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
\eio_ftruncate(
|
||||||
|
$this->fh,
|
||||||
|
$size,
|
||||||
|
\EIO_PRI_DEFAULT,
|
||||||
|
$onTruncate,
|
||||||
|
$deferred
|
||||||
|
);
|
||||||
|
|
||||||
|
return $deferred->promise();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
|
@ -45,6 +45,16 @@ interface Handle extends InputStream, OutputStream
|
|||||||
*/
|
*/
|
||||||
public function close(): Promise;
|
public function close(): Promise;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Truncates the file to the given length. If $size is larger than the current file size, the file is extended
|
||||||
|
* with null bytes.
|
||||||
|
*
|
||||||
|
* @param int $size New file size.
|
||||||
|
*
|
||||||
|
* @return \Amp\Promise
|
||||||
|
*/
|
||||||
|
public function truncate(int $size): Promise;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the handle's internal pointer position.
|
* Set the handle's internal pointer position.
|
||||||
*
|
*
|
||||||
|
@ -112,6 +112,7 @@ class FileTask implements Task
|
|||||||
case "fread":
|
case "fread":
|
||||||
case "fwrite":
|
case "fwrite":
|
||||||
case "fseek":
|
case "fseek":
|
||||||
|
case "ftruncate":
|
||||||
return ([$file, \substr($this->operation, 1)])(...$this->args);
|
return ([$file, \substr($this->operation, 1)])(...$this->args);
|
||||||
|
|
||||||
case "fclose":
|
case "fclose":
|
||||||
|
@ -4,7 +4,6 @@ namespace Amp\File;
|
|||||||
|
|
||||||
use Amp\ByteStream\ClosedException;
|
use Amp\ByteStream\ClosedException;
|
||||||
use Amp\ByteStream\StreamException;
|
use Amp\ByteStream\StreamException;
|
||||||
use Amp\Coroutine;
|
|
||||||
use Amp\Parallel\Worker\TaskException;
|
use Amp\Parallel\Worker\TaskException;
|
||||||
use Amp\Parallel\Worker\Worker;
|
use Amp\Parallel\Worker\Worker;
|
||||||
use Amp\Parallel\Worker\WorkerException;
|
use Amp\Parallel\Worker\WorkerException;
|
||||||
@ -97,6 +96,41 @@ class ParallelHandle implements Handle
|
|||||||
return $this->closing;
|
return $this->closing;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritdoc}
|
||||||
|
*/
|
||||||
|
public function truncate(int $size): Promise
|
||||||
|
{
|
||||||
|
if ($this->id === null) {
|
||||||
|
throw new ClosedException("The file has been closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if ($this->busy) {
|
||||||
|
throw new PendingOperationError;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->writable) {
|
||||||
|
throw new ClosedException("The file is no longer writable");
|
||||||
|
}
|
||||||
|
|
||||||
|
return call(function () use ($size) {
|
||||||
|
++$this->pendingWrites;
|
||||||
|
$this->busy = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
yield $this->worker->enqueue(new Internal\FileTask('ftruncate', [$size], $this->id));
|
||||||
|
} catch (TaskException $exception) {
|
||||||
|
throw new StreamException("Reading from the file failed", 0, $exception);
|
||||||
|
} catch (WorkerException $exception) {
|
||||||
|
throw new StreamException("Sending the task to the worker failed", 0, $exception);
|
||||||
|
} finally {
|
||||||
|
if (--$this->pendingWrites === 0) {
|
||||||
|
$this->busy = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
@ -115,17 +149,12 @@ class ParallelHandle implements Handle
|
|||||||
throw new PendingOperationError;
|
throw new PendingOperationError;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Coroutine($this->doRead($length));
|
return call(function () use ($length) {
|
||||||
}
|
|
||||||
|
|
||||||
private function doRead(int $length): \Generator
|
|
||||||
{
|
|
||||||
$this->busy = true;
|
$this->busy = true;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$data = yield $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id));
|
$data = yield $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id));
|
||||||
$this->position += \strlen($data);
|
$this->position += \strlen($data);
|
||||||
return $data;
|
|
||||||
} catch (TaskException $exception) {
|
} catch (TaskException $exception) {
|
||||||
throw new StreamException("Reading from the file failed", 0, $exception);
|
throw new StreamException("Reading from the file failed", 0, $exception);
|
||||||
} catch (WorkerException $exception) {
|
} catch (WorkerException $exception) {
|
||||||
@ -133,6 +162,9 @@ class ParallelHandle implements Handle
|
|||||||
} finally {
|
} finally {
|
||||||
$this->busy = false;
|
$this->busy = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return $data;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -152,7 +184,25 @@ class ParallelHandle implements Handle
|
|||||||
throw new ClosedException("The file is no longer writable");
|
throw new ClosedException("The file is no longer writable");
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Coroutine($this->doWrite($data));
|
return call(function () use ($data) {
|
||||||
|
++$this->pendingWrites;
|
||||||
|
$this->busy = true;
|
||||||
|
|
||||||
|
try {
|
||||||
|
$length = yield $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id));
|
||||||
|
} catch (TaskException $exception) {
|
||||||
|
throw new StreamException("Writing to the file failed", 0, $exception);
|
||||||
|
} catch (WorkerException $exception) {
|
||||||
|
throw new StreamException("Sending the task to the worker failed", 0, $exception);
|
||||||
|
} finally {
|
||||||
|
if (--$this->pendingWrites === 0) {
|
||||||
|
$this->busy = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->position += $length;
|
||||||
|
return $length;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -171,27 +221,6 @@ class ParallelHandle implements Handle
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private function doWrite(string $data): \Generator
|
|
||||||
{
|
|
||||||
++$this->pendingWrites;
|
|
||||||
$this->busy = true;
|
|
||||||
|
|
||||||
try {
|
|
||||||
$length = yield $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id));
|
|
||||||
} catch (TaskException $exception) {
|
|
||||||
throw new StreamException("Writing to the file failed", 0, $exception);
|
|
||||||
} catch (WorkerException $exception) {
|
|
||||||
throw new StreamException("Sending the task to the worker failed", 0, $exception);
|
|
||||||
} finally {
|
|
||||||
if (--$this->pendingWrites === 0) {
|
|
||||||
$this->busy = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
$this->position += $length;
|
|
||||||
return $length;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
@ -205,11 +234,7 @@ class ParallelHandle implements Handle
|
|||||||
throw new PendingOperationError;
|
throw new PendingOperationError;
|
||||||
}
|
}
|
||||||
|
|
||||||
return new Coroutine($this->doSeek($offset, $whence));
|
return call(function () use ($offset, $whence) {
|
||||||
}
|
|
||||||
|
|
||||||
private function doSeek(int $offset, int $whence)
|
|
||||||
{
|
|
||||||
switch ($whence) {
|
switch ($whence) {
|
||||||
case \SEEK_SET:
|
case \SEEK_SET:
|
||||||
case \SEEK_CUR:
|
case \SEEK_CUR:
|
||||||
@ -233,6 +258,7 @@ class ParallelHandle implements Handle
|
|||||||
default:
|
default:
|
||||||
throw new \Error('Invalid whence value. Use SEEK_SET, SEEK_CUR, or SEEK_END.');
|
throw new \Error('Invalid whence value. Use SEEK_SET, SEEK_CUR, or SEEK_END.');
|
||||||
}
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -185,6 +185,63 @@ class UvHandle implements Handle
|
|||||||
return $deferred->promise();
|
return $deferred->promise();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public function truncate(int $size): Promise
|
||||||
|
{
|
||||||
|
if ($this->isActive && $this->queue->isEmpty()) {
|
||||||
|
throw new PendingOperationError;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!$this->writable) {
|
||||||
|
throw new ClosedException("The file is no longer writable");
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->isActive = true;
|
||||||
|
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$promise = $this->trim($size);
|
||||||
|
} else {
|
||||||
|
$promise = $this->queue->top();
|
||||||
|
$promise = call(function () use ($promise, $size) {
|
||||||
|
yield $promise;
|
||||||
|
return yield $this->trim($size);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->queue->push($promise);
|
||||||
|
|
||||||
|
return $promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
private function trim(int $size): Promise
|
||||||
|
{
|
||||||
|
$deferred = new Deferred;
|
||||||
|
$this->poll->listen($deferred->promise());
|
||||||
|
|
||||||
|
$onTruncate = function ($fh) use ($deferred, $size) {
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
|
||||||
|
}
|
||||||
|
|
||||||
|
$this->queue->shift();
|
||||||
|
if ($this->queue->isEmpty()) {
|
||||||
|
$this->isActive = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
StatCache::clear($this->path);
|
||||||
|
$this->size = $size;
|
||||||
|
$deferred->resolve();
|
||||||
|
};
|
||||||
|
|
||||||
|
\uv_fs_ftruncate(
|
||||||
|
$this->loop,
|
||||||
|
$this->fh,
|
||||||
|
$size,
|
||||||
|
$onTruncate
|
||||||
|
);
|
||||||
|
|
||||||
|
return $deferred->promise();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@inheritdoc}
|
* {@inheritdoc}
|
||||||
*/
|
*/
|
||||||
|
@ -212,4 +212,61 @@ abstract class HandleTest extends TestCase
|
|||||||
yield $handle->read();
|
yield $handle->read();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @depends testWrite
|
||||||
|
*/
|
||||||
|
public function testTruncateToSmallerSize()
|
||||||
|
{
|
||||||
|
$this->execute(function () {
|
||||||
|
$path = Fixture::path() . "/write";
|
||||||
|
/** @var \Amp\File\Handle $handle */
|
||||||
|
$handle = yield File\open($path, "c+");
|
||||||
|
|
||||||
|
$handle->write("foo");
|
||||||
|
yield $handle->write("bar");
|
||||||
|
yield $handle->truncate(4);
|
||||||
|
yield $handle->seek(0);
|
||||||
|
$contents = yield $handle->read();
|
||||||
|
$this->assertTrue($handle->eof());
|
||||||
|
$this->assertSame("foob", $contents);
|
||||||
|
|
||||||
|
yield $handle->write("bar");
|
||||||
|
$this->assertSame(7, $handle->tell());
|
||||||
|
yield $handle->seek(0);
|
||||||
|
$contents = yield $handle->read();
|
||||||
|
$this->assertSame("foobbar", $contents);
|
||||||
|
|
||||||
|
yield $handle->close();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @depends testWrite
|
||||||
|
*/
|
||||||
|
public function testTruncateToLargerSize()
|
||||||
|
{
|
||||||
|
$this->execute(function () {
|
||||||
|
$path = Fixture::path() . "/write";
|
||||||
|
/** @var \Amp\File\Handle $handle */
|
||||||
|
$handle = yield File\open($path, "c+");
|
||||||
|
|
||||||
|
yield $handle->write("foo");
|
||||||
|
yield $handle->truncate(6);
|
||||||
|
$this->assertSame(3, $handle->tell());
|
||||||
|
yield $handle->seek(0);
|
||||||
|
$contents = yield $handle->read();
|
||||||
|
$this->assertTrue($handle->eof());
|
||||||
|
$this->assertSame("foo\0\0\0", $contents);
|
||||||
|
|
||||||
|
yield $handle->write("bar");
|
||||||
|
$this->assertSame(9, $handle->tell());
|
||||||
|
yield $handle->seek(0);
|
||||||
|
$contents = yield $handle->read();
|
||||||
|
$this->assertSame("foo\0\0\0bar", $contents);
|
||||||
|
|
||||||
|
yield $handle->close();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user