1
0
mirror of https://github.com/danog/file.git synced 2024-11-30 04:19:39 +01:00

Merge pull request #12 from amphp/streams

Streams
This commit is contained in:
Niklas Keller 2017-05-16 19:11:47 +02:00 committed by GitHub
commit 64249c2c62
7 changed files with 90 additions and 27 deletions

View File

@ -26,7 +26,8 @@
} }
], ],
"require": { "require": {
"amphp/amp": "dev-master as 2.0" "amphp/amp": "^2.0",
"amphp/byte-stream": "dev-master as 0.1"
}, },
"minimum-stability": "dev", "minimum-stability": "dev",
"require-dev": { "require-dev": {

View File

@ -29,14 +29,14 @@ class BlockingHandle implements Handle {
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function read(int $length): Promise { public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
if ($this->fh === null) { if ($this->fh === null) {
throw new \Error("The file has been closed"); throw new \Error("The file has been closed");
} }
$data = \fread($this->fh, $length); $data = \fread($this->fh, $length);
if ($data !== false) { if ($data !== false) {
return new Success($data); return new Success(\strlen($data) ? $data : null);
} else { } else {
return new Failure(new FilesystemException( return new Failure(new FilesystemException(
"Failed reading from file handle" "Failed reading from file handle"
@ -62,6 +62,15 @@ class BlockingHandle implements Handle {
} }
} }
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */

View File

@ -54,21 +54,12 @@ class EioDriver implements Driver {
* {@inheritdoc} * {@inheritdoc}
*/ */
public function open(string $path, string $mode): Promise { public function open(string $path, string $mode): Promise {
switch ($mode) { try {
case "r": $flags = \EIO_O_RDONLY; break; $flags = $this->parseMode($mode);
case "r+": $flags = \EIO_O_RDWR; break; } catch (\Throwable $exception) {
case "w": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break; return new Failure($exception);
case "w+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break;
case "a": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_APPEND; break;
case "a+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_APPEND; break;
case "x": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL; break;
case "x+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL; break;
case "c": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break;
case "c+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break;
default: return new Failure(new FilesystemException(
"Invalid open mode"
));
} }
$chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0; $chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0;
($this->incrementor)(1); ($this->incrementor)(1);
$deferred = new Deferred; $deferred = new Deferred;
@ -78,6 +69,26 @@ class EioDriver implements Driver {
return $deferred->promise(); return $deferred->promise();
} }
private function parseMode(string $mode): int {
$mode = \str_replace(['b', 't'], '', $mode);
switch ($mode) {
case 'r': return \EIO_O_RDONLY;
case 'r+': return \EIO_O_RDWR;
case 'w': return \EIO_O_WRONLY | \EIO_O_TRUNC | \EIO_O_CREAT;
case 'w+': return \EIO_O_RDWR | \EIO_O_TRUNC | \EIO_O_CREAT;
case 'a': return \EIO_O_WRONLY | \EIO_O_APPEND | \EIO_O_CREAT;
case 'a+': return \EIO_O_RDWR | \EIO_O_APPEND | \EIO_O_CREAT;
case 'x': return \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL;
case 'x+': return \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL;
case 'c': return \EIO_O_WRONLY | \EIO_O_CREAT;
case 'c+': return \EIO_O_RDWR | \EIO_O_CREAT;
default:
throw new FilesystemException('Invalid file mode');
}
}
private function onOpenHandle($openArr, $result, $req) { private function onOpenHandle($openArr, $result, $req) {
list($mode, $path, $deferred) = $openArr; list($mode, $path, $deferred) = $openArr;
if ($result === -1) { if ($result === -1) {

View File

@ -30,7 +30,7 @@ class EioHandle implements Handle {
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function read(int $length): Promise { public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
$deferred = new Deferred; $deferred = new Deferred;
$op = new \StdClass; $op = new \StdClass;
$op->type = self::OP_READ; $op->type = self::OP_READ;
@ -74,8 +74,9 @@ class EioHandle implements Handle {
\eio_get_last_error($req) \eio_get_last_error($req)
)); ));
} else { } else {
$this->position = $op->position + \strlen($result); $length = \strlen($result);
$op->deferred->resolve($result); $this->position = $op->position + $length;
$op->deferred->resolve($length ? $result : null);
} }
if ($this->queue) { if ($this->queue) {
$this->dequeue(); $this->dequeue();
@ -104,6 +105,15 @@ class EioHandle implements Handle {
return $deferred->promise(); return $deferred->promise();
} }
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function onWrite($op, $result, $req) { private function onWrite($op, $result, $req) {
$this->isActive = false; $this->isActive = false;
($this->incrementor)(-1); ($this->incrementor)(-1);

View File

@ -2,16 +2,20 @@
namespace Amp\File; namespace Amp\File;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Promise; use Amp\Promise;
interface Handle { interface Handle extends InputStream, OutputStream {
const DEFAULT_READ_LENGTH = 8192;
/** /**
* Read $len bytes from the open file handle starting at $offset * Read $len bytes from the open file handle starting at $offset
* *
* @param int $length * @param int $length
* @return \Amp\Promise<string> * @return \Amp\Promise<string|null>
*/ */
public function read(int $length): Promise; public function read(int $length = 8192): Promise;
/** /**
* Write $data to the open file handle starting at $offset * Write $data to the open file handle starting at $offset
@ -21,6 +25,15 @@ interface Handle {
*/ */
public function write(string $data): Promise; public function write(string $data): Promise;
/**
* Write $data to the open file handle and close the handle once the write completes.
*
* @param string $data
*
* @return \Amp\Promise<int>
*/
public function end(string $data = ""): Promise;
/** /**
* Close the file handle * Close the file handle
* *

View File

@ -80,7 +80,7 @@ class ParallelHandle implements Handle {
return ($this->pendingWrites > 0) ? false : ($this->size <= $this->position); return ($this->pendingWrites > 0) ? false : ($this->size <= $this->position);
} }
public function read(int $length): Promise { public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
if ($this->id === null) { if ($this->id === null) {
throw new \Error("The file has been closed"); throw new \Error("The file has been closed");
} }
@ -113,6 +113,15 @@ class ParallelHandle implements Handle {
return new Coroutine($this->doWrite($data)); return new Coroutine($this->doWrite($data));
} }
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function doWrite(string $data): \Generator { private function doWrite(string $data): \Generator {
++$this->pendingWrites; ++$this->pendingWrites;

View File

@ -40,7 +40,7 @@ class UvHandle implements Handle {
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
public function read(int $readLen): Promise { public function read(int $readLen = self::DEFAULT_READ_LENGTH): Promise {
$deferred = new Deferred; $deferred = new Deferred;
$op = new \StdClass; $op = new \StdClass;
$op->type = self::OP_READ; $op->type = self::OP_READ;
@ -78,6 +78,15 @@ class UvHandle implements Handle {
return $deferred->promise(); return $deferred->promise();
} }
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function doRead($op) { private function doRead($op) {
$this->driver->reference($this->busy); $this->driver->reference($this->busy);
$onRead = function ($fh, $result, $buffer) use ($op) { $onRead = function ($fh, $result, $buffer) use ($op) {
@ -88,8 +97,9 @@ class UvHandle implements Handle {
\uv_strerror($result) \uv_strerror($result)
)); ));
} else { } else {
$this->position = $op->position + strlen($buffer); $length = strlen($buffer);
$op->promisor->resolve($buffer); $this->position = $op->position + $length;
$op->promisor->resolve($length ? $buffer : null);
} }
if ($this->queue) { if ($this->queue) {
$this->dequeue(); $this->dequeue();