1
0
mirror of https://github.com/danog/file.git synced 2024-11-26 20:04:51 +01:00
file/lib/EioHandle.php

344 lines
8.5 KiB
PHP
Raw Normal View History

<?php
2015-08-13 01:02:41 +02:00
namespace Amp\File;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException;
2017-06-17 23:41:57 +02:00
use Amp\Deferred;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
2015-08-13 01:02:41 +02:00
class EioHandle implements Handle
{
/** @var \Amp\File\Internal\EioPoll */
2017-06-20 05:58:11 +02:00
private $poll;
/** @var resource eio file handle. */
2015-08-13 01:02:41 +02:00
private $fh;
/** @var string */
2015-08-13 01:02:41 +02:00
private $path;
/** @var string */
2015-08-13 01:02:41 +02:00
private $mode;
/** @var int */
2015-08-13 01:02:41 +02:00
private $size;
/** @var int */
2015-08-13 01:02:41 +02:00
private $position;
/** @var \SplQueue */
private $queue;
/** @var bool */
2015-08-13 01:02:41 +02:00
private $isActive = false;
/** @var bool */
private $writable = true;
2015-08-13 01:02:41 +02:00
/** @var \Amp\Promise|null */
private $closing;
public function __construct(Internal\EioPoll $poll, $fh, string $path, string $mode, int $size)
{
2017-06-20 05:58:11 +02:00
$this->poll = $poll;
2015-08-13 01:02:41 +02:00
$this->fh = $fh;
$this->path = $path;
$this->mode = $mode;
$this->size = $size;
$this->position = ($mode[0] === "a") ? $size : 0;
$this->queue = new \SplQueue;
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function read(int $length = self::DEFAULT_READ_LENGTH): Promise
{
2015-08-13 01:02:41 +02:00
if ($this->isActive) {
throw new PendingOperationError;
2015-08-13 01:02:41 +02:00
}
$this->isActive = true;
$remaining = $this->size - $this->position;
$length = $length > $remaining ? $remaining : $length;
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$onRead = function (Deferred $deferred, $result, $req) {
$this->isActive = false;
if ($result === -1) {
$error = \eio_get_last_error($req);
if ($error === "Bad file descriptor") {
$deferred->fail(new ClosedException("Reading from the file failed due to a closed handle"));
} else {
$deferred->fail(new StreamException("Reading from the file failed:" . $error));
}
} else {
$this->position += \strlen($result);
$deferred->resolve(\strlen($result) ? $result : null);
}
};
\eio_read(
$this->fh,
$length,
$this->position,
\EIO_PRI_DEFAULT,
$onRead,
$deferred
);
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise
{
if ($this->isActive && $this->queue->isEmpty()) {
throw new PendingOperationError;
}
if (!$this->writable) {
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$promise = $this->push($data);
2015-08-13 01:02:41 +02:00
} else {
$promise = $this->queue->top();
$promise = call(function () use ($promise, $data) {
yield $promise;
return yield $this->push($data);
});
2015-08-13 01:02:41 +02:00
}
$this->queue->push($promise);
return $promise;
}
private function push(string $data): Promise
{
$length = \strlen($data);
if ($length === 0) {
return new Success(0);
}
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$onWrite = function (Deferred $deferred, $result, $req) {
if ($this->queue->isEmpty()) {
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
}
if ($result === -1) {
$error = \eio_get_last_error($req);
if ($error === "Bad file descriptor") {
$deferred->fail(new ClosedException("Writing to the file failed due to a closed handle"));
} else {
$deferred->fail(new StreamException("Writing to the file failed: " . $error));
}
} else {
$this->position += $result;
if ($this->position > $this->size) {
$this->size = $this->position;
}
$deferred->resolve($result);
}
};
\eio_write(
$this->fh,
$data,
$length,
$this->position,
\EIO_PRI_DEFAULT,
$onWrite,
$deferred
);
2016-11-15 06:17:19 +01:00
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
2017-05-12 22:43:23 +02:00
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise
{
return call(function () use ($data) {
$promise = $this->write($data);
$this->writable = false;
// ignore any errors
yield Promise\any([$this->close()]);
return $promise;
});
2017-05-12 22:43:23 +02:00
}
2015-08-13 01:02:41 +02:00
/**
* {@inheritdoc}
*/
public function close(): Promise
{
if ($this->closing) {
return $this->closing;
}
2016-07-21 01:33:03 +02:00
$deferred = new Deferred;
$this->poll->listen($this->closing = $deferred->promise());
\eio_close($this->fh, \EIO_PRI_DEFAULT, function (Deferred $deferred) {
// Ignore errors when closing file, as the handle will become invalid anyway.
$deferred->resolve();
}, $deferred);
2015-08-13 01:02:41 +02:00
2016-11-15 06:17:19 +01:00
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
public function truncate(int $size): Promise
{
if ($this->isActive && $this->queue->isEmpty()) {
throw new PendingOperationError;
}
if (!$this->writable) {
throw new ClosedException("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$promise = $this->trim($size);
} else {
$promise = $this->queue->top();
$promise = call(function () use ($promise, $size) {
yield $promise;
return yield $this->trim($size);
});
}
$this->queue->push($promise);
return $promise;
}
private function trim(int $size): Promise
{
$deferred = new Deferred;
$this->poll->listen($deferred->promise());
$onTruncate = function (Deferred $deferred, $result, $req) use ($size) {
if ($this->queue->isEmpty()) {
$deferred->fail(new ClosedException('No pending write, the file may have been closed'));
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->isActive = false;
}
if ($result === -1) {
$error = \eio_get_last_error($req);
if ($error === "Bad file descriptor") {
$deferred->fail(new ClosedException("Truncating the file failed due to a closed handle"));
} else {
$deferred->fail(new StreamException("Truncating the file failed: " . $error));
}
} else {
$this->size = $size;
$deferred->resolve();
}
};
\eio_ftruncate(
$this->fh,
$size,
\EIO_PRI_DEFAULT,
$onTruncate,
$deferred
);
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function seek(int $offset, int $whence = \SEEK_SET): Promise
{
if ($this->isActive) {
throw new PendingOperationError;
}
2015-08-13 01:02:41 +02:00
switch ($whence) {
case \SEEK_SET:
$this->position = $offset;
break;
case \SEEK_CUR:
$this->position = $this->position + $offset;
break;
case \SEEK_END:
$this->position = $this->size + $offset;
break;
default:
2016-08-30 21:05:14 +02:00
throw new \Error(
2015-08-13 01:02:41 +02:00
"Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
);
}
2017-01-11 14:22:06 +01:00
2016-08-30 21:05:14 +02:00
return new Success($this->position);
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function tell(): int
{
2015-08-13 01:02:41 +02:00
return $this->position;
}
/**
* {@inheritdoc}
*/
public function eof(): bool
{
return !$this->queue->isEmpty() ? false : ($this->size <= $this->position);
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function path(): string
{
2015-08-13 01:02:41 +02:00
return $this->path;
}
/**
* {@inheritdoc}
*/
public function mode(): string
{
2015-08-13 01:02:41 +02:00
return $this->mode;
}
}