1
0
mirror of https://github.com/danog/file.git synced 2024-12-02 09:17:57 +01:00
file/lib/UvHandle.php

218 lines
5.8 KiB
PHP
Raw Normal View History

<?php
2015-08-13 01:02:41 +02:00
namespace Amp\File;
2017-06-17 23:41:57 +02:00
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
2015-08-13 01:02:41 +02:00
class UvHandle implements Handle {
private $watcher;
2016-07-21 01:33:03 +02:00
private $driver;
2015-08-13 01:02:41 +02:00
private $fh;
private $path;
private $mode;
private $size;
private $loop;
private $position;
private $queue;
2015-08-13 01:02:41 +02:00
private $isActive = false;
private $writable = true;
2015-08-13 01:02:41 +02:00
2017-05-17 02:55:24 +02:00
/**
* @param \Amp\Loop\UvDriver $driver
* @param string $busy Watcher ID.
* @param resource $fh File handle.
* @param string $path
* @param string $mode
* @param int $size
*/
public function __construct(Loop\UvDriver $driver, string $watcher, $fh, string $path, string $mode, int $size) {
2016-07-21 01:33:03 +02:00
$this->driver = $driver;
$this->watcher = $watcher;
2015-08-13 01:02:41 +02:00
$this->fh = $fh;
$this->path = $path;
$this->mode = $mode;
$this->size = $size;
2016-07-21 01:33:03 +02:00
$this->loop = $driver->getHandle();
2015-08-13 01:02:41 +02:00
$this->position = ($mode[0] === "a") ? $size : 0;
$this->queue = new \SplQueue;
2015-08-13 01:02:41 +02:00
}
public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
2015-08-13 01:02:41 +02:00
if ($this->isActive) {
throw new PendingOperationError;
2015-08-13 01:02:41 +02:00
}
$this->driver->reference($this->watcher);
$deferred = new Deferred;
$this->isActive = true;
$onRead = function ($fh, $result, $buffer) use ($deferred) {
$this->isActive = false;
$this->driver->unreference($this->watcher);
if ($result < 0) {
$deferred->fail(new FilesystemException(
\uv_strerror($result)
));
} else {
$length = strlen($buffer);
$this->position = $this->position + $length;
$deferred->resolve($length ? $buffer : null);
}
};
\uv_fs_read($this->loop, $this->fh, $this->position, $length, $onRead);
2017-06-20 07:31:58 +02:00
2016-11-15 06:17:19 +01:00
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function write(string $data): Promise {
if ($this->isActive && $this->queue->isEmpty()) {
throw new PendingOperationError;
}
if (!$this->writable) {
throw new \Error("The file is no longer writable");
}
$this->isActive = true;
if ($this->queue->isEmpty()) {
$promise = $this->push($data);
2015-08-13 01:02:41 +02:00
} else {
$promise = $this->queue->top();
$promise = call(function () use ($promise, $data) {
yield $promise;
return yield $this->push($data);
});
2015-08-13 01:02:41 +02:00
}
$this->queue->push($promise);
return $promise;
2015-08-13 01:02:41 +02:00
}
2017-05-12 22:43:23 +02:00
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$this->writable = false;
2017-05-12 22:43:23 +02:00
$promise->onResolve([$this, "close"]);
return $promise;
}
private function push(string $data): Promise {
$length = \strlen($data);
$this->driver->reference($this->watcher);
$deferred = new Deferred;
$onWrite = function ($fh, $result) use ($deferred, $length) {
if ($this->queue->isEmpty()) {
$this->driver->unreference($this->watcher);
$deferred->fail(new FilesystemException('No pending write, the file may have been closed'));
2015-08-13 01:02:41 +02:00
}
$this->queue->shift();
if ($this->queue->isEmpty()) {
$this->driver->unreference($this->watcher);
$this->isActive = false;
2015-08-13 01:02:41 +02:00
}
if ($result < 0) {
$deferred->fail(new FilesystemException(
2015-08-13 01:02:41 +02:00
\uv_strerror($result)
));
} else {
StatCache::clear($this->path);
$newPosition = $this->position + $length;
2015-08-13 01:02:41 +02:00
$delta = $newPosition - $this->position;
$this->position = ($this->mode[0] === "a") ? $this->position : $newPosition;
$this->size += $delta;
$deferred->resolve($length);
2015-08-13 01:02:41 +02:00
}
};
\uv_fs_write($this->loop, $this->fh, $data, $this->position, $onWrite);
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
2016-11-15 06:17:19 +01:00
public function seek(int $offset, int $whence = \SEEK_SET): Promise {
if ($this->isActive) {
throw new PendingOperationError;
}
2015-08-13 01:02:41 +02:00
$offset = (int) $offset;
switch ($whence) {
case \SEEK_SET:
$this->position = $offset;
break;
case \SEEK_CUR:
$this->position = $this->position + $offset;
break;
case \SEEK_END:
$this->position = $this->size + $offset;
break;
default:
2016-08-30 21:05:14 +02:00
throw new \Error(
2015-08-13 01:02:41 +02:00
"Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
);
}
2017-01-11 14:22:06 +01:00
2016-08-30 21:05:14 +02:00
return new Success($this->position);
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function tell(): int {
2015-08-13 01:02:41 +02:00
return $this->position;
}
/**
* {@inheritdoc}
*/
public function eof(): bool {
return !$this->queue->isEmpty() ? false : ($this->size <= $this->position);
2015-08-13 01:02:41 +02:00
}
/**
* {@inheritdoc}
*/
public function path(): string {
2015-08-13 01:02:41 +02:00
return $this->path;
}
/**
* {@inheritdoc}
*/
public function mode(): string {
2015-08-13 01:02:41 +02:00
return $this->mode;
}
/**
* {@inheritdoc}
*/
2016-11-15 06:17:19 +01:00
public function close(): Promise {
$this->driver->reference($this->watcher);
2016-07-21 01:33:03 +02:00
$deferred = new Deferred;
2017-06-17 23:41:57 +02:00
\uv_fs_close($this->loop, $this->fh, function ($fh) use ($deferred) {
$this->driver->unreference($this->watcher);
2016-07-21 01:33:03 +02:00
$deferred->resolve();
2015-08-13 01:02:41 +02:00
});
2016-11-15 06:17:19 +01:00
return $deferred->promise();
2015-08-13 01:02:41 +02:00
}
}