1
0
mirror of https://github.com/danog/file.git synced 2025-01-22 21:31:15 +01:00
file/lib/ParallelHandle.php

193 lines
4.9 KiB
PHP
Raw Normal View History

<?php
2016-11-14 23:17:19 -06:00
2016-08-30 14:05:14 -05:00
namespace Amp\File;
use Amp\{ Coroutine, Success };
use Amp\Parallel\{
TaskException, Worker\Worker, WorkerException
};
2017-01-11 14:22:06 +01:00
use AsyncInterop\Promise;
2016-08-30 14:05:14 -05:00
/**
 * File handle whose operations are executed by a worker.
 *
 * Every operation is wrapped in an Internal\FileTask and enqueued on the worker given to the
 * constructor. The cursor position and file size are cached locally and updated from the task
 * results, so tell(), size() and eof() answer without contacting the worker.
 */
class ParallelHandle implements Handle {
    /** @var \Amp\Parallel\Worker\Worker Worker that executes the file tasks. */
    private $worker;

    /** @var int|null Identifier passed with every task; null once the handle has been closed. */
    private $id;

    /** @var string */
    private $path;

    /** @var int Locally tracked cursor position. */
    private $position;

    /** @var int Locally tracked file size. */
    private $size;

    /** @var string */
    private $mode;

    /** @var int Number of pending write operations. */
    private $pendingWrites = 0;

    /**
     * @param \Amp\Parallel\Worker\Worker $worker Worker on which the file tasks are enqueued.
     * @param int $id Identifier sent along with every task for this file.
     * @param string $path
     * @param int $size Size of the file at the time the handle is created.
     * @param string $mode Mode string; a leading 'a' places the initial position at $size.
     */
    public function __construct(Worker $worker, int $id, string $path, int $size, string $mode) {
        $this->worker = $worker;
        $this->id = $id;
        $this->path = $path;
        $this->size = $size;
        $this->mode = $mode;

        // Modes starting with 'a' begin at the end of the file, everything else at offset 0.
        $this->position = $this->mode[0] === 'a' ? $this->size : 0;
    }

    /**
     * Closes the handle if that has not been done explicitly.
     */
    public function __destruct() {
        if ($this->id !== null) {
            $this->close();
        }
    }

    /**
     * {@inheritdoc}
     */
    public function path(): string {
        return $this->path;
    }

    /**
     * {@inheritdoc}
     *
     * Calling close() again after the handle was closed returns an already resolved promise
     * instead of enqueueing another task with a null id.
     */
    public function close(): Promise {
        if ($this->id === null) {
            return new Success;
        }

        // Clear the id before doing anything else so the handle is marked closed regardless of
        // whether the worker is still available; read(), write() and seek() check this field.
        $id = $this->id;
        $this->id = null;

        if ($this->worker->isRunning()) {
            return $this->worker->enqueue(new Internal\FileTask('fclose', [], $id));
        }

        // The worker is no longer running, so there is nothing left to send the task to.
        return new Success;
    }

    /**
     * {@inheritdoc}
     */
    public function eof(): bool {
        // While writes are outstanding the final position is not known yet.
        if ($this->pendingWrites > 0) {
            return false;
        }

        return $this->size <= $this->position;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Error If the handle has been closed.
     */
    public function read(int $length): Promise {
        if ($this->id === null) {
            throw new \Error("The file has been closed");
        }

        return new Coroutine($this->doRead($length));
    }

    /**
     * Enqueues an 'fread' task and advances the cached position by the number of bytes received.
     *
     * @param int $length Length forwarded to the task.
     *
     * @return \Generator Coroutine body; its return value is the data produced by the task.
     *
     * @throws \Amp\File\FilesystemException If the task or the worker fails.
     */
    private function doRead(int $length): \Generator {
        try {
            $data = yield $this->worker->enqueue(new Internal\FileTask('fread', [$length], $this->id));
        } catch (TaskException $exception) {
            throw new FilesystemException("Reading from the file failed", $exception);
        } catch (WorkerException $exception) {
            throw new FilesystemException("Sending the task to the worker failed", $exception);
        }

        $this->position += \strlen($data);

        return $data;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Error If the handle has been closed.
     */
    public function write(string $data): Promise {
        if ($this->id === null) {
            throw new \Error("The file has been closed");
        }

        return new Coroutine($this->doWrite($data));
    }

    /**
     * Enqueues an 'fwrite' task and advances the cached position by the length the task reports.
     *
     * The pending-write counter is raised for the duration of the task so that eof() reports
     * false while the write is in flight.
     *
     * @param string $data Data forwarded to the task.
     *
     * @return \Generator Coroutine body; its return value is the length reported by the task.
     *
     * @throws \Amp\File\FilesystemException If the task or the worker fails.
     */
    private function doWrite(string $data): \Generator {
        ++$this->pendingWrites;

        try {
            $length = yield $this->worker->enqueue(new Internal\FileTask('fwrite', [$data], $this->id));
        } catch (TaskException $exception) {
            throw new FilesystemException("Writing to the file failed", $exception);
        } catch (WorkerException $exception) {
            throw new FilesystemException("Sending the task to the worker failed", $exception);
        } finally {
            // Runs on success and on failure so the counter can never get stuck above zero.
            --$this->pendingWrites;
        }

        $this->position += $length;

        // A write that ends beyond the previously known end has grown the file; keep the cached
        // size in step so size() and eof() do not report stale values (doSeek() does the same).
        if ($this->position > $this->size) {
            $this->size = $this->position;
        }

        return $length;
    }

    /**
     * {@inheritdoc}
     *
     * @throws \Error If the handle has been closed.
     */
    public function seek(int $offset, int $whence = SEEK_SET): Promise {
        if ($this->id === null) {
            throw new \Error("The file has been closed");
        }

        return new Coroutine($this->doSeek($offset, $whence));
    }

    /**
     * Enqueues an 'fseek' task and stores the position it reports.
     *
     * @param int $offset
     * @param int $whence One of SEEK_SET, SEEK_CUR or SEEK_END.
     *
     * @return \Generator Coroutine body; its return value is the new position.
     *
     * @throws \Error If $whence is not one of the three supported constants.
     * @throws \Amp\File\FilesystemException If the task or the worker fails.
     */
    private function doSeek(int $offset, int $whence): \Generator {
        switch ($whence) {
            case \SEEK_SET:
            case \SEEK_CUR:
            case \SEEK_END:
                try {
                    $this->position = yield $this->worker->enqueue(
                        new Internal\FileTask('fseek', [$offset, $whence], $this->id)
                    );
                } catch (TaskException $exception) {
                    throw new FilesystemException('Seeking in the file failed.', $exception);
                } catch (WorkerException $exception) {
                    throw new FilesystemException("Sending the task to the worker failed", $exception);
                }

                // The cached size is never allowed to be smaller than the cached position.
                if ($this->position > $this->size) {
                    $this->size = $this->position;
                }

                return $this->position;

            default:
                throw new \Error('Invalid whence value. Use SEEK_SET, SEEK_CUR, or SEEK_END.');
        }
    }

    /**
     * {@inheritdoc}
     */
    public function tell(): int {
        return $this->position;
    }

    /**
     * {@inheritdoc}
     */
    public function size(): int {
        return $this->size;
    }

    /**
     * {@inheritdoc}
     */
    public function mode(): string {
        return $this->mode;
    }
}