1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-26 20:15:05 +01:00

Allow calling disconnect while reading

This commit is contained in:
Daniil Gentili 2020-07-11 17:50:06 +02:00
parent 6971e4d61a
commit 787ca479ef
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
2 changed files with 44 additions and 2 deletions

View File

@ -4,6 +4,7 @@ namespace Amp\Ipc\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Promise;
use Amp\Success;
@ -30,6 +31,12 @@ final class ChannelledSocket implements Channel
/** @var int */
private $state = self::ESTABLISHED;
/** @var Deferred */
private $closePromise;
/** @var bool */
private $reading = false;
/**
* @param resource $read Readable stream resource.
* @param resource $write Writable stream resource.
@ -53,13 +60,18 @@ final class ChannelledSocket implements Channel
return new Success();
}
return call(function (): \Generator {
$this->reading = true;
$data = yield $this->channel->receive();
$this->reading = false;
if ($data instanceof ChannelCloseReq) {
yield $this->channel->send(new ChannelCloseAck);
$this->state = self::GOT_FIN_MASK;
yield $this->disconnect();
return null;
} elseif ($data instanceof ChannelCloseAck) {
$this->closePromise->resolve($data);
return null;
}
return $data;
@ -81,9 +93,14 @@ final class ChannelledSocket implements Channel
return call(function () use ($channel): \Generator {
yield $channel->send(new ChannelCloseReq);
if ($this->reading) {
$this->closePromise = new Deferred;
}
do {
$data = yield $channel->receive();
$data = yield ($this->closePromise ? $this->closePromise->promise() : $channel->receive());
if ($this->closePromise) {
$this->closePromise = null;
}
if ($data instanceof ChannelCloseReq) {
yield $channel->send(new ChannelCloseAck);
$this->state |= self::GOT_FIN_MASK;

View File

@ -6,6 +6,7 @@ use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Context\Process;
use Amp\PHPUnit\AsyncTestCase;
use function Amp\asyncCall;
use function Amp\Ipc\connect;
class IpcTest extends AsyncTestCase
@ -32,6 +33,30 @@ class IpcTest extends AsyncTestCase
$this->assertNull(yield $process->join());
}
/** @dataProvider provideUriFifo */
public function testIPCDisconectWhileReading(string $uri, bool $fifo)
{
$process = new Process([__DIR__.'/Fixtures/echoServer.php', $uri, $fifo]);
yield $process->start();
$recvUri = yield $process->receive();
if ($uri) {
$this->assertEquals($uri, $recvUri);
}
$client = yield connect($recvUri);
$this->assertInstanceOf(ChannelledSocket::class, $client);
asyncCall(
static function () use ($client) {
while (yield $client->receive());
}
);
yield $client->disconnect();
$this->assertNull(yield $process->join());
}
public function provideUriFifo(): \Generator
{
foreach (['', \sys_get_temp_dir().'/pony'] as $uri) {