1
0
mirror of https://github.com/danog/file.git synced 2024-11-30 04:19:39 +01:00

Implement stream interfaces in Handle

This commit is contained in:
Aaron Piotrowski 2017-05-12 15:43:23 -05:00
parent b8c3321533
commit 9b8b1daeb0
6 changed files with 65 additions and 13 deletions

View File

@ -26,7 +26,8 @@
}
],
"require": {
"amphp/amp": "dev-master as 2.0"
"amphp/amp": "^2.0",
"amphp/byte-stream": "dev-master as 0.1"
},
"minimum-stability": "dev",
"require-dev": {

View File

@ -29,14 +29,14 @@ class BlockingHandle implements Handle {
/**
* {@inheritdoc}
*/
public function read(int $length): Promise {
public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
if ($this->fh === null) {
throw new \Error("The file has been closed");
}
$data = \fread($this->fh, $length);
if ($data !== false) {
return new Success($data);
return new Success(\strlen($data) ? $data : null);
} else {
return new Failure(new FilesystemException(
"Failed reading from file handle"
@ -62,6 +62,15 @@ class BlockingHandle implements Handle {
}
}
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
/**
* {@inheritdoc}
*/

View File

@ -30,7 +30,7 @@ class EioHandle implements Handle {
/**
* {@inheritdoc}
*/
public function read(int $length): Promise {
public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
$deferred = new Deferred;
$op = new \StdClass;
$op->type = self::OP_READ;
@ -74,8 +74,9 @@ class EioHandle implements Handle {
\eio_get_last_error($req)
));
} else {
$this->position = $op->position + \strlen($result);
$op->deferred->resolve($result);
$length = \strlen($result);
$this->position = $op->position + $length;
$op->deferred->resolve($length ? $result : null);
}
if ($this->queue) {
$this->dequeue();
@ -104,6 +105,15 @@ class EioHandle implements Handle {
return $deferred->promise();
}
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function onWrite($op, $result, $req) {
$this->isActive = false;
($this->incrementor)(-1);

View File

@ -2,16 +2,20 @@
namespace Amp\File;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Promise;
interface Handle {
interface Handle extends InputStream, OutputStream {
const DEFAULT_READ_LENGTH = 8192;
/**
* Read $len bytes from the open file handle starting at $offset
*
* @param int $length
* @return \Amp\Promise<string>
* @return \Amp\Promise<string|null>
*/
public function read(int $length): Promise;
public function read(int $length = 8192): Promise;
/**
* Write $data to the open file handle starting at $offset
@ -21,6 +25,15 @@ interface Handle {
*/
public function write(string $data): Promise;
/**
* Write $data to the open file handle and close the handle once the write completes.
*
* @param string $data
*
* @return \Amp\Promise<int>
*/
public function end(string $data = ""): Promise;
/**
* Close the file handle
*

View File

@ -80,7 +80,7 @@ class ParallelHandle implements Handle {
return ($this->pendingWrites > 0) ? false : ($this->size <= $this->position);
}
public function read(int $length): Promise {
public function read(int $length = self::DEFAULT_READ_LENGTH): Promise {
if ($this->id === null) {
throw new \Error("The file has been closed");
}
@ -113,6 +113,15 @@ class ParallelHandle implements Handle {
return new Coroutine($this->doWrite($data));
}
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function doWrite(string $data): \Generator {
++$this->pendingWrites;

View File

@ -40,7 +40,7 @@ class UvHandle implements Handle {
/**
* {@inheritdoc}
*/
public function read(int $readLen): Promise {
public function read(int $readLen = self::DEFAULT_READ_LENGTH): Promise {
$deferred = new Deferred;
$op = new \StdClass;
$op->type = self::OP_READ;
@ -78,6 +78,15 @@ class UvHandle implements Handle {
return $deferred->promise();
}
/**
* {@inheritdoc}
*/
public function end(string $data = ""): Promise {
$promise = $this->write($data);
$promise->onResolve([$this, "close"]);
return $promise;
}
private function doRead($op) {
$this->driver->reference($this->busy);
$onRead = function ($fh, $result, $buffer) use ($op) {
@ -88,8 +97,9 @@ class UvHandle implements Handle {
\uv_strerror($result)
));
} else {
$this->position = $op->position + strlen($buffer);
$op->promisor->resolve($buffer);
$length = strlen($buffer);
$this->position = $op->position + $length;
$op->promisor->resolve($length ? $buffer : null);
}
if ($this->queue) {
$this->dequeue();