1
0
mirror of https://github.com/danog/file.git synced 2024-12-02 09:17:57 +01:00

Update for amphp/parallel v2.0

This commit is contained in:
Aaron Piotrowski 2023-02-18 09:49:06 -06:00
parent bac8157738
commit de1f4c1e08
No known key found for this signature in database
GPG Key ID: 5B456E6AABA44A63
6 changed files with 20 additions and 26 deletions

View File

@ -44,14 +44,12 @@
"amphp/phpunit-util": "^3", "amphp/phpunit-util": "^3",
"phpunit/phpunit": "^9", "phpunit/phpunit": "^9",
"psalm/phar": "^5.4", "psalm/phar": "^5.4",
"amphp/php-cs-fixer-config": "^2-dev" "amphp/php-cs-fixer-config": "^2"
}, },
"suggest": { "suggest": {
"ext-eio": "^2 || ^3", "ext-eio": "^2 || ^3",
"ext-uv": "^0.3 || ^0.2" "ext-uv": "^0.3 || ^0.2"
}, },
"minimum-stability": "beta",
"prefer-stable": true,
"autoload": { "autoload": {
"psr-4": { "psr-4": {
"Amp\\File\\": "src" "Amp\\File\\": "src"

View File

@ -67,7 +67,7 @@ final class ParallelFilesystemDriver implements FilesystemDriver
$this->pendingWorker->await(); // Wait for any currently pending request for a worker. $this->pendingWorker->await(); // Wait for any currently pending request for a worker.
if ($this->workerStorage->count() < $this->workerLimit) { if ($this->workerStorage->count() < $this->workerLimit) {
$this->pendingWorker = async(fn () => $this->pool->getWorker()); $this->pendingWorker = async($this->pool->getWorker(...));
$worker = $this->pendingWorker->await(); $worker = $this->pendingWorker->await();
if ($this->workerStorage->contains($worker)) { if ($this->workerStorage->contains($worker)) {
@ -188,7 +188,7 @@ final class ParallelFilesystemDriver implements FilesystemDriver
private function runFileTask(Internal\FileTask $task): mixed private function runFileTask(Internal\FileTask $task): mixed
{ {
try { try {
return $this->pool->submit($task)->getResult()->await(); return $this->pool->submit($task)->await();
} catch (TaskFailureThrowable $exception) { } catch (TaskFailureThrowable $exception) {
throw new FilesystemException("The file operation failed", $exception); throw new FilesystemException("The file operation failed", $exception);
} catch (WorkerException $exception) { } catch (WorkerException $exception) {

View File

@ -4,8 +4,8 @@ namespace Amp\File\Internal;
use Amp\ByteStream\ClosedException; use Amp\ByteStream\ClosedException;
use Amp\ByteStream\StreamException; use Amp\ByteStream\StreamException;
use Amp\Cache\Cache;
use Amp\Cache\CacheException; use Amp\Cache\CacheException;
use Amp\Cache\LocalCache;
use Amp\Cancellation; use Amp\Cancellation;
use Amp\File\Driver\BlockingFile; use Amp\File\Driver\BlockingFile;
use Amp\File\Driver\BlockingFilesystemDriver; use Amp\File\Driver\BlockingFilesystemDriver;
@ -16,19 +16,14 @@ use Amp\Sync\Channel;
/** /**
* @codeCoverageIgnore * @codeCoverageIgnore
* @internal * @internal
* @implements Task<mixed, never, never, BlockingFile> * @implements Task<mixed, never, never>
*/ */
final class FileTask implements Task final class FileTask implements Task
{ {
private static ?LocalCache $cache = null;
private static ?BlockingFilesystemDriver $driver = null; private static ?BlockingFilesystemDriver $driver = null;
private const ENV_PREFIX = "amphp/file#";
private static function makeId(int $id): string
{
return self::ENV_PREFIX . $id;
}
/** /**
* @param int|null $id File ID. * @param int|null $id File ID.
* *
@ -50,19 +45,20 @@ final class FileTask implements Task
* @throws ClosedException * @throws ClosedException
* @throws StreamException * @throws StreamException
*/ */
public function run(Channel $channel, Cache $cache, Cancellation $cancellation): mixed public function run(Channel $channel, Cancellation $cancellation): mixed
{ {
self::$driver ??= new BlockingFilesystemDriver(); $cache = self::$cache ??= new LocalCache();
$driver = self::$driver ??= new BlockingFilesystemDriver();
if ('f' === $this->operation[0]) { if ('f' === $this->operation[0]) {
if ("fopen" === $this->operation) { if ("fopen" === $this->operation) {
$file = self::$driver->openFile(...$this->args); $file = $driver->openFile(...$this->args);
$size = self::$driver->getStatus($file->getPath())["size"] $size = $driver->getStatus($file->getPath())["size"]
?? throw new FilesystemException("Could not determine file size"); ?? throw new FilesystemException("Could not determine file size");
$id = $file->getId(); $id = $file->getId();
$cache->set(self::makeId($id), $file); $cache->set((string) $id, $file);
return [$id, $size, $file->getMode()]; return [$id, $size, $file->getMode()];
} }
@ -71,7 +67,7 @@ final class FileTask implements Task
throw new FilesystemException("No file ID provided"); throw new FilesystemException("No file ID provided");
} }
$id = self::makeId($this->id); $id = (string) $this->id;
$file = $cache->get($id); $file = $cache->get($id);
if ($file === null) { if ($file === null) {
@ -128,7 +124,7 @@ final class FileTask implements Task
case "touch": case "touch":
case "read": case "read":
case "write": case "write":
return self::$driver->{$this->operation}(...$this->args); return $driver->{$this->operation}(...$this->args);
default: default:
throw new \Error("Invalid operation - " . $this->operation); throw new \Error("Invalid operation - " . $this->operation);

View File

@ -38,7 +38,7 @@ final class FileWorker
public function execute(Task $task, ?Cancellation $cancellation = null): mixed public function execute(Task $task, ?Cancellation $cancellation = null): mixed
{ {
return $this->worker->submit($task, $cancellation)->getResult()->await(); return $this->worker->submit($task, $cancellation)->await();
} }
public function shutdown(): void public function shutdown(): void

View File

@ -5,7 +5,7 @@ namespace Amp\File\Test\Driver;
use Amp\File; use Amp\File;
use Amp\File\Driver\ParallelFilesystemDriver; use Amp\File\Driver\ParallelFilesystemDriver;
use Amp\File\Test\AsyncFileTest; use Amp\File\Test\AsyncFileTest;
use Amp\Parallel\Worker\DefaultWorkerPool; use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\WorkerPool; use Amp\Parallel\Worker\WorkerPool;
class ParallelFileTest extends AsyncFileTest class ParallelFileTest extends AsyncFileTest
@ -16,7 +16,7 @@ class ParallelFileTest extends AsyncFileTest
protected function createDriver(int $workerLimit = self::DEFAULT_WORKER_LIMIT): File\FilesystemDriver protected function createDriver(int $workerLimit = self::DEFAULT_WORKER_LIMIT): File\FilesystemDriver
{ {
$this->pool = new DefaultWorkerPool(); $this->pool = new ContextWorkerPool();
return new ParallelFilesystemDriver($this->pool, $workerLimit); return new ParallelFilesystemDriver($this->pool, $workerLimit);
} }

View File

@ -5,7 +5,7 @@ namespace Amp\File\Test\Driver;
use Amp\File; use Amp\File;
use Amp\File\Driver\ParallelFilesystemDriver; use Amp\File\Driver\ParallelFilesystemDriver;
use Amp\File\Test\FilesystemDriverTest; use Amp\File\Test\FilesystemDriverTest;
use Amp\Parallel\Worker\DefaultWorkerPool; use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\WorkerPool; use Amp\Parallel\Worker\WorkerPool;
class ParallelFilesystemDriverTest extends FilesystemDriverTest class ParallelFilesystemDriverTest extends FilesystemDriverTest
@ -14,7 +14,7 @@ class ParallelFilesystemDriverTest extends FilesystemDriverTest
protected function createDriver(): File\FilesystemDriver protected function createDriver(): File\FilesystemDriver
{ {
$this->pool = new DefaultWorkerPool(); $this->pool = new ContextWorkerPool();
return new ParallelFilesystemDriver($this->pool); return new ParallelFilesystemDriver($this->pool);
} }