1
0
mirror of https://github.com/danog/file.git synced 2024-11-29 20:09:10 +01:00

Stream file handle support

This commit is contained in:
Daniel Lowrey 2015-08-12 19:02:41 -04:00
parent e5ba3490f8
commit ab05daa5a5
15 changed files with 1088 additions and 29 deletions

View File

@ -7,6 +7,19 @@ use Amp\Success;
use Amp\Failure;
class BlockingDriver implements Driver {
/**
* {@inheritdoc}
*/
public function open($path, $mode) {
if (!$fh = \fopen($path, $mode)) {
return new Failure(new FilesystemException(
"Failed opening file handle"
));
}
return new Success(new BlockingHandle($fh, $path, $mode));
}
/**
* {@inheritdoc}
*/

108
lib/BlockingHandle.php Normal file
View File

@ -0,0 +1,108 @@
<?php
namespace Amp\File;
use Amp\Promise;
use Amp\Success;
use Amp\Failure;
class BlockingHandle implements Handle {
private $fh;
private $path;
private $mode;
/**
* @param resource $fh An open uv filesystem descriptor
*/
public function __construct($fh, $path, $mode) {
$this->fh = $fh;
$this->path = $path;
$this->mode = $mode;
}
/**
* {@inheritdoc}
*/
public function read($len) {
$data = \fread($this->fh, $len);
if ($data !== false) {
return new Success($data);
} else {
return new Failure(new \RuntimeException(
"Failed reading from file handle"
));
}
}
/**
* {@inheritdoc}
*/
public function write($data) {
$len = \fwrite($this->fh, $data);
if ($len !== false) {
return new Success($data);
} else {
return new Failure(new \RuntimeException(
"Failed writing to file handle"
));
}
}
/**
* {@inheritdoc}
*/
public function close() {
if (\fclose($this->fh)) {
return new Success;
} else {
return new Failure(new \RuntimeException(
"Failed closing file handle"
));
}
}
/**
* {@inheritdoc}
*/
public function seek($position, $whence = \SEEK_SET) {
switch ($whence) {
case \SEEK_SET:
case \SEEK_CUR:
case \SEEK_END:
\fseek($this->fh, $position, $whence);
return;
default:
throw new FilesystemException(
"Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
);
}
}
/**
* {@inheritdoc}
*/
public function tell() {
return \ftell($this->fh);
}
/**
* {@inheritdoc}
*/
public function eof() {
return \feof($this->fh);
}
/**
* {@inheritdoc}
*/
public function path() {
return $this->path;
}
/**
* {@inheritdoc}
*/
public function mode() {
return $this->mode;
}
}

View File

@ -3,6 +3,15 @@
namespace Amp\File;
interface Driver {
/**
* Open a handle for the specified path
*
* @param string $path
* @param string $mode
* @return \Amp\File\Handle
*/
public function open($path, $mode);
/**
* Execute a file stat operation
*

View File

@ -9,8 +9,9 @@ use Amp\Deferred;
class EioDriver implements Driver {
private $watcher;
private $callableDelReq;
private $pending = 0;
private $incrementor;
private $callableDecrementor;
private static $stream;
/**
@ -24,8 +25,25 @@ class EioDriver implements Driver {
\eio_init();
self::$stream = \eio_get_event_stream();
}
$this->callableDelReq = function() {
$this->decrementPending();
$this->callableDecrementor = function() {
\call_user_func($this->incrementor, -1);
};
$this->incrementor = function ($increment) {
switch ($increment) {
case 1:
case -1:
$this->pending += $increment;
break;
default:
throw new FilesystemException(
"Invalid pending event increment; 1 or -1 required"
);
}
if ($this->pending === 0) {
\Amp\disable($this->watcher);
} elseif ($this->pending === 1) {
\Amp\enable($this->watcher);
}
};
$this->watcher = \Amp\onReadable(self::$stream, function() {
while (\eio_npending()) {
@ -34,15 +52,74 @@ class EioDriver implements Driver {
}, $options = ["enable" => false]);
}
private function incrementPending() {
if ($this->pending++ === 0) {
\Amp\enable($this->watcher);
/**
* {@inheritdoc}
*/
public function open($path, $mode) {
switch ($mode) {
case "r": $flags = \EIO_O_RDONLY; break;
case "r+": $flags = \EIO_O_RDWR; break;
case "w": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break;
case "w+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break;
case "a": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_APPEND; break;
case "a+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_APPEND; break;
case "x": $flags = \EIO_O_WRONLY | \EIO_O_CREAT | \EIO_O_EXCL; break;
case "x+": $flags = \EIO_O_RDWR | \EIO_O_CREAT | \EIO_O_EXCL; break;
case "c": $flags = \EIO_O_WRONLY | \EIO_O_CREAT; break;
case "c+": $flags = \EIO_O_RDWR | \EIO_O_CREAT; break;
default: return new Failure(new FilesystemException(
"Invalid open mode"
));
}
$chmod = ($flags & \EIO_O_CREAT) ? 0644 : 0;
\call_user_func($this->incrementor, 1);
$promisor = new Deferred;
$openArr = [$mode, $path, $promisor];
\eio_open($path, $flags, $chmod, $priority = null, [$this, "onOpenHandle"], $openArr);
return $promisor->promise();
}
private function onOpenHandle($openArr, $result, $req) {
list($mode, $path, $promisor) = $openArr;
if ($result === -1) {
\call_user_func($this->incrementor, -1);
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} elseif ($mode[0] === "a") {
\array_unshift($openArr, $result);
\eio_ftruncate($result, $offset = 0, $priority = null, [$this, "onOpenFtruncate"], $openArr);
} else {
\array_unshift($openArr, $result);
\eio_fstat($result, $priority = null, [$this, "onOpenFstat"], $openArr);
}
}
private function decrementPending() {
if ($this->pending-- === 1) {
\Amp\disable($this->watcher);
private function onOpenFtruncate($openArr, $result, $req) {
\call_user_func($this->incrementor, -1);
list($fh, $mode, $path, $promisor) = $openArr;
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} else {
$handle = new EioHandle($this->incrementor, $fh, $path, $mode, $size = 0);
$promisor->succeed($handle);
}
}
private function onOpenFstat($openArr, $result, $req) {
\call_user_func($this->incrementor, -1);
list($fh, $mode, $path, $promisor) = $openArr;
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} else {
StatCache::set($path, $result);
$handle = new EioHandle($this->incrementor, $fh, $path, $mode, $result["size"]);
$promisor->succeed($handle);
}
}
@ -54,7 +131,7 @@ class EioDriver implements Driver {
return new Success($stat);
}
$this->incrementPending();
\call_user_func($this->incrementor, 1);
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
$data = [$promisor, $path];
@ -65,7 +142,7 @@ class EioDriver implements Driver {
private function onStat($data, $result, $req) {
list($promisor, $path) = $data;
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->succeed(null);
} else {
@ -198,7 +275,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function lstat($path) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_lstat($path, $priority, [$this, "onLstat"], $promisor);
@ -207,7 +284,7 @@ class EioDriver implements Driver {
}
private function onLstat($promisor, $result, $req) {
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->succeed(null);
} else {
@ -219,7 +296,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function symlink($target, $link) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_symlink($target, $link, $priority, [$this, "onGenericResult"], $promisor);
@ -228,7 +305,7 @@ class EioDriver implements Driver {
}
private function onGenericResult($promisor, $result, $req) {
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
@ -242,7 +319,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function rename($from, $to) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_rename($from, $to, $priority, [$this, "onGenericResult"], $promisor);
@ -254,7 +331,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function unlink($path) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
$data = [$promisor, $path];
@ -265,7 +342,7 @@ class EioDriver implements Driver {
private function onUnlink($data, $result, $req) {
list($promisor, $path) = $data;
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
@ -280,7 +357,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function mkdir($path, $mode = 0644) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_mkdir($path, $mode, $priority, [$this, "onGenericResult"], $promisor);
@ -292,7 +369,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function rmdir($path) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
$data = [$promisor, $path];
@ -303,7 +380,7 @@ class EioDriver implements Driver {
private function onRmdir($data, $result, $req) {
list($promisor, $path) = $data;
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
@ -318,7 +395,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function scandir($path) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$flags = \EIO_READDIR_STAT_ORDER | \EIO_READDIR_DIRS_FIRST;
$priority = \EIO_PRI_DEFAULT;
@ -328,7 +405,7 @@ class EioDriver implements Driver {
}
private function onScandir($promisor, $result, $req) {
$this->decrementPending();
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
@ -342,7 +419,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function chmod($path, $mode) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_chmod($path, $mode, $priority, [$this, "onGenericResult"], $promisor);
@ -354,7 +431,7 @@ class EioDriver implements Driver {
* {@inheritdoc}
*/
public function chown($path, $uid, $gid) {
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$priority = \EIO_PRI_DEFAULT;
\eio_chown($path, $uid, $gid, $priority, [$this, "onGenericResult"], $promisor);
@ -382,7 +459,7 @@ class EioDriver implements Driver {
$mode = 0;
$priority = \EIO_PRI_DEFAULT;
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
\eio_open($path, $flags, $mode, $priority, [$this, "onGetOpen"], $promisor);
@ -418,7 +495,7 @@ class EioDriver implements Driver {
private function onGetRead($fhAndPromisor, $result, $req) {
list($fh, $promisor) = $fhAndPromisor;
$priority = \EIO_PRI_DEFAULT;
\eio_close($fh, $priority, $this->callableDelReq);
\eio_close($fh, $priority, $this->callableDecrementor);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
@ -436,7 +513,7 @@ class EioDriver implements Driver {
$mode = \EIO_S_IRUSR | \EIO_S_IWUSR | \EIO_S_IXUSR;
$priority = \EIO_PRI_DEFAULT;
$this->incrementPending();
\call_user_func($this->incrementor, 1);;
$promisor = new Deferred;
$data = [$contents, $promisor];
\eio_open($path, $flags, $mode, $priority, [$this, "onPutOpen"], $data);
@ -464,7 +541,7 @@ class EioDriver implements Driver {
list($fh, $promisor) = $fhAndPromisor;
\eio_close($fh);
$priority = \EIO_PRI_DEFAULT;
\eio_close($fh, $priority, $this->callableDelReq);
\eio_close($fh, $priority, $this->callableDecrementor);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)

201
lib/EioHandle.php Normal file
View File

@ -0,0 +1,201 @@
<?php
namespace Amp\File;
use Amp\Promise;
use Amp\Deferred;
class EioHandle implements Handle {
const OP_READ = 1;
const OP_WRITE = 2;
private $incrementor;
private $fh;
private $path;
private $mode;
private $size;
private $position;
private $queue = [];
private $pendingWriteOps = 0;
private $isActive = false;
public function __construct(callable $incrementor, $fh, $path, $mode, $size) {
$this->incrementor = $incrementor;
$this->fh = $fh;
$this->path = $path;
$this->mode = $mode;
$this->size = $size;
$this->position = ($mode[0] === "a") ? $size : 0;
}
/**
* {@inheritdoc}
*/
public function read($len) {
$promisor = new Deferred;
$op = new \StdClass;
$op->type = self::OP_READ;
$op->position = $this->position;
$op->promisor = $promisor;
$remaining = $this->size - $this->position;
$op->readLen = ($len > $remaining) ? $remaining : $len;
if ($this->isActive) {
$this->queue[] = $op;
} else {
\call_user_func($this->incrementor, 1);
$this->isActive = true;
\eio_read($this->fh, $op->readLen, $op->position, $priority = null, [$this, "onRead"], $op);
}
return $promisor->promise();
}
private function dequeue() {
$this->isActive = true;
$op = \array_shift($this->queue);
switch ($op->type) {
case self::OP_READ:
\call_user_func($this->incrementor, 1);
$this->isActive = true;
\eio_read($this->fh, $op->readLen, $op->position, $priority = null, [$this, "onRead"], $op);
break;
case self::OP_WRITE:
\call_user_func($this->incrementor, 1);
$this->isActive = true;
\eio_write($this->fh, $op->writeData, \strlen($op->writeData), $op->position, $priority = null, [$this, "onWrite"], $op);
break;
}
}
private function onRead($op, $result, $req) {
$this->isActive = false;
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$op->promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} else {
$this->position = $op->position + \strlen($result);
$op->promisor->succeed($result);
}
if ($this->queue) {
$this->dequeue();
}
}
/**
* {@inheritdoc}
*/
public function write($data) {
$promisor = new Deferred;
$op = new \StdClass;
$op->type = self::OP_WRITE;
$op->position = $this->position;
$op->promisor = $promisor;
$op->writeData = $data;
$this->pendingWriteOps++;
if ($this->isActive) {
$this->queue[] = $op;
} else {
\call_user_func($this->incrementor, 1);
$this->isActive = true;
\eio_write($this->fh, $data, strlen($data), $op->position, $priority = null, [$this, "onWrite"], $op);
}
return $promisor->promise();
}
private function onWrite($op, $result, $req) {
$this->isActive = false;
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$op->promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} else {
StatCache::clear($this->path);
$bytesWritten = \strlen($op->writeData);
$this->pendingWriteOps--;
$newPosition = $op->position + $bytesWritten;
$delta = $newPosition - $this->position;
$this->position = ($this->mode[0] === "a") ? $this->position : $newPosition;
$this->size += $delta;
$op->promisor->succeed($result);
}
if ($this->queue) {
$this->dequeue();
}
}
/**
* {@inheritdoc}
*/
public function close() {
\call_user_func($this->incrementor, 1);
$promisor = new Deferred;
\eio_close($this->fh, $priority = null, [$this, "onClose"], $promisor);
return $promisor->promise();
}
private function onClose($promisor, $result, $req) {
\call_user_func($this->incrementor, -1);
if ($result === -1) {
$promisor->fail(new FilesystemException(
\eio_get_last_error($req)
));
} else {
$promisor->succeed();
}
}
/**
* {@inheritdoc}
*/
public function seek($offset, $whence = \SEEK_SET) {
$offset = (int) $offset;
switch ($whence) {
case \SEEK_SET:
$this->position = $offset;
break;
case \SEEK_CUR:
$this->position = $this->position + $offset;
break;
case \SEEK_END:
$this->position = $this->size + $offset;
break;
default:
throw new FilesystemException(
"Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
);
}
}
/**
* {@inheritdoc}
*/
public function tell() {
return $this->position;
}
/**
* {@inheritdoc}
*/
public function eof() {
return ($this->pendingWriteOps > 0) ? false : ($this->size <= $this->position);
}
/**
* {@inheritdoc}
*/
public function path() {
return $this->path;
}
/**
* {@inheritdoc}
*/
public function mode() {
return $this->mode;
}
}

76
lib/Handle.php Normal file
View File

@ -0,0 +1,76 @@
<?php
namespace Amp\File;
interface Handle {
/**
* Read $len bytes from the open file handle starting at $offset
*
* @param int $offset
* @param int $len
* @return \Amp\Promise
*/
public function read($len);
/**
* Write $data to the open file handle starting at $offset
*
* @param int $offset
* @param string $data
* @return \Amp\Promise
*/
public function write($data);
/**
* Close the file handle
*
* Applications are not required to manually close handles -- they will
* be unloaded automatically when the object is garbage collected.
*
* @return \Amp\Promise
*/
public function close();
/**
* Set the handle's internal pointer position
*
* $whence values:
*
* SEEK_SET - Set position equal to offset bytes.
* SEEK_CUR - Set position to current location plus offset.
* SEEK_END - Set position to end-of-file plus offset.
*
* @param int $position
* @param int $whence
* @return void
*/
public function seek($position, $whence = \SEEK_SET);
/**
* Return the current internal offset position of the file handle
*
* @return int
*/
public function tell();
/**
* Test for "end-of-file" on the file handle
*
* @return bool
*/
public function eof();
/**
* Retrieve the path used when opening the file handle
*
* @return string
*/
public function path();
/**
* Retrieve the mode used when opening the file handle
*
* @return string
*/
public function mode();
}

View File

@ -19,6 +19,77 @@ class UvDriver implements Driver {
$this->loop = $this->reactor->getLoop();
}
/**
* {@inheritdoc}
*/
public function open($path, $mode) {
switch ($mode) {
case "r": $flags = \UV::O_RDONLY; break;
case "r+": $flags = \UV::O_RDWR; break;
case "w": $flags = \UV::O_WRONLY | \UV::O_CREAT; break;
case "w+": $flags = \UV::O_RDWR | \UV::O_CREAT; break;
case "a": $flags = \UV::O_WRONLY | \UV::O_CREAT | \UV::O_APPEND; break;
case "a+": $flags = \UV::O_RDWR | \UV::O_CREAT | \UV::O_APPEND; break;
case "x": $flags = \UV::O_WRONLY | \UV::O_CREAT | \UV::O_EXCL; break;
case "x+": $flags = \UV::O_RDWR | \UV::O_CREAT | \UV::O_EXCL; break;
case "c": $flags = \UV::O_WRONLY | \UV::O_CREAT; break;
case "c+": $flags = \UV::O_RDWR | \UV::O_CREAT; break;
default: return new Failure(new FilesystemException(
"Invalid open mode"
));
}
$chmod = ($flags & \UV::O_CREAT) ? 0644 : 0;
$this->reactor->addRef();
$promisor = new Deferred;
$openArr = [$mode, $path, $promisor];
\uv_fs_open($this->loop, $path, $flags, $chmod, function($fh) use ($openArr) {
if ($fh) {
$this->onOpenHandle($fh, $openArr);
} else {
$this->reactor->delRef();
$promisor->fail(new \RuntimeException(
"Failed opening file handle"
));
}
});
return $promisor->promise();
}
private function onOpenHandle($fh, array $openArr) {
list($mode) = $openArr;
if ($mode[0] === "w") {
\uv_fs_ftruncate($this->loop, $fh, $length = 0, function($fh) use ($openArr) {
$this->reactor->delRef();
if ($fh) {
$this->finalizeHandle($fh, $size = 0, $openArr);
} else {
$promisor->fail(new FilesystemException(
"Failed truncating file"
));
}
});
} else {
\uv_fs_fstat($this->loop, $fh, function($fh, $stat) use ($openArr) {
$this->reactor->delRef();
if ($fh) {
StatCache::set($openArr[1], $stat);
$this->finalizeHandle($fh, $stat["size"], $openArr);
} else {
$promisor->fail(new FilesystemException(
"Failed reading file size from open handle"
));
}
});
}
}
private function finalizeHandle($fh, $size, array $openArr) {
list($mode, $path, $promisor) = $openArr;
$handle = new UvHandle($this->reactor, $fh, $path, $mode, $size);
$promisor->succeed($handle);
}
/**
* {@inheritdoc}
*/

201
lib/UvHandle.php Normal file
View File

@ -0,0 +1,201 @@
<?php
namespace Amp\File;
use Amp\Promise;
use Amp\Deferred;
use Amp\UvReactor;
class UvHandle implements Handle {
const OP_READ = 1;
const OP_WRITE = 2;
private $reactor;
private $fh;
private $path;
private $mode;
private $size;
private $loop;
private $position;
private $queue = [];
private $pendingWriteOps = 0;
private $isActive = false;
private $isCloseInitialized = false;
public function __construct(UvReactor $reactor, $fh, $path, $mode, $size) {
$this->reactor = $reactor;
$this->fh = $fh;
$this->path = $path;
$this->mode = $mode;
$this->size = $size;
$this->loop = $reactor->getLoop();
$this->position = ($mode[0] === "a") ? $size : 0;
}
/**
* {@inheritdoc}
*/
public function read($readLen) {
$promisor = new Deferred;
$op = new \StdClass;
$op->type = self::OP_READ;
$op->position = $this->position;
$op->promisor = $promisor;
$op->readLen = $readLen - 1;
if ($this->isActive) {
$this->queue[] = $op;
} else {
$this->isActive = true;
$this->doRead($op);
}
return $promisor->promise();
}
/**
* {@inheritdoc}
*/
public function write($writeData) {
$this->pendingWriteOps++;
$promisor = new Deferred;
$op = new \StdClass;
$op->type = self::OP_WRITE;
$op->position = $this->position;
$op->promisor = $promisor;
$op->writeData = $writeData;
if ($this->isActive) {
$this->queue[] = $op;
} else {
$this->isActive = true;
$this->doWrite($op);
}
return $promisor->promise();
}
private function doRead($op) {
$this->reactor->addRef();
$onRead = function ($fh, $result, $buffer) use ($op) {
$this->isActive = false;
$this->reactor->delRef();
if ($result < 0) {
$op->promisor->fail(new FilesystemException(
\uv_strerror($result)
));
} else {
$this->position = $op->position + strlen($buffer);
$op->promisor->succeed($buffer);
}
if ($this->queue) {
$this->dequeue();
}
};
\uv_fs_read($this->loop, $this->fh, $op->position, $op->readLen, $onRead);
}
private function doWrite($op) {
$this->reactor->addRef();
$onWrite = function ($fh, $result) use ($op) {
$this->isActive = false;
$this->reactor->delRef();
if ($result < 0) {
$op->promisor->fail(new FilesystemException(
\uv_strerror($result)
));
} else {
StatCache::clear($this->path);
$bytesWritten = \strlen($op->writeData);
$this->pendingWriteOps--;
$newPosition = $op->position + $bytesWritten;
$delta = $newPosition - $this->position;
$this->position = ($this->mode[0] === "a") ? $this->position : $newPosition;
$this->size += $delta;
$op->promisor->succeed($result);
}
if ($this->queue) {
$this->dequeue();
}
};
\uv_fs_write($this->loop, $this->fh, $op->writeData, $op->position, $onWrite);
}
private function dequeue() {
$this->isActive = true;
$op = \array_shift($this->queue);
switch ($op->type) {
case self::OP_READ: $this->doRead($op); break;
case self::OP_WRITE: $this->doWrite($op); break;
}
}
/**
* {@inheritdoc}
*/
public function seek($offset, $whence = \SEEK_SET) {
$offset = (int) $offset;
switch ($whence) {
case \SEEK_SET:
$this->position = $offset;
break;
case \SEEK_CUR:
$this->position = $this->position + $offset;
break;
case \SEEK_END:
$this->position = $this->size + $offset;
break;
default:
throw new FilesystemException(
"Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected"
);
}
}
/**
* {@inheritdoc}
*/
public function tell() {
return $this->position;
}
/**
* {@inheritdoc}
*/
public function eof() {
return ($this->pendingWriteOps > 0) ? false : ($this->size <= $this->position);
}
/**
* {@inheritdoc}
*/
public function path() {
return $this->path;
}
/**
* {@inheritdoc}
*/
public function mode() {
return $this->mode;
}
/**
* {@inheritdoc}
*/
public function close() {
$this->isCloseInitialized = true;
$this->reactor->addRef();
$promisor = new Deferred;
\uv_fs_close($this->loop, $this->fh, function($fh) use ($promisor) {
$this->reactor->delRef();
$promisor->succeed();
});
return $promisor->promise();
}
public function __destruct() {
if (empty($this->isCloseInitialized)) {
$this->close();
}
}
}

View File

@ -35,6 +35,17 @@ function driver() {
}
}
/**
* Open a handle for the specified path
*
* @param string $path
* @param string $mode
* @return \Amp\File\Handle
*/
function open($path, $mode) {
return filesystem()->open($path, $mode);
}
/**
* Execute a file stat operation
*

View File

@ -0,0 +1,16 @@
<?php
namespace Amp\File\Test;
use Amp as amp;
use Amp\File as file;
class BlockingHandleTest extends HandleTest {
protected function setUp() {
$reactor = new amp\NativeReactor;
amp\reactor($reactor);
$driver = new file\BlockingDriver($reactor);
file\filesystem($driver);
parent::setUp();
}
}

42
test/EioHandleTest.php Normal file
View File

@ -0,0 +1,42 @@
<?php
namespace Amp\File\Test;
use Amp as amp;
use Amp\File as file;
class EioHandleTest extends HandleTest {
protected function setUp() {
if (\extension_loaded("eio")) {
amp\reactor(amp\driver());
file\filesystem(new file\EioDriver);
parent::setUp();
} else {
$this->markTestSkipped(
"eio extension not loaded"
);
}
}
public function testQueuedWritesOverrideEachOtherIfNotWaitedUpon() {
amp\run(function () {
$path = Fixture::path() . "/write";
$handle = (yield file\open($path, "c+"));
$this->assertSame(0, $handle->tell());
$write1 = $handle->write("foo");
$write2 = $handle->write("bar");
yield amp\all([$write1, $write2]);
$handle->seek(0);
$contents = (yield $handle->read(8192));
$this->assertSame(3, $handle->tell());
$this->assertTrue($handle->eof());
$this->assertSame("bar", $contents);
yield $handle->close();
yield file\unlink($path);
});
}
}

47
test/Fixture.php Normal file
View File

@ -0,0 +1,47 @@
<?php
namespace Amp\File\Test;
final class Fixture {
private static $fixtureId;
public static function path() {
if (empty(self::$fixtureId)) {
self::$fixtureId = \uniqid();
}
return \sys_get_temp_dir() . "/amphp_file_fixture/" . __CLASS__ . self::$fixtureId;
}
public static function init() {
$fixtureDir = self::path();
self::clear();
if (!\mkdir($fixtureDir, $mode = 0777, $recursive = true)) {
throw new \RuntimeException(
"Failed creating temporary test fixture directory: {$fixtureDir}"
);
}
if (!\mkdir($fixtureDir . "/dir", $mode = 0777, $recursive = true)) {
throw new \RuntimeException(
"Failed creating temporary test fixture directory"
);
}
if (!\file_put_contents($fixtureDir . "/small.txt", "small")) {
throw new \RuntimeException(
"Failed creating temporary test fixture file"
);
}
}
public static function clear() {
$fixtureDir = self::path();
if (!\file_exists($fixtureDir)) {
return;
}
if (\stripos(\PHP_OS, "win") === 0) {
\system('rd /Q /S "' . $fixtureDir . '"');
} else {
\system('/bin/rm -rf ' . \escapeshellarg($fixtureDir));
}
}
}

142
test/HandleTest.php Normal file
View File

@ -0,0 +1,142 @@
<?php
namespace Amp\File\Test;
use Amp as amp;
use Amp\File as file;
abstract class HandleTest extends \PHPUnit_Framework_TestCase {
public static function setUpBeforeClass() {
Fixture::init();
}
public static function tearDownAfterClass() {
Fixture::clear();
}
protected function setUp() {
file\StatCache::clear();
}
public function testWrite() {
amp\run(function () {
$path = Fixture::path() . "/write";
$handle = (yield file\open($path, "c+"));
$this->assertSame(0, $handle->tell());
yield $handle->write("foo");
yield $handle->write("bar");
$handle->seek(0);
$contents = (yield $handle->read(8192));
$this->assertSame(6, $handle->tell());
$this->assertTrue($handle->eof());
$this->assertSame("foobar", $contents);
yield $handle->close();
yield file\unlink($path);
});
}
public function testReadingToEof() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$contents = "";
$position = 0;
$stat = (yield file\stat(__FILE__));
$chunkSize = \floor(($stat["size"] / 5));
while (!$handle->eof()) {
$chunk = (yield $handle->read($chunkSize));
$contents .= $chunk;
$position += \strlen($chunk);
$this->assertSame($position, $handle->tell());
}
$this->assertSame((yield file\get(__FILE__)), $contents);
});
}
public function testQueuedReads() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$contents = "";
$read1 = $handle->read(10);
$handle->seek(10);
$read2 = $handle->read(10);
$contents .= (yield $read1);
$contents .= (yield $read2);
$expected = \substr((yield file\get(__FILE__)), 0, 20);
$this->assertSame($expected, $contents);
});
}
public function testReadingFromOffset() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$this->assertSame(0, $handle->tell());
$handle->seek(10);
$this->assertSame(10, $handle->tell());
$chunk = (yield $handle->read(90));
$this->assertSame(100, $handle->tell());
$expected = \substr((yield file\get(__FILE__)), 10, 90);
$this->assertSame($expected, $chunk);
});
}
/**
* @expectedException Amp\File\FilesystemException
* @expectedExceptionMessage Invalid whence parameter; SEEK_SET, SEEK_CUR or SEEK_END expected
*/
public function testSeekThrowsOnInvalidWhence() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$handle->seek(0, 99999);
});
}
public function testSeekSetCur() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$this->assertSame(0, $handle->tell());
$handle->seek(10);
$this->assertSame(10, $handle->tell());
$handle->seek(-10, \SEEK_CUR);
$this->assertSame(0, $handle->tell());
});
}
public function testSeekSetEnd() {
amp\run(function () {
$size = (yield file\size(__FILE__));
$handle = (yield file\open(__FILE__, "r"));
$this->assertSame(0, $handle->tell());
$handle->seek(-10, \SEEK_END);
$this->assertSame($size - 10, $handle->tell());
});
}
public function testPath() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$this->assertSame(__FILE__, $handle->path());
});
}
public function testMode() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
$this->assertSame("r", $handle->mode());
});
}
public function testClose() {
amp\run(function () {
$handle = (yield file\open(__FILE__, "r"));
yield $handle->close();
});
}
}

View File

@ -9,6 +9,7 @@ class UvDriverTest extends DriverTest {
\Amp\reactor($reactor);
$driver = new \Amp\File\UvDriver($reactor);
\Amp\File\filesystem($driver);
parent::setUp();
} else {
$this->markTestSkipped(
"php-uv extension not loaded"

44
test/UvHandleTest.php Normal file
View File

@ -0,0 +1,44 @@
<?php
namespace Amp\File\Test;
use Amp as amp;
use Amp\File as file;
class UvHandleTest extends HandleTest {
protected function setUp() {
if (\extension_loaded("uv")) {
$reactor = new amp\UvReactor;
amp\reactor($reactor);
$driver = new file\UvDriver($reactor);
file\filesystem($driver);
parent::setUp();
} else {
$this->markTestSkipped(
"php-uv extension not loaded"
);
}
}
public function testQueuedWritesOverrideEachOtherIfNotWaitedUpon() {
amp\run(function () {
$path = Fixture::path() . "/write";
$handle = (yield file\open($path, "c+"));
$this->assertSame(0, $handle->tell());
$write1 = $handle->write("foo");
$write2 = $handle->write("bar");
yield amp\all([$write1, $write2]);
$handle->seek(0);
$contents = (yield $handle->read(8192));
$this->assertSame(3, $handle->tell());
$this->assertTrue($handle->eof());
$this->assertSame("bar", $contents);
yield $handle->close();
yield file\unlink($path);
});
}
}