diff --git a/.gitignore b/.gitignore index 57872d0..f8b077d 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ /vendor/ +composer.lock +*.swp diff --git a/src/danog/Merger/Merger.php b/src/danog/Merger/Merger.php index 3ae09a2..2cb25c1 100644 --- a/src/danog/Merger/Merger.php +++ b/src/danog/Merger/Merger.php @@ -30,7 +30,7 @@ class Merger extends SharedMerger protected $connections = []; protected $stats = []; protected $shared_stats = []; - protected $connection_seqno = 0; + protected $connection_seqno = 3; const STATE_HEADER = 0; const STATE_DATA = 1; @@ -151,20 +151,18 @@ class Merger extends SharedMerger var_Dump("================================ SENDING CONNECT ================================"); yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload); - $buffer = $socksInit; - if (fstat($buffer)['size'] - ftell($buffer)) { + $buffer = fopen('php://memory', 'r+'); + if (fstat($socksInit)['size'] - ftell($socksInit)) { + fwrite($buffer, stream_get_contents($socksInit)); + fseek($buffer, 0); yield $this->commonWrite($port, $buffer); } + fclose($socksInit); while (null !== $chunk = yield $socket->read()) { //var_dumP("Sending $port => proxy\n"); - $pos = ftell($buffer); fwrite($buffer, $chunk); - fseek($buffer, $pos); + fseek($buffer, 0); yield $this->commonWrite($port, $buffer); - - if (fstat($buffer)['size'] > 10 * 1024 * 1024) { - $buffer = fopen('php://memory', 'r+'); - } } yield $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); } diff --git a/src/danog/Merger/MergerServer.php b/src/danog/Merger/MergerServer.php index 166b743..225ca69 100644 --- a/src/danog/Merger/MergerServer.php +++ b/src/danog/Merger/MergerServer.php @@ -71,17 +71,11 @@ class MergerServer extends SharedMerger $buffer = fopen('php://memory', 'r+'); while (null !== $chunk = yield $socket->read()) { - $this->logger->write("Sending $port => proxy\n"); - - $pos = ftell($buffer); + //$this->logger->write("Sending $port => proxy\n"); fwrite($buffer, $chunk); - fseek($buffer, $pos); + fseek($buffer, 0); yield $this->commonWrite($port, $buffer); - - if (fstat($buffer)['size'] > 10 * 1024 * 1024) { - $buffer = fopen('php://memory', 'r+'); - } } $this->logger->write("Closing $port\n"); $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); diff --git a/src/danog/Merger/SharedMerger.php b/src/danog/Merger/SharedMerger.php index a879e02..17eccfd 100644 --- a/src/danog/Merger/SharedMerger.php +++ b/src/danog/Merger/SharedMerger.php @@ -36,10 +36,12 @@ abstract class SharedMerger public function readMoreAsync($socket, $buffer, $length) { $read = true; - while (fstat($buffer)['size'] - ($pos = ftell($buffer)) < $length && ($read = yield $socket->read()) !== null) { + $pos = ftell($buffer); + fseek($buffer, 0, SEEK_END); + while (fstat($buffer)['size'] - $pos < $length && ($read = yield $socket->read()) !== null) { fwrite($buffer, $read); - fseek($buffer, $pos); } + fseek($buffer, $pos); return $read !== null; } public function commonWrite($port, $chunk) @@ -52,10 +54,10 @@ abstract class SharedMerger $seqno = $this->connection_out_seq_no[$port]; $this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port]+1) % 0xFFFF; - $this->logger->write("Still sending $port seqno $seqno\n"); + $this->logger->write("Still sending $port seqno $seqno length $bytes\n"); $stats->startSending(); $this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve( - function ($error = null, $result = null) use ($stats, &$deferred, $port) { + function ($error = null, $result = null) use ($stats, &$deferred, $port, $bytes) { if ($error) { throw $error; } @@ -67,6 +69,8 @@ abstract class SharedMerger } ); } + ftruncate($chunk, 0); + fseek($chunk, 0); return $promise; } @@ -77,7 +81,6 @@ abstract class SharedMerger $buffer = fopen('php://memory', 'r+'); while (true) { - $this->logger->write("Reading length\n"); if (!yield $this->readMore($socket, $buffer, 6)) { $this->logger->write("Breaking out of $id\n"); break; @@ -86,6 +89,7 @@ abstract class SharedMerger $length = unpack('V', stream_get_contents($buffer, 4))[1]; $port = unpack('n', stream_get_contents($buffer, 2))[1]; + $this->logger->write("Reading length $length port $port\n"); if ($length === 0) { $this->logger->write("Reading special action $id\n"); @@ -132,7 +136,6 @@ abstract class SharedMerger break; } $this->logger->write("Receiving proxy => $port seqno $seqno init $id\n"); - //$this->logger->write($payload); unset($this->pending_in_payloads[$port][$seqno]); $this->connections[$port]->write($payload); @@ -158,7 +161,6 @@ abstract class SharedMerger die('Wrong length'); } $this->connection_in_seq_no[$port]++; - //$this->logger->write($data); ksort($this->pending_in_payloads[$port]); foreach ($this->pending_in_payloads[$port] as $seqno => $payload) { if ($this->connection_in_seq_no[$port] !== $seqno) { @@ -168,7 +170,6 @@ abstract class SharedMerger unset($this->pending_in_payloads[$port][$seqno]); $this->connections[$port]->write($payload); $this->connection_in_seq_no[$port]++; - //$this->logger->write($payload); } } else { @@ -182,11 +183,9 @@ abstract class SharedMerger } if (fstat($buffer)['size'] > 10 * 1024 * 1024) { - $this->logger->write("=============== Resetting buffer\n"); - $rest = stream_get_contents($buffer); - fclose($buffer); - $buffer = fopen('php://memory', 'r+'); + ftruncate($buffer, strlen($rest)); + fseek($buffer, 0); fwrite($buffer, $rest); fseek($buffer, 0); } diff --git a/src/danog/Merger/Stats.php b/src/danog/Merger/Stats.php index 173229c..4512919 100644 --- a/src/danog/Merger/Stats.php +++ b/src/danog/Merger/Stats.php @@ -78,22 +78,32 @@ class Stats private $speeds = []; private $temp = []; + private $needs_starting = []; public function allocate($ID) { $this->speeds[$ID] = new \Ds\Deque(); $this->speeds[$ID]->allocate(self::MEAN_COUNT); $this->speeds[$ID]->push(...array_fill(0, $this->speeds[$ID]->capacity(), 1000)); + $this->temp[$ID] = 0; } public function startSending($ID) { - $this->temp[$ID] = microtime(true); + if (!isset($this->temp[$ID])) { + $this->temp[$ID] = microtime(true); + } else { + $this->needs_starting[$ID] = true; + } } public function stopSending($ID, $sent) { - $this->speeds[$ID]->push(($sent * 8) / (microtime(true) - $this->temp[$ID])); + $this->speeds[$ID]->unshift(($sent * 8) / (microtime(true) - $this->temp[$ID])); $this->speeds[$ID]->pop(); unset($this->temp[$ID]); + if (isset($this->needs_starting[$ID])) { + unset($this->needs_starting[$ID]); + $this->startSending($ID); + } } public function getSpeed($ID, $powerOf = 6) {