diff --git a/src/danog/Merger/Abstr/SharedMerger.php b/src/danog/Merger/Abstr/SharedMerger.php new file mode 100644 index 0000000..44c8bbc --- /dev/null +++ b/src/danog/Merger/Abstr/SharedMerger.php @@ -0,0 +1,114 @@ +. + * + * @author Daniil Gentili + * @copyright 2019 Daniil Gentili + * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 + */ +namespace danog\Merger\Abstr; + +use function Amp\Socket\connect; +use danog\Merger\MergerWorker; +use danog\Merger\SequentialSocket; +use danog\Merger\Settings; + +/** + * Abstract class shared merger + */ +abstract class SharedMerger +{ + /** + * Get read loop + * + * @return callable + */ + abstract public function getReadLoop(): callable; + + /** + * Shared socket loop + * + * @param int $writerId Writer ID + * @return void + */ + public function sharedLoop($writerId) + { + $socket = $this->writers[$writerId]; + $buffer = $socket->getBuffer(); + $loop_callback = $this->getReadLoop(); + + while (true) { + if (!yield $socket->read(6)) { + $this->logger->write("Breaking out of $writerId\n"); + break; + } + + $length = unpack('V', stream_get_contents($buffer, 4))[1]; + $port = unpack('n', stream_get_contents($buffer, 2))[1]; + + if ($length === 0) { + yield $socket->read(1); + $cmd = ord(stream_get_contents($buffer, 1)); + $this->logger->write("Reading special action $cmd $writerId\n"); + + if ($cmd === Settings::ACTION_DISCONNECT) { + $this->connections[$port]->close(); + unset($this->connections[$port]); + } elseif ($cmd === Settings::ACTION_CONNECT && $this->server) { + yield $socket->read(3); + $rport = unpack('n', stream_get_contents($buffer, 2))[1]; + $type = ord(stream_get_contents($buffer, 1)); + switch ($type) { + case 0x03: + yield $socket->read(1); + $toRead = ord(stream_get_contents($buffer, 1)); + yield $socket->read($toRead); + $host = stream_get_contents($buffer, $toRead); + break; + case 0x04: + $toRead = 16; + yield $socket->read($toRead); + $host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']'; + break; + case 0x01: + $toRead = 4; + yield $socket->read($toRead); + $host = inet_ntop(stream_get_contents($buffer, $toRead)); + break; + } + $this->logger->write("Connecting to $host:$rport, {$port}\n"); + try { + $this->connections[$port] = new MergerWorker($port, $loop_callback, $this->logger, $this->writers); + $this->connections[$port]->loop(new SequentialSocket(yield connect("tcp://$host:$rport"))); + $this->logger->write("Connected to $host:$rport, {$port}\n"); + } catch (\Exception $e) { + $this->logger->write("Exception {$e->getMessage()} in $host:$rport, {$port}\n"); + $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT)); + } + } elseif ($cmd === Settings::ACTION_SYNC) { + $this->logger->write("SYNCING FOR {$port} on $writerId\n"); + $this->connections[$port]->sync($writerId); + } else { + throw new \Exception("Got unknown cmd $cmd"); + } + } else { + yield $this->connections[$port]->handleSharedRead($writerId, $buffer, $length); + } + + if (fstat($buffer)['size'] > 1 * 1024 * 1024) { + $rest = stream_get_contents($buffer); + ftruncate($buffer, strlen($rest)); + fseek($buffer, 0); + fwrite($buffer, $rest); + fseek($buffer, 0); + } + } + } +} \ No newline at end of file diff --git a/src/danog/Merger/Exception.php b/src/danog/Merger/Exception.php new file mode 100644 index 0000000..d7ed93e --- /dev/null +++ b/src/danog/Merger/Exception.php @@ -0,0 +1,29 @@ +file = $file; + } + if ($line !== null) { + $this->line = $line; + } + parent::__construct($message, $code, $previous); + } + /** + * ExceptionErrorHandler. + * + * Error handler + */ + public static function ExceptionErrorHandler($errno = 0, $errstr = null, $errfile = null, $errline = null) + { + // If error is suppressed with @, don't throw an exception + if (error_reporting() === 0 || strpos($errstr, 'headers already sent') || ($errfile && strpos($errfile, 'vendor/amphp') !== false)) { + return false; + } + throw new self($errstr, $errno, null, $errfile, $errline); + } +} \ No newline at end of file diff --git a/src/danog/Merger/Merger.php b/src/danog/Merger/Merger.php index c01fa58..b4daefc 100644 --- a/src/danog/Merger/Merger.php +++ b/src/danog/Merger/Merger.php @@ -22,19 +22,17 @@ use Amp\Socket\ClientConnectContext; use function Amp\asyncCall; use function Amp\Socket\connect; use function Amp\Socket\listen; +use danog\Merger\Abstr\SharedMerger; class Merger extends SharedMerger { protected $settings; protected $writers = []; protected $connections = []; - protected $stats = []; - protected $shared_stats = []; + protected $shared_stats; + protected $server = false; protected $connection_seqno = 3; - const STATE_HEADER = 0; - const STATE_DATA = 1; - /** * Constructor * @@ -58,112 +56,109 @@ class Merger extends SharedMerger } public function loopAsync() { + $y = 0; foreach ($this->settings->getConnectFromAddresses() as $bindto) { for ($x = 0; $x < $this->settings->getConnectionCount(); $x++) { $context = (new ClientConnectContext())->withBindTo($bindto); - $this->writers[$bindto . '-' . $x] = new SharedSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context)); - $this->stats[$bindto . '-' . $x] = Stats::getInstance($bindto . '-' . $x); - $this->pending_out_payloads[$bindto . '-' . $x] = new \SplQueue; - asyncCall([$this, 'handleSharedReads'], $bindto . '-' . $x, false); + $id = $y++; + $this->writers[$id] = new SequentialSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context), $id); + $this->writers[$id]->write(pack('n', $id)); + ksort($this->writers); + asyncCall([$this, 'sharedLoop'], $id); } } $server = listen("127.0.0.1:55555"); + $loop_callback = $this->getReadLoop(); + while ($socket = yield $server->accept()) { $port = explode(':', stream_socket_get_name($socket->getResource(), true))[1]; $port = $this->connection_seqno++; - $this->connections[$port] = $socket; - $this->connection_out_seq_no[$port] = 0; - $this->connection_in_seq_no[$port] = 0; - $this->pending_in_payloads[$port] = []; - asyncCall([$this, 'handleClientReads'], $port); + $this->connections[$port] = new MergerWorker($port, $loop_callback, $this->logger, $this->writers); + $this->connections[$port]->loop(new SequentialSocket($socket)); }; } - public function handleClientReads($port) + public function getReadLoop(): callable { - $this->logger->write("New $port\n"); - $socket = $this->connections[$port]; + return function () { + $this->_logger->write("New {$this->_port}\n"); + $socket = $this->_socket; + $socksInit = $socket->getBuffer(); - $socksInit = fopen('php://memory', 'r+'); - yield $this->readMore($socket, $socksInit, 2); + yield $socket->read(2); - if (fread($socksInit, 1) !== chr(5)) { - throw new \Exception('Wrong socks5 init '); - } - yield $socket->write(chr(5)); - $auth = null; - for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) { - yield $this->readMore($socket, $socksInit, 1); - $type = ord(fread($socksInit, 1)); - if ($type === 0) { - $auth = false; - } else if ($type === 2) { - $auth = true; + if (fread($socksInit, 1) !== chr(5)) { + throw new \Exception('Wrong socks5 init '); } - } - if ($auth === null) { - throw new \Exception('No socks5 method'); - } - $authchr = chr($auth ? 2 : 0); - yield $socket->write($authchr); + yield $socket->write(chr(5)); + $auth = null; + for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) { + yield $socket->read(1); + $type = ord(fread($socksInit, 1)); + if ($type === 0) { + $auth = false; + } else if ($type === 2) { + $auth = true; + } + } + if ($auth === null) { + throw new \Exception('No socks5 method'); + } + $authchr = chr($auth ? 2 : 0); + yield $socket->write($authchr); - yield $this->readMore($socket, $socksInit, 4); - if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) { - throw new \Exception('Wrong socks5 ack'); - } - if ($auth) { - yield $this->readMore($socket, $socksInit, 2); - $ulen = ord(fread(2)[1]); - yield $this->readMore($socket, $socksInit, $ulen); - $username = fread($socksInit, $ulen); + yield $socket->read(3); + if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) { + throw new \Exception('Wrong socks5 ack'); + } + if ($auth) { + yield $socket->read(1); + $ulen = ord(fread(1)); + yield $socket->read($ulen); + $username = fread($socksInit, $ulen); - $plen = ord(fread(1)); - yield $this->readMore($socket, $socksInit, $plen); - $password = fread($socksInit, $plen); - } - $payload = fread($socksInit, 1); - switch (ord($payload[0])) { - case 0x03: - yield $this->readMore($socket, $socksInit, 1); - $payload .= fread($socksInit, 1); - $toRead = ord($payload[1]); - yield $this->readMore($socket, $socksInit, $toRead); - $payload .= fread($socksInit, $toRead); - break; - case 0x04: - $toRead = 16; - yield $this->readMore($socket, $socksInit, $toRead); - $payload .= fread($socksInit, $toRead); - break; - case 0x01: - $toRead = 4; - yield $this->readMore($socket, $socksInit, $toRead); - $payload .= fread($socksInit, $toRead); - break; - } - yield $this->readMore($socket, $socksInit, 2); - $rport = unpack('n', fread($socksInit, 2))[1]; + yield $socket->read(1); + $plen = ord(fread(1)); + yield $socket->read($plen); + $password = fread($socksInit, $plen); + } + yield $socket->read(1); + $payload = fread($socksInit, 1); + switch (ord($payload)) { + case 0x03: + yield $socket->read(1); + $payload .= fread($socksInit, 1); + $toRead = ord($payload[1]); + yield $socket->read($toRead); + $payload .= fread($socksInit, $toRead); + break; + case 0x04: + $toRead = 16; + yield $socket->read($toRead); + $payload .= fread($socksInit, $toRead); + break; + case 0x01: + $toRead = 4; + yield $socket->read($toRead); + $payload .= fread($socksInit, $toRead); + break; + } + yield $socket->read(2); + $rport = unpack('n', fread($socksInit, 2))[1]; - yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0)); + yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0)); - $this->logger->write("================================ SENDING CONNECT ================================"); - yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload); + $this->_logger->write("================================ SENDING CONNECT ================================\n"); + yield $this->_writers[key($this->_writers)]->write(pack('VnCn', 0, $this->_port, Settings::ACTION_CONNECT, $rport) . $payload); - $buffer = fopen('php://memory', 'r+'); - if (fstat($socksInit)['size'] - ftell($socksInit)) { - fwrite($buffer, stream_get_contents($socksInit)); - fseek($buffer, 0); - yield $this->commonWrite($port, $buffer); - } - fclose($socksInit); - while (null !== $chunk = yield $socket->read()) { - //$this->logger->write("Sending $port => proxy\n"); - fwrite($buffer, $chunk); - fseek($buffer, 0); - yield $this->commonWrite($port, $buffer); - } - yield $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); + if (fstat($socksInit)['size'] - ftell($socksInit)) { + yield $this->commonWrite($socksInit); + } + while (yield $socket->read()) { + yield $this->commonWrite($socksInit); + } + yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT)); + }; } - } diff --git a/src/danog/Merger/MergerServer.php b/src/danog/Merger/MergerServer.php index 9035713..da681b4 100644 --- a/src/danog/Merger/MergerServer.php +++ b/src/danog/Merger/MergerServer.php @@ -16,17 +16,17 @@ namespace danog\Merger; use function Amp\asyncCall; -use function Amp\Socket\connect; use function Amp\Socket\listen; use Amp\Loop; use Amp\ByteStream\ResourceOutputStream; +use danog\Merger\Abstr\SharedMerger; class MergerServer extends SharedMerger { protected $settings; protected $writers = []; protected $connections = []; - protected $stats = []; + protected $server = true; const STATE_HEADER = 0; const STATE_HEADER_CMD = 1; @@ -58,30 +58,32 @@ class MergerServer extends SharedMerger $server = listen($this->settings->getTunnelEndpoint()); while ($socket = yield $server->accept()) { - list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true)); - $this->writers[$address . '-' . $port] = new SharedSocket($socket); - $this->stats[$address . '-' . $port] = Stats::getInstance($address . '-' . $port); - $this->pending_out_payloads[$address . '-' . $port] = new \SplQueue; - - asyncCall([$this, 'handleSharedReads'], $address . '-' . $port, true); + //list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true)); + $socket = new SequentialSocket($socket); + yield $socket->read(2); + $id = unpack('n', fread($socket->getBuffer(), 2))[1]; + $socket->setId($id); + $this->writers[$id] = $socket; + ksort($this->writers); + asyncCall([$this, 'sharedLoop'], $id); }; } - public function handleClientReads($port) + public function getReadLoop(): callable { - $this->logger->write("New $port\n"); - $socket = $this->connections[$port]; + return function () { + $this->_logger->write("New {$this->_port}\n"); + $socket = $this->_socket; - $buffer = fopen('php://memory', 'r+'); - while (null !== $chunk = yield $socket->read()) { - //$this->logger->write("Sending $port => proxy\n"); - fwrite($buffer, $chunk); - fseek($buffer, 0); + $buffer = $socket->getBuffer(); + while (null !== $chunk = yield $socket->read()) { + fwrite($buffer, $chunk); + fseek($buffer, 0); - yield $this->commonWrite($port, $buffer); - } - $this->logger->write("Closing $port\n"); - $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); + yield $this->commonWrite($buffer); + } + $this->_logger->write("Closing {$this->_port}\n"); + yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT)); + }; } - } diff --git a/src/danog/Merger/MergerWorker.php b/src/danog/Merger/MergerWorker.php new file mode 100644 index 0000000..2efc4d3 --- /dev/null +++ b/src/danog/Merger/MergerWorker.php @@ -0,0 +1,213 @@ +_port = $port; + $this->_writers = $writers; + $this->_logger = $logger; + $this->_callback = $callback->bindTo($this, get_class($this)); + $this->_sharedStats = Stats::getInstance(); + $this->_pause = new Deferred; + $this->_connectionInSubSeqNo = array_fill_keys(array_keys($this->_writers), 0); + } + public function loop($socket) + { + $this->_socket = $socket; + $this->parsePending(); + asyncCall($this->_callback); + } + public function handleSharedReadAsync($writerId, $buffer, $length) + { + $socket = $this->_writers[$writerId]; + + yield $socket->read($length + 2); + $seqno = unpack('n', stream_get_contents($buffer, 2))[1]; + + if ($this->_socket && $seqno === $this->_connectionInSeqNo && !$this->_connectionInSubSeqNo[$writerId]) { + $this->_logger->write("Receiving payload with seqno $seqno main $writerId\n"); + $this->_socket->write(stream_get_contents($buffer, $length)); + $this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF; + + if ($this->_connectionInSeqNo === 0) { + foreach ($this->_connectionInSubSeqNo as &$sseqno) { + $sseqno--; + } + $this->_pendingPayloads = $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : []; + //ksort($this->_pendingPayloads); + /* + $this->_pause->resolve(); + $this->_pause = new Deferred; + $this->_minPauseSeqno = 0; + */ + } + $this->parsePending(); + } else { + if (!$this->_connectionInSubSeqNo[$writerId]) { + $this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) postpone $writerId\n"); + $this->_pendingPayloads[$seqno] = stream_get_contents($buffer, $length); + //ksort($this->_pendingPayloads); + /* + if ($seqno - $this->_connectionInSeqNo > 200) { + $this->_logger->write("Pausing {$this->_port} - $writerId\n"); + $this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo; + yield $this->_pause->promise(); + $this->_logger->write("Resuming {$this->_port} - $writerId\n"); + }*/ + } else { + $this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) - {$this->_connectionInSubSeqNo[$writerId]} postpone $writerId\n"); + $this->_pendingSubPayloads[$this->_connectionInSubSeqNo[$writerId]][$seqno] = stream_get_contents($buffer, $length); + /* + if ($this->_connectionInSubSeqNo[$writerId] > 1 || (0xFFFF + $seqno) - $this->_connectionInSeqNo > 200) { + $this->_logger->write("Pausing {$this->_port} - $writerId\n"); + $this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo; + yield $this->_pause->promise(); + $this->_logger->write("Resuming {$this->_port} - $writerId\n"); + }*/ + } + } + + } + public function sync($writerId) + { + $seqno = ++$this->_connectionInSubSeqNo[$writerId]; + if (!isset($this->_pendingSubPayloads[$seqno])) { + $this->_pendingSubPayloads[$seqno] = []; + } + } + + public function commonWrite($chunk) + { + $shared_deferred = new Deferred(); + $promise = $shared_deferred->promise(); + $length = fstat($chunk)['size'] - ftell($chunk); + foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) { + if ($bytes <= 0) { + continue; + } + + $seqno = $this->_connectionOutSeqNo; + $this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF; + if ($this->_connectionOutSeqNo === 0) { + foreach ($this->_writers as $writer) { + $writer->write(pack('VnC', 0, $this->_port, Settings::ACTION_SYNC)); + } + } + $this->_logger->write("Still sending {$this->_port} seqno $seqno length $bytes\n"); + + $this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve( + function ($error = null, $result = null) use (&$shared_deferred) { + if ($error) { + throw $error; + } + if ($shared_deferred) { + $shared_deferred->resolve(); + $shared_deferred = null; + } + } + ); + } + fseek($chunk, 0); + ftruncate($chunk, 0); + return $promise; + } + + public function parsePending() + { + for ($seqno = $this->_connectionInSeqNo; $seqno < 0xFFFF && isset($this->_pendingPayloads[$seqno]); $seqno++) { + $payload = $this->_pendingPayloads[$seqno]; + $this->_logger->write("Receiving proxy => {$this->_port} seqno $seqno post\n"); + + unset($this->_pendingPayloads[$seqno]); + $this->_socket->write($payload); + $this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF; + if ($this->_connectionInSeqNo === 0) { + foreach ($this->_connectionInSubSeqNo as &$sseqno) { + $sseqno--; + } + $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : []; + //ksort($this->_pendingPayloads); + /* + $this->_pause->resolve(); + $this->_pause = new Deferred; + $this->_minPauseSeqno = 0; + */ + $this->parsePending(); + } + } + /* + if ($this->_minPauseSeqno && $this->_connectionInSeqNo > $this->_minPauseSeqno) { + $this->_pause->resolve(); + $this->_pause = new Deferred; + $this->_minPauseSeqno = 0; + }*/ + } + public function close() + { + if (!$this->_socket) { + return; + } + return $this->_socket->close(); + } + + public function handleSharedRead($writerId, $buffer, $length) + { + return call([$this, 'handleSharedReadAsync'], $writerId, $buffer, $length); + } +} diff --git a/src/danog/Merger/SequentialSocket.php b/src/danog/Merger/SequentialSocket.php new file mode 100644 index 0000000..cbfe0e4 --- /dev/null +++ b/src/danog/Merger/SequentialSocket.php @@ -0,0 +1,71 @@ +socket = $socket; + $this->buffer = fopen('php://memory', 'r+'); + $this->last_write = new Success(); + $this->stats = Stats::getInstance($id); + } + public function setId($id) + { + $this->stats = Stats::getInstance($id); + } + public function getBuffer() + { + return $this->buffer; + } + public function read($length = 0) + { + return call([$this, 'readAsync'], $length); + } + public function readAsync($length) + { + if (!$length) { + $pos = ftell($this->buffer); + $read = yield $this->socket->read(); + fwrite($this->buffer, $read); + fseek($this->buffer, $pos); + return $read !== null; + } + $read = true; + $pos = ftell($this->buffer); + fseek($this->buffer, 0, SEEK_END); + while (fstat($this->buffer)['size'] - $pos < $length && ($read = yield $this->socket->read()) !== null) { + fwrite($this->buffer, $read); + } + fseek($this->buffer, $pos); + return $read !== null; + } + public function write($data) + { + return $this->socket->write($data); + } + public function writeSequential($data) + { + return call([$this, 'writeAsync'], $data); + } + public function writeAsync($data) + { + yield $this->last_write; + $started = microtime(true); + $wrote = yield $this->last_write = $this->socket->write($data); + $this->stats->stopSending($started, $wrote); + return $wrote; + } + public function close() + { + return $this->socket->close(); + } +} \ No newline at end of file diff --git a/src/danog/Merger/Settings.php b/src/danog/Merger/Settings.php index bb0d6c3..01802a9 100644 --- a/src/danog/Merger/Settings.php +++ b/src/danog/Merger/Settings.php @@ -17,6 +17,10 @@ namespace danog\Merger; class Settings { + const ACTION_CONNECT = 0; + const ACTION_DISCONNECT = 1; + const ACTION_SYNC = 2; + /** * Addresses from which to connect * diff --git a/src/danog/Merger/SharedMerger.php b/src/danog/Merger/SharedMerger.php deleted file mode 100644 index 7443214..0000000 --- a/src/danog/Merger/SharedMerger.php +++ /dev/null @@ -1,193 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2019 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - */ -namespace danog\Merger; - -use Amp\Deferred; -use function Amp\asyncCall; -use function Amp\call; -use function Amp\Socket\connect; - -abstract class SharedMerger -{ - protected $pending_in_payloads = []; - protected $pending_out_payloads = []; - protected $connection_out_seq_no = []; - protected $connection_in_seq_no = []; - - const ACTION_CONNECT = 0; - const ACTION_DISCONNECT = 1; - public function readMore($socket, $buffer, $length) - { - return call([$this, 'readMoreAsync'], $socket, $buffer, $length); - } - public function readMoreAsync($socket, $buffer, $length) - { - $read = true; - $pos = ftell($buffer); - fseek($buffer, 0, SEEK_END); - while (fstat($buffer)['size'] - $pos < $length && ($read = yield $socket->read()) !== null) { - fwrite($buffer, $read); - } - fseek($buffer, $pos); - return $read !== null; - } - public function commonWrite($port, $chunk) - { - $shared_deferred = new Deferred(); - $promise = $shared_deferred->promise(); - $length = fstat($chunk)['size'] - ftell($chunk); - foreach ($this->shared_stats->balance($length) as $id => $bytes) { - if ($bytes === 0) continue; - $stats = $this->stats[$id]; - $seqno = $this->connection_out_seq_no[$port]; - $this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port] + 1) % 0xFFFF; - - $this->logger->write("Still sending $port seqno $seqno length $bytes\n"); - - $this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve( - function ($error = null, $result = null) use ($stats, &$shared_deferred, $bytes) { - if ($error) { - throw $error; - } - if ($shared_deferred) { - $shared_deferred->resolve(); - $shared_deferred = null; - } - $stats->stopSending($result, $bytes); - } - ); - } - ftruncate($chunk, 0); - fseek($chunk, 0); - return $promise; - } - - public function handleSharedReads($id, $server) - { - $socket = $this->writers[$id]; - $buffer = $socket->getBuffer(); - - while (true) { - if (!yield $socket->read(6)) { - $this->logger->write("Breaking out of $id\n"); - break; - } - - $length = unpack('V', stream_get_contents($buffer, 4))[1]; - $port = unpack('n', stream_get_contents($buffer, 2))[1]; - - $this->logger->write("Reading length $length port $port\n"); - if ($length === 0) { - yield $socket->read(1); - $cmd = ord(stream_get_contents($buffer, 1)); - $this->logger->write("Reading special action $cmd $id\n"); - - if ($cmd === self::ACTION_DISCONNECT) { - $this->connections[$port]->close(); - unset($this->connections[$port]); - unset($this->connection_out_seq_no[$port]); - unset($this->connection_in_seq_no[$port]); - unset($this->pending_in_payloads[$port]); - } else if ($cmd === self::ACTION_CONNECT && $server) { - yield $socket->read(3); - $rport = unpack('n', stream_get_contents($buffer, 2))[1]; - $type = ord(stream_get_contents($buffer, 1)); - switch ($type) { - case 0x03: - yield $socket->read(1); - $toRead = ord(stream_get_contents($buffer, 1)); - yield $socket->read($toRead); - $host = stream_get_contents($buffer, $toRead); - break; - case 0x04: - $toRead = 16; - yield $socket->read($toRead); - $host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']'; - break; - case 0x01: - $toRead = 4; - yield $socket->read($toRead); - $host = inet_ntop(stream_get_contents($buffer, $toRead)); - break; - } - $this->logger->write("Connecting to $host:$rport, $port\n"); - try { - $this->connection_out_seq_no[$port] = 0; - $this->connection_in_seq_no[$port] = 0; - $this->pending_in_payloads[$port] = []; - $this->connections[$port] = yield connect("tcp://$host:$rport"); - ksort($this->pending_in_payloads[$port]); - foreach ($this->pending_in_payloads[$port] as $seqno => $payload) { - if ($this->connection_in_seq_no[$port] !== $seqno) { - break; - } - $this->logger->write("Receiving proxy => $port seqno $seqno init $id\n"); - - unset($this->pending_in_payloads[$port][$seqno]); - $this->connections[$port]->write($payload); - $this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF; - } - asyncCall([$this, 'handleClientReads'], $port); - } catch (\Exception $e) { - $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); - } - } else if ($cmd > 1) { - throw new \Exception("Got unknown cmd $cmd"); - } - } else { - $this->logger->write("Reading payload\n"); - - yield $socket->read($length + 2); - $seqno = unpack('n', stream_get_contents($buffer, 2))[1]; - - if (isset($this->connections[$port]) && $seqno === $this->connection_in_seq_no[$port]) { - $this->logger->write("Receiving proxy => $port seqno $seqno main $id\n"); - $this->connections[$port]->write($d = stream_get_contents($buffer, $length)); - if (strlen($d) != $length) { - die('Wrong length'); - } - $this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF; - ksort($this->pending_in_payloads[$port]); - foreach ($this->pending_in_payloads[$port] as $seqno => $payload) { - if ($this->connection_in_seq_no[$port] !== $seqno) { - break; - } - $this->logger->write("Receiving proxy => $port seqno $seqno subloop $id\n"); - unset($this->pending_in_payloads[$port][$seqno]); - $this->connections[$port]->write($payload); - $this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF; - - } - } else { - if (!isset($this->pending_in_payloads[$port])) { - $this->pending_in_payloads[$port] = []; - } - $this->logger->write("Postponing payload {$this->connection_in_seq_no[$port]} != seqno $seqno postpone $id\n"); - - $this->pending_in_payloads[$port][$seqno] = stream_get_contents($buffer, $length); - } - } - - if (fstat($buffer)['size'] > 1 * 1024 * 1024) { - $rest = stream_get_contents($buffer); - ftruncate($buffer, strlen($rest)); - fseek($buffer, 0); - fwrite($buffer, $rest); - fseek($buffer, 0); - } - } - } -} diff --git a/src/danog/Merger/Stats.php b/src/danog/Merger/Stats.php index b3b4e19..dcb6c0c 100644 --- a/src/danog/Merger/Stats.php +++ b/src/danog/Merger/Stats.php @@ -15,6 +15,9 @@ */ namespace danog\Merger; +use Amp\Loop; + + class Stats { const MEAN_COUNT = 10; @@ -69,8 +72,15 @@ class Stats } private $speeds = []; - private $needs_starting = []; - + public function __construct() + { + Loop::repeat(1000, (function () { + foreach ($this->speeds as $elem) { + $elem->unshift((1024*1024 * 8) / 1); + $elem->pop(); + } + })->bindTo($this, get_class($this))); + } public function allocate($ID) { $this->speeds[$ID] = new \Ds\Deque(); @@ -82,11 +92,7 @@ class Stats $time = microtime(true) - $started; $this->speeds[$ID]->unshift(($sent * 8) / $time); $this->speeds[$ID]->pop(); - if (isset($this->needs_starting[$ID])) { - echo "Re-start sending $ID\n"; - unset($this->needs_starting[$ID]); - $this->startSending($ID); - } + } public function getSpeed($ID, $powerOf = 6) { @@ -110,6 +116,9 @@ class Stats foreach ($result as &$elem) { $elem = (int) ($elem * $per_bytes); + if (!$elem) { + $elem += 2; + } $sum += $elem; }