diff --git a/src/danog/Merger/Abstr/SharedMerger.php b/src/danog/Merger/Abstr/SharedMerger.php index 44c8bbc..ea3a167 100644 --- a/src/danog/Merger/Abstr/SharedMerger.php +++ b/src/danog/Merger/Abstr/SharedMerger.php @@ -92,9 +92,6 @@ abstract class SharedMerger $this->logger->write("Exception {$e->getMessage()} in $host:$rport, {$port}\n"); $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT)); } - } elseif ($cmd === Settings::ACTION_SYNC) { - $this->logger->write("SYNCING FOR {$port} on $writerId\n"); - $this->connections[$port]->sync($writerId); } else { throw new \Exception("Got unknown cmd $cmd"); } diff --git a/src/danog/Merger/Merger.php b/src/danog/Merger/Merger.php index b4daefc..37d0e5b 100644 --- a/src/danog/Merger/Merger.php +++ b/src/danog/Merger/Merger.php @@ -62,7 +62,11 @@ class Merger extends SharedMerger $context = (new ClientConnectContext())->withBindTo($bindto); $id = $y++; $this->writers[$id] = new SequentialSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context), $id); - $this->writers[$id]->write(pack('n', $id)); + $this->writers[$id]->write($s = pack('n', $id)); + yield $this->writers[$id]->read(2); + if (fread($this->writers[$id]->getBuffer(), 2) !== $s) { + throw new Exception('Wrong reply'); + } ksort($this->writers); asyncCall([$this, 'sharedLoop'], $id); } @@ -158,7 +162,7 @@ class Merger extends SharedMerger while (yield $socket->read()) { yield $this->commonWrite($socksInit); } - yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT)); + $this->close(); }; } } diff --git a/src/danog/Merger/MergerServer.php b/src/danog/Merger/MergerServer.php index da681b4..eafec99 100644 --- a/src/danog/Merger/MergerServer.php +++ b/src/danog/Merger/MergerServer.php @@ -58,7 +58,6 @@ class MergerServer extends SharedMerger $server = listen($this->settings->getTunnelEndpoint()); while ($socket = yield $server->accept()) { - //list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true)); $socket = new SequentialSocket($socket); yield $socket->read(2); $id = unpack('n', fread($socket->getBuffer(), 2))[1]; @@ -66,6 +65,7 @@ class MergerServer extends SharedMerger $this->writers[$id] = $socket; ksort($this->writers); asyncCall([$this, 'sharedLoop'], $id); + yield $socket->write(pack('n', $id)); }; } @@ -76,14 +76,10 @@ class MergerServer extends SharedMerger $socket = $this->_socket; $buffer = $socket->getBuffer(); - while (null !== $chunk = yield $socket->read()) { - fwrite($buffer, $chunk); - fseek($buffer, 0); - + while (yield $socket->read()) { yield $this->commonWrite($buffer); } - $this->_logger->write("Closing {$this->_port}\n"); - yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT)); + $this->close(); }; } } diff --git a/src/danog/Merger/MergerWorker.php b/src/danog/Merger/MergerWorker.php index 2efc4d3..8efddd9 100644 --- a/src/danog/Merger/MergerWorker.php +++ b/src/danog/Merger/MergerWorker.php @@ -64,13 +64,16 @@ class MergerWorker $this->_logger = $logger; $this->_callback = $callback->bindTo($this, get_class($this)); $this->_sharedStats = Stats::getInstance(); - $this->_pause = new Deferred; $this->_connectionInSubSeqNo = array_fill_keys(array_keys($this->_writers), 0); } public function loop($socket) { $this->_socket = $socket; - $this->parsePending(); + if ($this->_pause) { + $pause = $this->_pause; + $this->_pause = null; + $pause->resolve(); + } asyncCall($this->_callback); } public function handleSharedReadAsync($writerId, $buffer, $length) @@ -80,55 +83,18 @@ class MergerWorker yield $socket->read($length + 2); $seqno = unpack('n', stream_get_contents($buffer, 2))[1]; - if ($this->_socket && $seqno === $this->_connectionInSeqNo && !$this->_connectionInSubSeqNo[$writerId]) { - $this->_logger->write("Receiving payload with seqno $seqno main $writerId\n"); - $this->_socket->write(stream_get_contents($buffer, $length)); - $this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF; - - if ($this->_connectionInSeqNo === 0) { - foreach ($this->_connectionInSubSeqNo as &$sseqno) { - $sseqno--; - } - $this->_pendingPayloads = $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : []; - //ksort($this->_pendingPayloads); - /* - $this->_pause->resolve(); - $this->_pause = new Deferred; - $this->_minPauseSeqno = 0; - */ + while (!$this->_socket || $seqno !== $this->_connectionInSeqNo) { + if (!$this->_pause) { + $this->_pause = new Deferred; } - $this->parsePending(); - } else { - if (!$this->_connectionInSubSeqNo[$writerId]) { - $this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) postpone $writerId\n"); - $this->_pendingPayloads[$seqno] = stream_get_contents($buffer, $length); - //ksort($this->_pendingPayloads); - /* - if ($seqno - $this->_connectionInSeqNo > 200) { - $this->_logger->write("Pausing {$this->_port} - $writerId\n"); - $this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo; yield $this->_pause->promise(); - $this->_logger->write("Resuming {$this->_port} - $writerId\n"); - }*/ - } else { - $this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) - {$this->_connectionInSubSeqNo[$writerId]} postpone $writerId\n"); - $this->_pendingSubPayloads[$this->_connectionInSubSeqNo[$writerId]][$seqno] = stream_get_contents($buffer, $length); - /* - if ($this->_connectionInSubSeqNo[$writerId] > 1 || (0xFFFF + $seqno) - $this->_connectionInSeqNo > 200) { - $this->_logger->write("Pausing {$this->_port} - $writerId\n"); - $this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo; - yield $this->_pause->promise(); - $this->_logger->write("Resuming {$this->_port} - $writerId\n"); - }*/ - } } - - } - public function sync($writerId) - { - $seqno = ++$this->_connectionInSubSeqNo[$writerId]; - if (!isset($this->_pendingSubPayloads[$seqno])) { - $this->_pendingSubPayloads[$seqno] = []; + $this->_socket->write(stream_get_contents($buffer, $length)); + $this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF; + if ($this->_pause) { + $pause = $this->_pause; + $this->_pause = null; + $pause->resolve(); } } @@ -139,17 +105,12 @@ class MergerWorker $length = fstat($chunk)['size'] - ftell($chunk); foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) { if ($bytes <= 0) { + $this->_logger->write("Skipping $bytes\n"); continue; } $seqno = $this->_connectionOutSeqNo; $this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF; - if ($this->_connectionOutSeqNo === 0) { - foreach ($this->_writers as $writer) { - $writer->write(pack('VnC', 0, $this->_port, Settings::ACTION_SYNC)); - } - } - $this->_logger->write("Still sending {$this->_port} seqno $seqno length $bytes\n"); $this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve( function ($error = null, $result = null) use (&$shared_deferred) { @@ -167,43 +128,16 @@ class MergerWorker ftruncate($chunk, 0); return $promise; } - - public function parsePending() - { - for ($seqno = $this->_connectionInSeqNo; $seqno < 0xFFFF && isset($this->_pendingPayloads[$seqno]); $seqno++) { - $payload = $this->_pendingPayloads[$seqno]; - $this->_logger->write("Receiving proxy => {$this->_port} seqno $seqno post\n"); - - unset($this->_pendingPayloads[$seqno]); - $this->_socket->write($payload); - $this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF; - if ($this->_connectionInSeqNo === 0) { - foreach ($this->_connectionInSubSeqNo as &$sseqno) { - $sseqno--; - } - $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : []; - //ksort($this->_pendingPayloads); - /* - $this->_pause->resolve(); - $this->_pause = new Deferred; - $this->_minPauseSeqno = 0; - */ - $this->parsePending(); - } - } - /* - if ($this->_minPauseSeqno && $this->_connectionInSeqNo > $this->_minPauseSeqno) { - $this->_pause->resolve(); - $this->_pause = new Deferred; - $this->_minPauseSeqno = 0; - }*/ - } public function close() { if (!$this->_socket) { return; } - return $this->_socket->close(); + $socket = $this->_socket; + $this->_socket = null; + $this->_logger->write("Closing {$this->_port}\n"); + $socket->close(); + $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT)); } public function handleSharedRead($writerId, $buffer, $length) diff --git a/src/danog/Merger/Settings.php b/src/danog/Merger/Settings.php index 01802a9..ad46565 100644 --- a/src/danog/Merger/Settings.php +++ b/src/danog/Merger/Settings.php @@ -19,7 +19,6 @@ class Settings { const ACTION_CONNECT = 0; const ACTION_DISCONNECT = 1; - const ACTION_SYNC = 2; /** * Addresses from which to connect diff --git a/src/danog/Merger/Stats.php b/src/danog/Merger/Stats.php index dcb6c0c..dd39811 100644 --- a/src/danog/Merger/Stats.php +++ b/src/danog/Merger/Stats.php @@ -72,15 +72,6 @@ class Stats } private $speeds = []; - public function __construct() - { - Loop::repeat(1000, (function () { - foreach ($this->speeds as $elem) { - $elem->unshift((1024*1024 * 8) / 1); - $elem->pop(); - } - })->bindTo($this, get_class($this))); - } public function allocate($ID) { $this->speeds[$ID] = new \Ds\Deque(); @@ -103,10 +94,17 @@ class Stats $sum = 0; $result = []; + $maxk = 0; + $maxv = 0; foreach ($this->speeds as $last_key => $elem) { $ret = $elem->sum(); $sum += $ret; + if ($ret > $maxv) { + $maxv = $ret; + $maxk = $last_key; + } + $result[$last_key] = $ret; } @@ -114,15 +112,16 @@ class Stats $sum = 0; - foreach ($result as &$elem) { + foreach ($result as $key => &$elem) { $elem = (int) ($elem * $per_bytes); if (!$elem) { + $this->speeds[$key]->unshift(1000000); + $this->speeds[$key]->pop(); $elem += 2; } $sum += $elem; } - - $result[$last_key] -= $sum - $bytes; + $result[$maxk] -= $sum - $bytes; return $result; } public function getSpeeds($powerOf = 6)