1
0
mirror of https://github.com/danog/file.git synced 2024-11-30 04:19:39 +01:00
file/lib/EioHandle.php

202 lines
5.7 KiB
PHP
Raw Normal View History

2016-08-24 07:01:41 +02:00
<?php declare(strict_types = 1);
2015-08-13 01:02:41 +02:00
namespace Amp\File;
use Amp\Deferred;
use Interop\Async\Awaitable;
2015-08-13 01:02:41 +02:00
/**
 * File handle implementation backed by the eio extension.
 *
 * Reads and writes are serialized per handle: at most one eio read/write
 * request is in flight at a time. Requests issued while another one is active
 * are appended to an internal queue and started in FIFO order as each
 * completion callback fires.
 */
class EioHandle implements Handle {
    const OP_READ = 1;
    const OP_WRITE = 2;

    /** @var callable Called with 1 when an eio request is submitted and with -1 when its callback runs. */
    private $incrementor;

    /** @var mixed File descriptor handed unchanged to the eio_* functions. */
    private $fh;

    /** @var string Path reported by path() and used to clear the stat cache after writes. */
    private $path;

    /** @var string Open mode; a leading "a" selects append behaviour. */
    private $mode;

    /** @var int Locally tracked file size in bytes. */
    private $size;

    /** @var int Locally tracked byte offset used for the next read/write. */
    private $position;

    /** @var \StdClass[] Operations waiting for the active request to finish. */
    private $queue = [];

    /** @var int Number of writes that have been requested but not yet settled. */
    private $pendingWriteOps = 0;

    /** @var bool Whether a read/write request is currently in flight. */
    private $isActive = false;

    /**
     * @param callable $incrementor Counter callback; receives 1 per submitted request and -1 per completion.
     * @param mixed    $fh          File descriptor understood by the eio extension.
     * @param string   $path        Path of the opened file.
     * @param string   $mode        Mode the file was opened with.
     * @param int      $size        Size of the file in bytes at open time.
     */
    public function __construct(callable $incrementor, $fh, string $path, string $mode, int $size) {
        $this->incrementor = $incrementor;
        $this->fh = $fh;
        $this->path = $path;
        $this->mode = $mode;
        $this->size = $size;
        // Append mode starts at the end of the file, every other mode at the beginning.
        $this->position = ($mode[0] === "a") ? $size : 0;
    }

    /**
     * {@inheritdoc}
     *
     * The read starts at the position tracked at call time and is capped at
     * the number of bytes between that position and the tracked file size.
     */
    public function read(int $length): Awaitable {
        $deferred = new Deferred;

        $op = new \StdClass;
        $op->type = self::OP_READ;
        $op->position = $this->position;
        $op->deferred = $deferred;

        // Never request a negative byte count: the position may lie beyond the
        // tracked size after a seek(), and $length itself may be negative.
        $remaining = $this->size - $this->position;
        $op->readLen = \max(0, \min($length, $remaining));

        $this->startOrQueue($op);

        return $deferred->getAwaitable();
    }

    /**
     * {@inheritdoc}
     *
     * The write targets the position tracked at call time.
     */
    public function write(string $data): Awaitable {
        $deferred = new Deferred;

        $op = new \StdClass;
        $op->type = self::OP_WRITE;
        $op->position = $this->position;
        $op->deferred = $deferred;
        $op->writeData = $data;

        // Counted from the moment of the request so eof() stays false while writes are outstanding.
        $this->pendingWriteOps++;

        $this->startOrQueue($op);

        return $deferred->getAwaitable();
    }

    /**
     * {@inheritdoc}
     *
     * The close request is submitted immediately; it does not pass through the
     * read/write queue.
     */
    public function close(): Awaitable {
        ($this->incrementor)(1);

        $deferred = new Deferred;
        \eio_close($this->fh, null, [$this, "onClose"], $deferred);

        return $deferred->getAwaitable();
    }

    /**
     * {@inheritdoc}
     *
     * Only the locally tracked position changes; no eio request is issued.
     * The returned awaitable is already resolved with the new position.
     * NOTE(review): the resolution value is assumed to match the Handle
     * contract, which is not visible from this file -- confirm.
     *
     * @throws FilesystemException If $whence is not SEEK_SET, SEEK_CUR or SEEK_END.
     */
    public function seek(int $offset, int $whence = \SEEK_SET): Awaitable {
        switch ($whence) {
            case \SEEK_SET:
                $this->position = $offset;
                break;
            case \SEEK_CUR:
                $this->position = $this->position + $offset;
                break;
            case \SEEK_END:
                $this->position = $this->size + $offset;
                break;
            default:
                throw new FilesystemException(
                    "Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
                );
        }

        // The declared return type requires an Awaitable; falling off the end
        // of the method would raise a TypeError on every successful seek.
        $deferred = new Deferred;
        $deferred->resolve($this->position);

        return $deferred->getAwaitable();
    }

    /**
     * {@inheritdoc}
     */
    public function tell(): int {
        return $this->position;
    }

    /**
     * {@inheritdoc}
     *
     * Always false while writes are outstanding; otherwise true once the
     * tracked position has reached or passed the tracked size.
     */
    public function eof(): bool {
        if ($this->pendingWriteOps > 0) {
            return false;
        }

        return $this->size <= $this->position;
    }

    /**
     * {@inheritdoc}
     */
    public function path(): string {
        return $this->path;
    }

    /**
     * {@inheritdoc}
     */
    public function mode(): string {
        return $this->mode;
    }

    /**
     * Starts the operation right away when the handle is idle, otherwise
     * appends it to the queue.
     *
     * @param \StdClass $op Operation record built by read() or write().
     */
    private function startOrQueue($op) {
        if ($this->isActive) {
            $this->queue[] = $op;
        } else {
            $this->dispatch($op);
        }
    }

    /**
     * Removes the oldest queued operation and submits it.
     */
    private function dequeue() {
        $this->dispatch(\array_shift($this->queue));
    }

    /**
     * Marks the handle busy, bumps the counter and submits the eio request
     * matching the operation type. The operation record is passed along as
     * the request's user data so the completion callback receives it back.
     *
     * @param \StdClass $op Operation record with type self::OP_READ or self::OP_WRITE.
     */
    private function dispatch($op) {
        ($this->incrementor)(1);
        $this->isActive = true;

        // The fourth argument of eio_read()/eio_write() is the request priority; null is passed for it.
        switch ($op->type) {
            case self::OP_READ:
                \eio_read($this->fh, $op->readLen, $op->position, null, [$this, "onRead"], $op);
                break;
            case self::OP_WRITE:
                \eio_write(
                    $this->fh,
                    $op->writeData,
                    \strlen($op->writeData),
                    $op->position,
                    null,
                    [$this, "onWrite"],
                    $op
                );
                break;
        }
    }

    /**
     * Completion callback for eio_read().
     *
     * @param \StdClass $op     Operation record passed to eio_read() as user data.
     * @param mixed     $result -1 on failure, otherwise the data that was read.
     * @param mixed     $req    Request resource, used to look up the error message.
     */
    private function onRead($op, $result, $req) {
        $this->isActive = false;
        ($this->incrementor)(-1);

        if ($result === -1) {
            $op->deferred->fail(new FilesystemException(
                \eio_get_last_error($req)
            ));
        } else {
            // Advance past the bytes actually returned, which may be fewer than requested.
            $this->position = $op->position + \strlen($result);
            $op->deferred->resolve($result);
        }

        if ($this->queue) {
            $this->dequeue();
        }
    }

    /**
     * Completion callback for eio_write().
     *
     * @param \StdClass $op     Operation record passed to eio_write() as user data.
     * @param mixed     $result -1 on failure, otherwise the value reported by eio for the write.
     * @param mixed     $req    Request resource, used to look up the error message.
     */
    private function onWrite($op, $result, $req) {
        $this->isActive = false;
        ($this->incrementor)(-1);

        // The write is settled whether it succeeded or not; leaving the counter
        // raised after a failure would make eof() report false forever.
        $this->pendingWriteOps--;

        if ($result === -1) {
            $op->deferred->fail(new FilesystemException(
                \eio_get_last_error($req)
            ));
        } else {
            StatCache::clear($this->path);
            $bytesWritten = \strlen($op->writeData);

            if ($this->mode[0] === "a") {
                // Append mode: the file grows by the written bytes and the
                // tracked position is deliberately left where it was.
                $this->size += $bytesWritten;
            } else {
                $newPosition = $op->position + $bytesWritten;
                $this->position = $newPosition;
                // Overwriting existing bytes must not grow the size; only a
                // write that extends past the current end does.
                $this->size = \max($this->size, $newPosition);
            }

            $op->deferred->resolve($result);
        }

        if ($this->queue) {
            $this->dequeue();
        }
    }

    /**
     * Completion callback for eio_close().
     *
     * @param Deferred $deferred Deferred passed to eio_close() as user data.
     * @param mixed    $result   -1 on failure.
     * @param mixed    $req      Request resource, used to look up the error message.
     */
    private function onClose($deferred, $result, $req) {
        ($this->incrementor)(-1);

        if ($result === -1) {
            $deferred->fail(new FilesystemException(
                \eio_get_last_error($req)
            ));
        } else {
            $deferred->resolve();
        }
    }
}