1
0
mirror of https://github.com/danog/file.git synced 2024-12-03 09:47:54 +01:00
This commit is contained in:
Daniil Gentili 2024-05-31 11:06:20 +02:00
commit 600acf5474
18 changed files with 416 additions and 139 deletions

View File

@ -11,12 +11,22 @@ jobs:
include:
- operating-system: 'ubuntu-latest'
php-version: '8.1'
extensions: uv, eio
- operating-system: 'ubuntu-latest'
php-version: '8.2'
extensions: uv, eio
- operating-system: 'ubuntu-latest'
php-version: '8.3'
extensions: uv
style-fix: none
static-analysis: none
- operating-system: 'ubuntu-latest'
php-version: '8.4'
extensions: uv
style-fix: none
static-analysis: none
- operating-system: 'windows-latest'
@ -26,7 +36,9 @@ jobs:
- operating-system: 'macos-latest'
php-version: '8.3'
extensions: uv
job-description: 'on macOS'
style-fix: none
static-analysis: none
@ -53,7 +65,7 @@ jobs:
uses: shivammathur/setup-php@v2
with:
php-version: ${{ matrix.php-version }}
extensions: eio-beta, uv-amphp/ext-uv@master
extensions: ${{ matrix.extensions }}
- name: Get Composer cache directory
id: composer-cache
@ -91,7 +103,7 @@ jobs:
env:
PHP_CS_FIXER_IGNORE_ENV: 1
run: vendor/bin/php-cs-fixer --diff --dry-run -v fix
if: runner.os != 'Windows'
if: runner.os != 'Windows' && matrix.style-fix != 'none'
- name: Install composer-require-checker
run: php -r 'file_put_contents("composer-require-checker.phar", file_get_contents("https://github.com/maglnet/ComposerRequireChecker/releases/download/3.7.0/composer-require-checker.phar"));'

View File

@ -14,6 +14,10 @@ This package can be installed as a [Composer](https://getcomposer.org/) dependen
composer require amphp/file
```
## Requirements
- PHP 8.1+
`amphp/file` works out of the box without any PHP extensions.
It uses multiple processes by default, but also comes with a blocking driver that uses PHP's blocking functions in the current process.
@ -163,7 +167,7 @@ array(13) {
## Security
If you discover any security related issues, please email [`me@kelunik.com`](mailto:me@kelunik.com) instead of using the issue tracker.
If you discover any security related issues, please use the private security issue reporter instead of using the public issue tracker.
## License

View File

@ -62,6 +62,7 @@
"EIO_S_IWUSR",
"EIO_S_IXUSR",
"UV",
"UVLoop",
"uv_fs_chmod",
"uv_fs_chown",
"uv_fs_fstat",

View File

@ -43,7 +43,7 @@
"require-dev": {
"amphp/phpunit-util": "^3",
"phpunit/phpunit": "^9",
"psalm/phar": "^5.4",
"psalm/phar": "5.22.2",
"amphp/php-cs-fixer-config": "^2"
},
"suggest": {
@ -59,11 +59,13 @@
"autoload-dev": {
"psr-4": {
"Amp\\File\\Test\\": "test",
"Amp\\Cache\\Test\\": "vendor/amphp/cache/test",
"Amp\\Sync\\": "vendor/amphp/sync/test"
}
},
"config": {
"preferred-install": {
"amphp/cache": "source",
"amphp/sync": "source"
}
},

View File

@ -48,5 +48,12 @@
<directory name="src"/>
</errorLevel>
</MissingClosureReturnType>
<RiskyTruthyFalsyComparison>
<errorLevel type="suppress">
<directory name="examples"/>
<directory name="src"/>
</errorLevel>
</RiskyTruthyFalsyComparison>
</issueHandlers>
</psalm>

View File

@ -22,7 +22,7 @@ final class ParallelFilesystemDriver implements FilesystemDriver
/** @var int Maximum number of workers to use for open files. */
private int $workerLimit;
/** @var \SplObjectStorage Worker storage. */
/** @var \SplObjectStorage<Worker, int> Worker storage. */
private \SplObjectStorage $workerStorage;
/** @var Future Pending worker request */
@ -31,11 +31,11 @@ final class ParallelFilesystemDriver implements FilesystemDriver
/**
* @param int $workerLimit Maximum number of workers to use from the pool for open files.
*/
public function __construct(WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
public function __construct(?WorkerPool $pool = null, int $workerLimit = self::DEFAULT_WORKER_LIMIT)
{
$this->pool = $pool ?? workerPool();
$this->workerLimit = $workerLimit;
$this->workerStorage = new \SplObjectStorage;
$this->workerStorage = new \SplObjectStorage();
$this->pendingWorker = Future::complete();
}
@ -45,8 +45,11 @@ final class ParallelFilesystemDriver implements FilesystemDriver
$workerStorage = $this->workerStorage;
$worker = new Internal\FileWorker($worker, static function (Worker $worker) use ($workerStorage): void {
\assert($workerStorage->contains($worker));
if (($workerStorage[$worker] -=1) === 0 || !$worker->isRunning()) {
if (!$workerStorage->contains($worker)) {
return;
}
if (($workerStorage[$worker] -= 1) === 0 || !$worker->isRunning()) {
$workerStorage->detach($worker);
}
});

View File

@ -1,5 +1,4 @@
<?php declare(strict_types=1);
/** @noinspection PhpComposerExtensionStubsInspection */
namespace Amp\File\Driver;
@ -10,23 +9,19 @@ use Amp\DeferredFuture;
use Amp\File\Internal;
use Amp\File\PendingOperationError;
use Amp\Future;
use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver;
use Revolt\EventLoop\Driver as EventLoopDriver;
final class UvFile extends Internal\QueuedWritesFile
{
private readonly Internal\UvPoll $poll;
/** @var \UVLoop|resource */
private $eventLoopHandle;
private readonly \UVLoop $eventLoopHandle;
/** @var resource */
private $fh;
private ?Future $closing = null;
/** @var bool True if ext-uv version is < 0.3.0. */
private readonly bool $priorVersion;
private readonly DeferredFuture $onClose;
/**
@ -34,7 +29,7 @@ final class UvFile extends Internal\QueuedWritesFile
* @param resource $fh File handle.
*/
public function __construct(
UvLoopDriver $driver,
EventLoopDriver $driver,
Internal\UvPoll $poll,
$fh,
string $path,
@ -49,8 +44,6 @@ final class UvFile extends Internal\QueuedWritesFile
/** @psalm-suppress PropertyTypeCoercion */
$this->eventLoopHandle = $driver->getHandle();
$this->onClose = new DeferredFuture;
$this->priorVersion = \version_compare(\phpversion('uv'), '0.3.0', '<');
}
public function read(?Cancellation $cancellation = null, int $length = self::DEFAULT_READ_LENGTH): ?string
@ -86,16 +79,6 @@ final class UvFile extends Internal\QueuedWritesFile
$deferred->complete($length ? $buffer : null);
};
if ($this->priorVersion) {
$onRead = static function ($fh, $result, $buffer) use ($onRead): void {
if ($result < 0) {
$buffer = $result; // php-uv v0.3.0 changed the callback to put an int in $buffer on error.
}
$onRead($result, $buffer);
};
}
\uv_fs_read($this->eventLoopHandle, $this->fh, $this->position, $length, $onRead);
$id = $cancellation?->subscribe(function (\Throwable $exception) use ($deferred): void {

View File

@ -1,5 +1,4 @@
<?php declare(strict_types=1);
/** @noinspection PhpComposerExtensionStubsInspection */
namespace Amp\File\Driver;
@ -8,7 +7,6 @@ use Amp\File\FilesystemDriver;
use Amp\File\FilesystemException;
use Amp\File\Internal;
use Revolt\EventLoop\Driver as EventLoopDriver;
use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver;
final class UvFilesystemDriver implements FilesystemDriver
{
@ -19,23 +17,27 @@ final class UvFilesystemDriver implements FilesystemDriver
*/
public static function isSupported(EventLoopDriver $driver): bool
{
return $driver instanceof UvLoopDriver;
$uvVersion = \phpversion('uv');
if (!$uvVersion) {
return false;
}
return \version_compare($uvVersion, '0.3.0', '>=') && $driver->getHandle() instanceof \UVLoop;
}
/** @var \UVLoop|resource Loop resource of type uv_loop or instance of \UVLoop. */
private $eventLoopHandle;
private readonly \UVLoop $eventLoopHandle;
private readonly Internal\UvPoll $poll;
/** @var bool True if ext-uv version is < 0.3.0. */
private readonly bool $priorVersion;
public function __construct(private readonly UvLoopDriver $driver)
public function __construct(private readonly EventLoopDriver $driver)
{
if (!self::isSupported($driver)) {
throw new \Error('Event loop did not return a compatible handle');
}
/** @psalm-suppress PropertyTypeCoercion */
$this->eventLoopHandle = $driver->getHandle();
$this->poll = new Internal\UvPoll($driver);
$this->priorVersion = \version_compare(\phpversion('uv'), '0.3.0', '<');
}
public function openFile(string $path, string $mode): UvFile
@ -83,16 +85,6 @@ final class UvFilesystemDriver implements FilesystemDriver
$deferred->complete($stat);
};
if ($this->priorVersion) {
$callback = static function ($fh, $stat) use ($callback): void {
if (empty($fh)) {
$stat = 0;
}
$callback($stat);
};
}
\uv_fs_stat($this->eventLoopHandle, $path, $callback);
try {
@ -107,17 +99,9 @@ final class UvFilesystemDriver implements FilesystemDriver
$deferred = new DeferredFuture;
$this->poll->listen();
if ($this->priorVersion) {
$callback = static function ($fh, $stat) use ($deferred): void {
$deferred->complete(empty($fh) ? null : $stat);
};
} else {
$callback = static function ($stat) use ($deferred): void {
$deferred->complete(\is_int($stat) ? null : $stat);
};
}
\uv_fs_lstat($this->eventLoopHandle, $path, $callback);
\uv_fs_lstat($this->eventLoopHandle, $path, static function ($stat) use ($deferred): void {
$deferred->complete(\is_int($stat) ? null : $stat);
});
try {
return $deferred->getFuture()->await();
@ -160,27 +144,14 @@ final class UvFilesystemDriver implements FilesystemDriver
$deferred = new DeferredFuture;
$this->poll->listen();
if ($this->priorVersion) {
$callback = static function ($fh, $target) use ($deferred): void {
if (!(bool) $fh) {
$deferred->error(new FilesystemException("Could not read symbolic link"));
return;
}
\uv_fs_readlink($this->eventLoopHandle, $target, static function ($target) use ($deferred): void {
if (\is_int($target)) {
$deferred->error(new FilesystemException("Could not read symbolic link"));
return;
}
$deferred->complete($target);
};
} else {
$callback = static function ($target) use ($deferred): void {
if (\is_int($target)) {
$deferred->error(new FilesystemException("Could not read symbolic link"));
return;
}
$deferred->complete($target);
};
}
\uv_fs_readlink($this->eventLoopHandle, $target, $callback);
$deferred->complete($target);
});
try {
return $deferred->getFuture()->await();
@ -297,28 +268,16 @@ final class UvFilesystemDriver implements FilesystemDriver
$deferred = new DeferredFuture;
$this->poll->listen();
if ($this->priorVersion) {
\uv_fs_readdir($this->eventLoopHandle, $path, 0, static function ($fh, $data) use ($deferred, $path): void {
if (empty($fh) && $data !== 0) {
$deferred->error(new FilesystemException("Failed reading contents from {$path}"));
} elseif ($data === 0) {
$deferred->complete([]);
} else {
$deferred->complete($data);
}
});
} else {
/** @noinspection PhpUndefinedFunctionInspection */
\uv_fs_scandir($this->eventLoopHandle, $path, static function ($data) use ($deferred, $path): void {
if (\is_int($data) && $data !== 0) {
$deferred->error(new FilesystemException("Failed reading contents from {$path}"));
} elseif ($data === 0) {
$deferred->complete([]);
} else {
$deferred->complete($data);
}
});
}
/** @noinspection PhpUndefinedFunctionInspection */
\uv_fs_scandir($this->eventLoopHandle, $path, static function ($data) use ($deferred, $path): void {
if (\is_int($data) && $data !== 0) {
$deferred->error(new FilesystemException("Failed reading contents from {$path}"));
} elseif ($data === 0) {
$deferred->complete([]);
} else {
$deferred->complete($data);
}
});
try {
return $deferred->getFuture()->await();
@ -528,42 +487,24 @@ final class UvFilesystemDriver implements FilesystemDriver
{
$deferred = new DeferredFuture;
if ($this->priorVersion) {
$callback = static function ($fileHandle, $readBytes, $buffer) use ($deferred): void {
$deferred->complete($readBytes < 0 ? null : $buffer);
};
} else {
$callback = static function ($readBytes, $buffer) use ($deferred): void {
$deferred->complete($readBytes < 0 ? null : $buffer);
};
}
$callback = static function ($readBytes, $buffer) use ($deferred): void {
$deferred->complete($readBytes < 0 ? null : $buffer);
};
\uv_fs_read($this->eventLoopHandle, $fileHandle, 0, $length, $callback);
return $deferred->getFuture()->await();
}
private function doWrite(string $path, string $contents): void
{
}
private function createGenericCallback(DeferredFuture $deferred, string $error): \Closure
{
$callback = static function (int $result) use ($deferred, $error): void {
return static function (int $result) use ($deferred, $error): void {
if ($result !== 0) {
$deferred->error(new FilesystemException($error));
return;
}
$deferred->complete(null);
$deferred->complete();
};
if ($this->priorVersion) {
$callback = static function (bool $result) use ($callback): void {
$callback($result ? 0 : -1);
};
}
return $callback;
}
}

181
src/FileCache.php Normal file
View File

@ -0,0 +1,181 @@
<?php declare(strict_types=1);
namespace Amp\File;
use Amp\Cache\CacheException;
use Amp\Cache\StringCache;
use Amp\ForbidCloning;
use Amp\ForbidSerialization;
use Amp\Sync\KeyedMutex;
use Amp\Sync\Lock;
use Revolt\EventLoop;
/**
* A cache which stores data in files in a directory.
*/
final class FileCache implements StringCache
{
use ForbidCloning;
use ForbidSerialization;
private readonly Filesystem $filesystem;
private readonly string $directory;
private ?string $gcWatcher;
public function __construct(
string $directory,
private readonly KeyedMutex $mutex,
?Filesystem $filesystem = null,
) {
$filesystem ??= filesystem();
$this->filesystem = $filesystem;
$this->directory = $directory = \rtrim($directory, "/\\");
$gcWatcher = static function () use ($directory, $mutex, $filesystem): void {
try {
$files = $filesystem->listFiles($directory);
foreach ($files as $file) {
if (\strlen($file) !== 70 || !\str_ends_with($file, '.cache')) {
continue;
}
try {
$lock = $mutex->acquire($file);
} catch (\Throwable) {
continue;
}
try {
$handle = $filesystem->openFile($directory . '/' . $file, 'r');
$ttl = $handle->read(length: 4);
if ($ttl === null || \strlen($ttl) !== 4) {
$handle->close();
continue;
}
$ttl = \unpack('Nttl', $ttl)['ttl'];
if ($ttl < \time()) {
$filesystem->deleteFile($directory . '/' . $file);
}
} catch (\Throwable) {
// ignore
} finally {
$lock->release();
}
}
} catch (\Throwable) {
// ignore
}
};
// trigger once, so short running scripts also GC and don't grow forever
EventLoop::defer($gcWatcher);
$this->gcWatcher = EventLoop::repeat(300, $gcWatcher);
EventLoop::unreference($this->gcWatcher);
}
public function __destruct()
{
if ($this->gcWatcher !== null) {
EventLoop::cancel($this->gcWatcher);
}
}
public function get(string $key): ?string
{
$filename = $this->getFilename($key);
$lock = $this->lock($filename);
try {
$cacheContent = $this->filesystem->read($this->directory . '/' . $filename);
if (\strlen($cacheContent) < 4) {
return null;
}
$ttl = \unpack('Nttl', \substr($cacheContent, 0, 4))['ttl'];
if ($ttl < \time()) {
$this->filesystem->deleteFile($this->directory . '/' . $filename);
return null;
}
$value = \substr($cacheContent, 4);
\assert(\is_string($value));
return $value;
} catch (\Throwable) {
return null;
} finally {
$lock->release();
}
}
public function set(string $key, string $value, ?int $ttl = null): void
{
if ($ttl < 0) {
throw new \Error("Invalid cache TTL ({$ttl}); integer >= 0 or null required");
}
$filename = $this->getFilename($key);
$lock = $this->lock($filename);
if ($ttl === null) {
$ttl = \PHP_INT_MAX;
} else {
$ttl = \time() + $ttl;
}
$encodedTtl = \pack('N', $ttl);
try {
$this->filesystem->write($this->directory . '/' . $filename, $encodedTtl . $value);
} finally {
$lock->release();
}
}
public function delete(string $key): ?bool
{
$filename = $this->getFilename($key);
$lock = $this->lock($filename);
try {
$this->filesystem->deleteFile($this->directory . '/' . $filename);
} catch (FilesystemException) {
return false;
} finally {
$lock->release();
}
return true;
}
private static function getFilename(string $key): string
{
return \hash('sha256', $key) . '.cache';
}
private function lock(string $key): Lock
{
try {
return $this->mutex->acquire($key);
} catch (\Throwable $exception) {
throw new CacheException(
\sprintf('Exception thrown when obtaining the lock for key "%s"', $key),
0,
$exception
);
}
}
}

View File

@ -2,33 +2,45 @@
namespace Amp\File;
use Amp\Cancellation;
use Amp\Sync\Lock;
use Amp\Sync\Mutex;
use Amp\Sync\SyncException;
use function Amp\delay;
final class FileMutex implements Mutex
{
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;
private readonly Filesystem $filesystem;
private readonly string $directory;
/**
* @param string $fileName Name of temporary file to use as a mutex.
*/
public function __construct(private readonly string $fileName)
public function __construct(private readonly string $fileName, ?Filesystem $filesystem = null)
{
$this->filesystem = $filesystem ?? filesystem();
$this->directory = \dirname($this->fileName);
}
public function acquire(): Lock
public function acquire(?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory of "%s" does not exist or is not a directory', $this->fileName));
}
$f = \fopen($this->fileName, 'c');
while (true) {
if (\flock($f, LOCK_EX|LOCK_NB)) {
// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock(fn () => \flock($f, LOCK_UN));
// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
if (\flock($f, LOCK_EX|LOCK_NB)) {
$lock = new Lock(fn () => \flock($f, LOCK_UN));
return $lock;
}
delay(self::LATENCY_TIMEOUT);
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
}
}
}

View File

@ -4,7 +4,7 @@ namespace Amp\File;
class FilesystemException extends \Exception
{
public function __construct(string $message, \Throwable $previous = null)
public function __construct(string $message, ?\Throwable $previous = null)
{
parent::__construct($message, 0, $previous);
}

View File

@ -38,8 +38,8 @@ abstract class QueuedWritesFile implements File, \IteratorAggregate
}
$this->queue = new \SplQueue();
$this->writable = $this->mode[0] !== 'r';
$this->position = $this->mode[0] === 'a' ? $this->size : 0;
$this->writable = !\str_contains($this->mode, 'r') || \str_contains($this->mode, '+');
$this->position = \str_contains($this->mode, 'a') ? $this->size : 0;
}
public function __destruct()

View File

@ -2,7 +2,7 @@
namespace Amp\File\Internal;
use Revolt\EventLoop\Driver\UvDriver as UvLoopDriver;
use Revolt\EventLoop\Driver as EventLoopDriver;
/** @internal */
final class UvPoll
@ -11,7 +11,7 @@ final class UvPoll
private int $requests = 0;
public function __construct(private readonly UvLoopDriver $driver)
public function __construct(private readonly EventLoopDriver $driver)
{
// Create dummy watcher to keep loop running while polling.

76
src/KeyedFileMutex.php Normal file
View File

@ -0,0 +1,76 @@
<?php declare(strict_types=1);
namespace Amp\File;
use Amp\Cancellation;
use Amp\Sync\KeyedMutex;
use Amp\Sync\Lock;
use Amp\Sync\SyncException;
use function Amp\delay;
final class KeyedFileMutex implements KeyedMutex
{
private const LATENCY_TIMEOUT = 0.01;
private const DELAY_LIMIT = 1;
private readonly Filesystem $filesystem;
private readonly string $directory;
/**
* @param string $directory Directory in which to store key files.
*/
public function __construct(string $directory, ?Filesystem $filesystem = null)
{
$this->filesystem = $filesystem ?? filesystem();
$this->directory = \rtrim($directory, "/\\");
}
public function acquire(string $key, ?Cancellation $cancellation = null): Lock
{
if (!$this->filesystem->isDirectory($this->directory)) {
throw new SyncException(\sprintf('Directory "%s" does not exist or is not a directory', $this->directory));
}
$filename = $this->getFilename($key);
// Try to create the lock file. If the file already exists, someone else
// has the lock, so set an asynchronous timer and try again.
for ($attempt = 0; true; ++$attempt) {
try {
$file = $this->filesystem->openFile($filename, 'x');
// Return a lock object that can be used to release the lock on the mutex.
$lock = new Lock(fn () => $this->release($filename));
$file->close();
return $lock;
} catch (FilesystemException) {
delay(\min(self::DELAY_LIMIT, self::LATENCY_TIMEOUT * (2 ** $attempt)), cancellation: $cancellation);
}
}
}
/**
* Releases the lock on the mutex.
*
* @throws SyncException
*/
private function release(string $filename): void
{
try {
$this->filesystem->deleteFile($filename);
} catch (\Throwable $exception) {
throw new SyncException(
'Failed to unlock the mutex file: ' . $filename,
previous: $exception,
);
}
}
private function getFilename(string $key): string
{
return $this->directory . '/' . \hash('sha256', $key) . '.lock';
}
}

View File

@ -8,10 +8,12 @@ enum Whence
* Set position equal to offset bytes.
*/
case Start;
/**
* Set position to current location plus offset.
*/
case Current;
/**
* Set position to end-of-file plus offset.
*/

View File

@ -55,7 +55,6 @@ function createDefaultDriver(): FilesystemDriver
$driver = EventLoop::getDriver();
if (UvFilesystemDriver::isSupported($driver)) {
/** @var EventLoop\Driver\UvDriver $driver */
return new UvFilesystemDriver($driver);
}

27
test/FileCacheTest.php Normal file
View File

@ -0,0 +1,27 @@
<?php declare(strict_types=1);
namespace Amp\File\Test;
use Amp\Cache\Test\StringCacheTest;
use Amp\File\FileCache;
use Amp\Sync\LocalKeyedMutex;
class FileCacheTest extends StringCacheTest
{
protected function setUp(): void
{
parent::setUp();
Fixture::init();
}
protected function tearDown(): void
{
parent::tearDown();
Fixture::clear();
}
protected function createCache(): FileCache
{
return new FileCache(Fixture::path(), new LocalKeyedMutex());
}
}

View File

@ -0,0 +1,27 @@
<?php declare(strict_types=1);
namespace Amp\File\Test;
use Amp\File\KeyedFileMutex;
use Amp\Sync\AbstractKeyedMutexTest;
use Amp\Sync\KeyedMutex;
final class KeyedFileMutexTest extends AbstractKeyedMutexTest
{
protected function setUp(): void
{
parent::setUp();
Fixture::init();
}
protected function tearDown(): void
{
parent::tearDown();
Fixture::clear();
}
public function createMutex(): KeyedMutex
{
return new KeyedFileMutex(Fixture::path());
}
}