diff --git a/src/danog/Merger/Merger.php b/src/danog/Merger/Merger.php index 191a0c9..16194ec 100644 --- a/src/danog/Merger/Merger.php +++ b/src/danog/Merger/Merger.php @@ -6,6 +6,7 @@ use function Amp\asyncCall; use function Amp\Socket\connect; use function Amp\Socket\listen; use Amp\Loop; +use Amp\ByteStream\ResourceOutputStream; class Merger extends SharedMerger { @@ -28,10 +29,11 @@ class Merger extends SharedMerger { $this->settings = $settings; $this->shared_stats = Stats::getInstance(); - /* + $this->logger = new ResourceOutputStream(fopen('php://stdout', 'r+')); + Loop::repeat(1000, function () { - var_dump($this->shared_stats->getSpeeds()); - });*/ + $this->logger->write(json_encode($this->shared_stats->getSpeeds(), JSON_PRETTY_PRINT)); + }); } public function loop() { @@ -44,6 +46,7 @@ class Merger extends SharedMerger $context = (new ClientConnectContext())->withBindTo($bindto); $this->writers[$bindto . '-' . $x] = yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context); $this->stats[$bindto . '-' . $x] = Stats::getInstance($bindto . '-' . $x); + $this->pending_out_payloads[$bindto . '-' . $x] = new \SplQueue; asyncCall([$this, 'handleSharedReads'], $bindto . '-' . $x, false); } } @@ -56,13 +59,13 @@ class Merger extends SharedMerger $this->connections[$port] = $socket; $this->connection_out_seq_no[$port] = 0; $this->connection_in_seq_no[$port] = 0; - $this->pending_payloads[$port] = []; + $this->pending_in_payloads[$port] = []; asyncCall([$this, 'handleClientReads'], $port); }; } public function handleClientReads($port) { - var_dumP("New $port"); + var_dumP("New $port\n"); $socket = $this->connections[$port]; $socksInit = fopen('php://memory', 'r+'); @@ -130,6 +133,7 @@ class Merger extends SharedMerger yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0)); + var_Dump("================================ SENDING CONNECT ================================"); yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload); $buffer = $socksInit; @@ -137,7 +141,7 @@ class Merger extends SharedMerger yield $this->commonWrite($port, $buffer); } while (null !== $chunk = yield $socket->read()) { - var_dumP("Sending $port => proxy"); + //var_dumP("Sending $port => proxy\n"); $pos = ftell($buffer); fwrite($buffer, $chunk); fseek($buffer, $pos); diff --git a/src/danog/Merger/MergerServer.php b/src/danog/Merger/MergerServer.php index 34eadc6..5760eae 100644 --- a/src/danog/Merger/MergerServer.php +++ b/src/danog/Merger/MergerServer.php @@ -5,6 +5,7 @@ use function Amp\asyncCall; use function Amp\Socket\connect; use function Amp\Socket\listen; use Amp\Loop; +use Amp\ByteStream\ResourceOutputStream; class MergerServer extends SharedMerger { @@ -26,10 +27,10 @@ class MergerServer extends SharedMerger { $this->settings = $settings; $this->shared_stats = Stats::getInstance(); - /* + $this->logger = new ResourceOutputStream(fopen('php://stdout', 'r+')); Loop::repeat(1000, function () { - var_dump($this->shared_stats->getSpeeds()); - });*/ + $this->logger->write(json_encode($this->shared_stats->getSpeeds(), JSON_PRETTY_PRINT)); + }); } public function loop() { @@ -43,18 +44,20 @@ class MergerServer extends SharedMerger list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true)); $this->writers[$address . '-' . $port] = $socket; $this->stats[$address . '-' . $port] = Stats::getInstance($address . '-' . $port); + $this->pending_out_payloads[$address . '-' . $port] = new \SplQueue; + asyncCall([$this, 'handleSharedReads'], $address . '-' . $port, true); }; } public function handleClientReads($port) { - var_dump("New $port"); + $this->logger->write("New $port\n"); $socket = $this->connections[$port]; $buffer = fopen('php://memory', 'r+'); while (null !== $chunk = yield $socket->read()) { - var_dumP("Sending $port => proxy"); + $this->logger->write("Sending $port => proxy\n"); $pos = ftell($buffer); fwrite($buffer, $chunk); @@ -66,7 +69,7 @@ class MergerServer extends SharedMerger $buffer = fopen('php://memory', 'r+'); } } - var_dump("Closing $port"); + $this->logger->write("Closing $port\n"); $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); } diff --git a/src/danog/Merger/SharedMerger.php b/src/danog/Merger/SharedMerger.php index 7ba2d6f..1614a04 100644 --- a/src/danog/Merger/SharedMerger.php +++ b/src/danog/Merger/SharedMerger.php @@ -8,7 +8,8 @@ use function Amp\asyncCall; abstract class SharedMerger { - protected $pending_payloads = []; + protected $pending_in_payloads = []; + protected $pending_out_payloads = []; protected $connection_out_seq_no = []; protected $connection_in_seq_no = []; @@ -37,9 +38,9 @@ abstract class SharedMerger $seqno = $this->connection_out_seq_no[$port]; $this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port]+1) % 0xFFFF; - var_dump("Still sending $port seqno $seqno"); + $this->logger->write("Still sending $port seqno $seqno\n"); $stats->startSending(); - $this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . fread($chunk, $bytes))->onResolve( + $this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve( function ($error = null, $result = null) use ($stats, &$deferred, $port) { if ($error) { throw $error; @@ -62,58 +63,64 @@ abstract class SharedMerger $buffer = fopen('php://memory', 'r+'); while (true) { + $this->logger->write("Reading length\n"); if (!yield $this->readMore($socket, $buffer, 6)) { + $this->logger->write("Breaking out of $id\n"); break; } - $length = unpack('V', fread($buffer, 4))[1]; - $port = unpack('n', fread($buffer, 2))[1]; + + $length = unpack('V', stream_get_contents($buffer, 4))[1]; + $port = unpack('n', stream_get_contents($buffer, 2))[1]; if ($length === 0) { + $this->logger->write("Reading special action $id\n"); + yield $this->readMore($socket, $buffer, 1); - $cmd = ord(fread($buffer, 1)); + $cmd = ord(stream_get_contents($buffer, 1)); + var_dump($cmd); if ($cmd === self::ACTION_DISCONNECT) { $this->connections[$port]->close(); unset($this->connections[$port]); unset($this->connection_out_seq_no[$port]); unset($this->connection_in_seq_no[$port]); - unset($this->pending_payloads[$port]); + unset($this->pending_in_payloads[$port]); } else if ($cmd === self::ACTION_CONNECT && $server) { yield $this->readMore($socket, $buffer, 3); - $rport = unpack('n', fread($buffer, 2))[1]; - $type = ord(fread($buffer, 1)); + $rport = unpack('n', stream_get_contents($buffer, 2))[1]; + $type = ord(stream_get_contents($buffer, 1)); switch ($type) { case 0x03: yield $this->readMore($socket, $buffer, 1); - $toRead = ord(fread($buffer, 1)); + $toRead = ord(stream_get_contents($buffer, 1)); yield $this->readMore($socket, $buffer, $toRead); - $host = fread($buffer, $toRead); + $host = stream_get_contents($buffer, $toRead); break; case 0x04: $toRead = 16; yield $this->readMore($socket, $buffer, $toRead); - $host = '[' . inet_ntop(fread($buffer, $toRead)) . ']'; + $host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']'; break; case 0x01: $toRead = 4; yield $this->readMore($socket, $buffer, $toRead); - $host = inet_ntop(fread($buffer, $toRead)); + $host = inet_ntop(stream_get_contents($buffer, $toRead)); break; } - var_dump("Connecting to $host:$rport, $port"); + $this->logger->write("Connecting to $host:$rport, $port\n"); try { $this->connection_out_seq_no[$port] = 0; $this->connection_in_seq_no[$port] = 0; - $this->pending_payloads[$port] = []; + $this->pending_in_payloads[$port] = []; $this->connections[$port] = yield connect("tcp://$host:$rport"); - - foreach ($this->pending_payloads[$port] as $seqno => $payload) { + ksort($this->pending_in_payloads[$port]); + foreach ($this->pending_in_payloads[$port] as $seqno => $payload) { if ($this->connection_in_seq_no[$port] !== $seqno) { break; } - var_dump("Receiving proxy => $port seqno $seqno"); - var_dumP($payload); + $this->logger->write("Receiving proxy => $port seqno $seqno init $id\n"); + //$this->logger->write($payload); - unset($this->pending_payloads[$port][$seqno]); + unset($this->pending_in_payloads[$port][$seqno]); $this->connections[$port]->write($payload); $this->connection_in_seq_no[$port]++; } @@ -121,37 +128,48 @@ abstract class SharedMerger } catch (\Exception $e) { $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); } + } else if ($cmd > 1) { + throw new \Exception("Got unknown cmd $cmd"); } } else { + $this->logger->write("Reading payload\n"); + yield $this->readMore($socket, $buffer, $length + 2); - $seqno = unpack('n', fread($buffer, 2))[1]; + $seqno = unpack('n', stream_get_contents($buffer, 2))[1]; if (isset($this->connections[$port]) && $seqno === $this->connection_in_seq_no[$port]) { - var_dump("Recieving proxy => $port seqno $seqno"); - $this->connections[$port]->write($data = fread($buffer, $length)); + $this->logger->write("Receiving proxy => $port seqno $seqno main $id\n"); + $this->connections[$port]->write($d = stream_get_contents($buffer, $length)); + if (strlen($d) != $length) { + die('Wrong length'); + } $this->connection_in_seq_no[$port]++; - var_dumP($data); - foreach ($this->pending_payloads[$port] as $seqno => $payload) { + //$this->logger->write($data); + ksort($this->pending_in_payloads[$port]); + foreach ($this->pending_in_payloads[$port] as $seqno => $payload) { if ($this->connection_in_seq_no[$port] !== $seqno) { break; } - var_dump("Receiving proxy => $port seqno $seqno"); - unset($this->pending_payloads[$port][$seqno]); + $this->logger->write("Receiving proxy => $port seqno $seqno subloop $id\n"); + unset($this->pending_in_payloads[$port][$seqno]); $this->connections[$port]->write($payload); $this->connection_in_seq_no[$port]++; - var_dumP($payload); + //$this->logger->write($payload); } } else { - if (!isset($this->pending_payloads[$port])) { - $this->pending_payloads[$port] = []; + if (!isset($this->pending_in_payloads[$port])) { + $this->pending_in_payloads[$port] = []; } + $this->logger->write("Postponing payload {$this->connection_in_seq_no[$port]} != seqno $seqno postpone $id\n"); - $this->pending_payloads[$port][$seqno] = fread($buffer, $length); + $this->pending_in_payloads[$port][$seqno] = stream_get_contents($buffer, $length); } } if (fstat($buffer)['size'] > 10 * 1024 * 1024) { + $this->logger->write("=============== Resetting buffer\n"); + $rest = stream_get_contents($buffer); fclose($buffer); $buffer = fopen('php://memory', 'r+');