1
0
mirror of https://github.com/danog/psalm.git synced 2025-01-21 21:31:13 +01:00

Attempt to buffer unsent messages

This commit is contained in:
Brown 2019-05-30 16:54:15 -04:00
parent 8fd59674a2
commit 9d7cf66279

View File

@ -129,8 +129,18 @@ class Pool
// Get the work for this process
$task_data_iterator = array_values($process_task_data_iterator)[$proc_id];
$task_done_buffer = '';
foreach ($task_data_iterator as $i => $task_data) {
$task_closure($i, $task_data);
$task_result = $task_closure($i, $task_data);
$task_done_message = new ForkTaskDoneMessage($task_result);
$serialized_message = $task_done_buffer . base64_encode(serialize($task_done_message)) . PHP_EOL;
$bytes_written = @fwrite($write_stream, $serialized_message);
if (strlen($serialized_message) !== $bytes_written) {
$task_done_buffer = substr($serialized_message, $bytes_written);
}
}
// Execute each child's shutdown closure before
@ -138,8 +148,21 @@ class Pool
$results = $shutdown_closure();
// Serialize this child's produced results and send them to the parent.
$serialized_message = serialize($results ?: []);
fwrite($write_stream, $serialized_message);
$process_done_message = new ForkProcessDoneMessage($results ?: []);
$serialized_message = $task_done_buffer . base64_encode(serialize($process_done_message)) . PHP_EOL;
$bytes_to_write = strlen($serialized_message);
$bytes_written = 0;
while ($bytes_written < $bytes_to_write) {
// attemt to write the remaining unsent part
$bytes_written += @fwrite($write_stream, substr($serialized_message, $bytes_written));
if ($bytes_written < $bytes_to_write) {
// wait a bit
usleep(500000);
}
}
fclose($write_stream);
@ -216,6 +239,8 @@ class Pool
/** @var array<int, string> $content */
$content = array_fill_keys(array_keys($streams), '');
$terminationMessages = [];
// Read the data off of all the stream.
while (count($streams) > 0) {
$needs_read = array_values($streams);
@ -236,36 +261,40 @@ class Pool
$content[intval($file)] .= $buffer;
}
if (strpos($buffer, PHP_EOL) !== false) {
$serialized_messages = explode(PHP_EOL, $content[intval($file)]);
$content[intval($file)] = array_pop($serialized_messages);
foreach ($serialized_messages as $serialized_message) {
$message = unserialize(base64_decode($serialized_message));
if ($message instanceof ForkProcessDoneMessage) {
$terminationMessages[] = $message->data;
} elseif ($message instanceof ForkTaskDoneMessage) {
if ($this->task_done_closure !== null) {
($this->task_done_closure)($message->data);
}
} else {
error_log('Child should return ForkMessage - response type=' . gettype($message));
$this->did_have_error = true;
}
}
}
// If the stream has closed, stop trying to select on it.
if (feof($file)) {
if ($content[intval($file)] !== '') {
error_log('Child did not send full message before closing the connection');
$this->did_have_error = true;
}
fclose($file);
unset($streams[intval($file)]);
}
}
}
return array_values(
array_map(
/**
* @param string $data
*
* @return array
*/
function ($data) {
/** @var array */
$result = unserialize($data);
/** @psalm-suppress DocblockTypeContradiction */
if (!\is_array($result)) {
error_log(
'Child terminated without returning a serialized array - response type=' . gettype($result)
);
$this->did_have_error = true;
}
return $result;
},
$content
)
);
return array_values($terminationMessages);
}
/**