From b420ae4d6bc70731c49855430a3bd9592b7283df Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Wed, 23 May 2018 19:39:34 +0200 Subject: [PATCH] Solve performance issues with very large chunks Very large chunks need to be copied every time there's a partial write, which is pretty problematic. Instead of doing an almost full copy of the full chunk every time, this patch splits very large chunks into multiple smaller chunks automatically. Fixes #41. --- lib/ResourceOutputStream.php | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/lib/ResourceOutputStream.php b/lib/ResourceOutputStream.php index d4b1b1f..76c2c4c 100644 --- a/lib/ResourceOutputStream.php +++ b/lib/ResourceOutputStream.php @@ -13,6 +13,7 @@ use Amp\Success; */ final class ResourceOutputStream implements OutputStream { const MAX_CONSECUTIVE_EMPTY_WRITES = 3; + const LARGE_CHUNK_SIZE = 128 * 1024; /** @var resource */ private $resource; @@ -64,6 +65,9 @@ final class ResourceOutputStream implements OutputStream { $length = \strlen($data); if ($length === 0) { + // If there's no deferred at this place, there's a bug somewhere else and the item should have + // been removed earlier from the queue. + \assert($deferred !== null); $deferred->resolve(0); continue; } @@ -104,16 +108,24 @@ final class ResourceOutputStream implements OutputStream { return; } - $deferred->resolve($written + $previous); + if ($deferred !== null) { + $deferred->resolve($written + $previous); + } } } catch (\Throwable $exception) { $resource = null; $writable = false; - $deferred->fail($exception); + if ($deferred !== null) { + $deferred->fail($exception); + } + while (!$writes->isEmpty()) { list(, , $deferred) = $writes->shift(); - $deferred->fail($exception); + + if ($deferred !== null) { + $deferred->fail($exception); + } } Loop::cancel($watcher); @@ -198,6 +210,16 @@ final class ResourceOutputStream implements OutputStream { } $deferred = new Deferred; + + if ($length - $written > self::LARGE_CHUNK_SIZE) { + $chunks = \str_split($data, self::LARGE_CHUNK_SIZE); + $data = \array_pop($chunks); + foreach ($chunks as $chunk) { + $this->writes->push([$chunk, $written, null]); + $written += self::LARGE_CHUNK_SIZE; + } + } + $this->writes->push([$data, $written, $deferred]); Loop::enable($this->watcher); $promise = $deferred->promise(); @@ -241,6 +263,10 @@ final class ResourceOutputStream implements OutputStream { do { /** @var \Amp\Deferred $deferred */ list(, , $deferred) = $this->writes->shift(); + if ($deferred === null) { + continue; + } + $deferred->fail($exception); } while (!$this->writes->isEmpty()); }