mirror of
https://github.com/danog/byte-stream.git
synced 2024-12-11 17:09:43 +01:00
Solve performance issues with very large chunks
Very large chunks need to be copied every time there's a partial write, which is pretty problematic. Instead of doing an almost full copy of the full chunk every time, this patch splits very large chunks into multiple smaller chunks automatically. Fixes #41.
This commit is contained in:
parent
bd2de267cc
commit
b420ae4d6b
@ -13,6 +13,7 @@ use Amp\Success;
|
|||||||
*/
|
*/
|
||||||
final class ResourceOutputStream implements OutputStream {
|
final class ResourceOutputStream implements OutputStream {
|
||||||
const MAX_CONSECUTIVE_EMPTY_WRITES = 3;
|
const MAX_CONSECUTIVE_EMPTY_WRITES = 3;
|
||||||
|
const LARGE_CHUNK_SIZE = 128 * 1024;
|
||||||
|
|
||||||
/** @var resource */
|
/** @var resource */
|
||||||
private $resource;
|
private $resource;
|
||||||
@ -64,6 +65,9 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
$length = \strlen($data);
|
$length = \strlen($data);
|
||||||
|
|
||||||
if ($length === 0) {
|
if ($length === 0) {
|
||||||
|
// If there's no deferred at this place, there's a bug somewhere else and the item should have
|
||||||
|
// been removed earlier from the queue.
|
||||||
|
\assert($deferred !== null);
|
||||||
$deferred->resolve(0);
|
$deferred->resolve(0);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -104,17 +108,25 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ($deferred !== null) {
|
||||||
$deferred->resolve($written + $previous);
|
$deferred->resolve($written + $previous);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
} catch (\Throwable $exception) {
|
} catch (\Throwable $exception) {
|
||||||
$resource = null;
|
$resource = null;
|
||||||
$writable = false;
|
$writable = false;
|
||||||
|
|
||||||
|
if ($deferred !== null) {
|
||||||
$deferred->fail($exception);
|
$deferred->fail($exception);
|
||||||
|
}
|
||||||
|
|
||||||
while (!$writes->isEmpty()) {
|
while (!$writes->isEmpty()) {
|
||||||
list(, , $deferred) = $writes->shift();
|
list(, , $deferred) = $writes->shift();
|
||||||
|
|
||||||
|
if ($deferred !== null) {
|
||||||
$deferred->fail($exception);
|
$deferred->fail($exception);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Loop::cancel($watcher);
|
Loop::cancel($watcher);
|
||||||
} finally {
|
} finally {
|
||||||
@ -198,6 +210,16 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
}
|
}
|
||||||
|
|
||||||
$deferred = new Deferred;
|
$deferred = new Deferred;
|
||||||
|
|
||||||
|
if ($length - $written > self::LARGE_CHUNK_SIZE) {
|
||||||
|
$chunks = \str_split($data, self::LARGE_CHUNK_SIZE);
|
||||||
|
$data = \array_pop($chunks);
|
||||||
|
foreach ($chunks as $chunk) {
|
||||||
|
$this->writes->push([$chunk, $written, null]);
|
||||||
|
$written += self::LARGE_CHUNK_SIZE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
$this->writes->push([$data, $written, $deferred]);
|
$this->writes->push([$data, $written, $deferred]);
|
||||||
Loop::enable($this->watcher);
|
Loop::enable($this->watcher);
|
||||||
$promise = $deferred->promise();
|
$promise = $deferred->promise();
|
||||||
@ -241,6 +263,10 @@ final class ResourceOutputStream implements OutputStream {
|
|||||||
do {
|
do {
|
||||||
/** @var \Amp\Deferred $deferred */
|
/** @var \Amp\Deferred $deferred */
|
||||||
list(, , $deferred) = $this->writes->shift();
|
list(, , $deferred) = $this->writes->shift();
|
||||||
|
if ($deferred === null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
$deferred->fail($exception);
|
$deferred->fail($exception);
|
||||||
} while (!$this->writes->isEmpty());
|
} while (!$this->writes->isEmpty());
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user