1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-27 04:24:48 +01:00
ipc/lib/Sync/ChannelledSocket.php

154 lines
3.7 KiB
PHP
Raw Normal View History

2020-02-14 20:31:11 +01:00
<?php
namespace Amp\Ipc\Sync;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
2020-07-11 17:50:06 +02:00
use Amp\Deferred;
2020-02-14 20:31:11 +01:00
use Amp\Promise;
2020-03-05 21:12:35 +01:00
use Amp\Success;
use function Amp\call;
2020-02-14 20:31:11 +01:00
/**
 * Channel built on a pair of stream resources that performs a close handshake
 * (ChannelCloseReq / ChannelCloseAck) with the other endpoint before the
 * underlying streams are closed.
 */
final class ChannelledSocket implements Channel
{
    /** No close-related message has been received from the peer yet. */
    private const ESTABLISHED = 0;
    /** Bit set once the peer's ChannelCloseReq has been received. */
    private const GOT_FIN_MASK = 1;
    /** Bit set once the peer's ChannelCloseAck has been received. */
    private const GOT_ACK_MASK = 2;
    /** Both close messages were received; the handshake is complete. */
    private const GOT_ALL_MASK = 3;

    /** @var ChannelledStream */
    private $channel;

    /** @var ResourceInputStream */
    private $read;

    /** @var ResourceOutputStream */
    private $write;

    /** @var bool Set as soon as disconnect() has been started. */
    private $closed = false;

    /** @var int Bitmask of the close messages received so far. */
    private $state = self::ESTABLISHED;

    /** @var Deferred|null Set while disconnect() waits for an in-flight receive() to deliver a message. */
    private $closePromise;

    /** @var bool Whether a receive() call is currently waiting on the channel. */
    private $reading = false;

    /**
     * @param resource $read  Readable stream resource.
     * @param resource $write Writable stream resource.
     *
     * @throws \Error If a stream resource is not given for $read or $write.
     */
    public function __construct($read, $write)
    {
        $this->read = new ResourceInputStream($read);
        $this->write = new ResourceOutputStream($write);
        $this->channel = new ChannelledStream($this->read, $this->write);
    }

    /**
     * {@inheritdoc}
     *
     * Resolves with null once the channel is closed or when a close-handshake
     * message is received instead of payload data.
     */
    public function receive(): Promise
    {
        if ($this->closed) {
            return new Success();
        }

        return call(function (): \Generator {
            $this->reading = true;
            try {
                $data = yield $this->channel->receive();
            } catch (\Throwable $exception) {
                $this->reading = false;
                // A disconnect() waiting on this read must learn about the
                // failure, otherwise it would wait forever.
                if ($this->closePromise !== null) {
                    $waiting = $this->closePromise;
                    $this->closePromise = null;
                    $waiting->fail($exception);
                }
                throw $exception;
            }
            $this->reading = false;

            $isCloseReq = $data instanceof ChannelCloseReq;
            $isCloseAck = $data instanceof ChannelCloseAck;

            if ($this->closePromise !== null) {
                // disconnect() is blocked on this in-flight read: hand it the
                // message, whatever it is, so its loop can process it (ack a
                // close request, record an ack) and take over reading. Payload
                // data is still returned to the caller of receive().
                $waiting = $this->closePromise;
                $this->closePromise = null;
                $waiting->resolve($data);

                return ($isCloseReq || $isCloseAck) ? null : $data;
            }

            if ($isCloseReq) {
                // The peer started the shutdown: acknowledge, then run our own
                // half of the handshake.
                yield $this->channel->send(new ChannelCloseAck);
                $this->state |= self::GOT_FIN_MASK;
                yield $this->disconnect();

                return null;
            }

            if ($isCloseAck) {
                // Ack without a disconnect() waiting for it: nothing to resolve.
                return null;
            }

            return $data;
        });
    }

    /**
     * Cleanly disconnect from other endpoint.
     *
     * Sends a ChannelCloseReq, then reads until both the peer's close request
     * and its acknowledgement were seen. Other messages received meanwhile
     * are discarded by this method. The streams are closed afterwards, also
     * when the handshake fails.
     *
     * @return Promise
     */
    public function disconnect(): Promise
    {
        if ($this->closed) {
            return new Success();
        }
        $this->closed = true;

        return call(function (): \Generator {
            try {
                yield $this->channel->send(new ChannelCloseReq);

                if ($this->reading) {
                    // A receive() is already reading from the channel; wait
                    // for it to forward the first message instead of starting
                    // a second concurrent read.
                    $this->closePromise = new Deferred;
                }

                do {
                    if ($this->closePromise !== null) {
                        $data = yield $this->closePromise->promise();
                        $this->closePromise = null;
                    } else {
                        $data = yield $this->channel->receive();
                    }

                    if ($data instanceof ChannelCloseReq) {
                        yield $this->channel->send(new ChannelCloseAck);
                        $this->state |= self::GOT_FIN_MASK;
                    } elseif ($data instanceof ChannelCloseAck) {
                        $this->state |= self::GOT_ACK_MASK;
                    }
                } while ($this->state !== self::GOT_ALL_MASK);
            } finally {
                // The object is already marked closed, so release the streams
                // even if the handshake could not be completed.
                $this->closePromise = null;
                $this->close();
            }
        });
    }

    /**
     * {@inheritdoc}
     *
     * @throws ChannelException If disconnect() has already been started.
     */
    public function send($data): Promise
    {
        if ($this->closed) {
            throw new ChannelException('The channel was already closed!');
        }

        return $this->channel->send($data);
    }

    /**
     * {@inheritdoc}
     */
    public function unreference()
    {
        $this->read->unreference();
    }

    /**
     * {@inheritdoc}
     */
    public function reference()
    {
        $this->read->reference();
    }

    /**
     * Closes the read and write resource streams.
     */
    private function close(): void
    {
        $this->read->close();
        $this->write->close();
    }
}