1
0
mirror of https://github.com/danog/Merger.git synced 2024-12-02 09:17:44 +01:00
This commit is contained in:
Daniil Gentili 2019-03-23 23:36:08 +01:00
parent 5bf10b5e89
commit fa944ff1af
3 changed files with 68 additions and 43 deletions

View File

@ -6,6 +6,7 @@ use function Amp\asyncCall;
use function Amp\Socket\connect;
use function Amp\Socket\listen;
use Amp\Loop;
use Amp\ByteStream\ResourceOutputStream;
class Merger extends SharedMerger
{
@ -28,10 +29,11 @@ class Merger extends SharedMerger
{
$this->settings = $settings;
$this->shared_stats = Stats::getInstance();
/*
$this->logger = new ResourceOutputStream(fopen('php://stdout', 'r+'));
Loop::repeat(1000, function () {
var_dump($this->shared_stats->getSpeeds());
});*/
$this->logger->write(json_encode($this->shared_stats->getSpeeds(), JSON_PRETTY_PRINT));
});
}
public function loop()
{
@ -44,6 +46,7 @@ class Merger extends SharedMerger
$context = (new ClientConnectContext())->withBindTo($bindto);
$this->writers[$bindto . '-' . $x] = yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context);
$this->stats[$bindto . '-' . $x] = Stats::getInstance($bindto . '-' . $x);
$this->pending_out_payloads[$bindto . '-' . $x] = new \SplQueue;
asyncCall([$this, 'handleSharedReads'], $bindto . '-' . $x, false);
}
}
@ -56,13 +59,13 @@ class Merger extends SharedMerger
$this->connections[$port] = $socket;
$this->connection_out_seq_no[$port] = 0;
$this->connection_in_seq_no[$port] = 0;
$this->pending_payloads[$port] = [];
$this->pending_in_payloads[$port] = [];
asyncCall([$this, 'handleClientReads'], $port);
};
}
public function handleClientReads($port)
{
var_dumP("New $port");
var_dumP("New $port\n");
$socket = $this->connections[$port];
$socksInit = fopen('php://memory', 'r+');
@ -130,6 +133,7 @@ class Merger extends SharedMerger
yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0));
var_Dump("================================ SENDING CONNECT ================================");
yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload);
$buffer = $socksInit;
@ -137,7 +141,7 @@ class Merger extends SharedMerger
yield $this->commonWrite($port, $buffer);
}
while (null !== $chunk = yield $socket->read()) {
var_dumP("Sending $port => proxy");
//var_dumP("Sending $port => proxy\n");
$pos = ftell($buffer);
fwrite($buffer, $chunk);
fseek($buffer, $pos);

View File

@ -5,6 +5,7 @@ use function Amp\asyncCall;
use function Amp\Socket\connect;
use function Amp\Socket\listen;
use Amp\Loop;
use Amp\ByteStream\ResourceOutputStream;
class MergerServer extends SharedMerger
{
@ -26,10 +27,10 @@ class MergerServer extends SharedMerger
{
$this->settings = $settings;
$this->shared_stats = Stats::getInstance();
/*
$this->logger = new ResourceOutputStream(fopen('php://stdout', 'r+'));
Loop::repeat(1000, function () {
var_dump($this->shared_stats->getSpeeds());
});*/
$this->logger->write(json_encode($this->shared_stats->getSpeeds(), JSON_PRETTY_PRINT));
});
}
public function loop()
{
@ -43,18 +44,20 @@ class MergerServer extends SharedMerger
list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true));
$this->writers[$address . '-' . $port] = $socket;
$this->stats[$address . '-' . $port] = Stats::getInstance($address . '-' . $port);
$this->pending_out_payloads[$address . '-' . $port] = new \SplQueue;
asyncCall([$this, 'handleSharedReads'], $address . '-' . $port, true);
};
}
public function handleClientReads($port)
{
var_dump("New $port");
$this->logger->write("New $port\n");
$socket = $this->connections[$port];
$buffer = fopen('php://memory', 'r+');
while (null !== $chunk = yield $socket->read()) {
var_dumP("Sending $port => proxy");
$this->logger->write("Sending $port => proxy\n");
$pos = ftell($buffer);
fwrite($buffer, $chunk);
@ -66,7 +69,7 @@ class MergerServer extends SharedMerger
$buffer = fopen('php://memory', 'r+');
}
}
var_dump("Closing $port");
$this->logger->write("Closing $port\n");
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));
}

View File

@ -8,7 +8,8 @@ use function Amp\asyncCall;
abstract class SharedMerger
{
protected $pending_payloads = [];
protected $pending_in_payloads = [];
protected $pending_out_payloads = [];
protected $connection_out_seq_no = [];
protected $connection_in_seq_no = [];
@ -37,9 +38,9 @@ abstract class SharedMerger
$seqno = $this->connection_out_seq_no[$port];
$this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port]+1) % 0xFFFF;
var_dump("Still sending $port seqno $seqno");
$this->logger->write("Still sending $port seqno $seqno\n");
$stats->startSending();
$this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . fread($chunk, $bytes))->onResolve(
$this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
function ($error = null, $result = null) use ($stats, &$deferred, $port) {
if ($error) {
throw $error;
@ -62,58 +63,64 @@ abstract class SharedMerger
$buffer = fopen('php://memory', 'r+');
while (true) {
$this->logger->write("Reading length\n");
if (!yield $this->readMore($socket, $buffer, 6)) {
$this->logger->write("Breaking out of $id\n");
break;
}
$length = unpack('V', fread($buffer, 4))[1];
$port = unpack('n', fread($buffer, 2))[1];
$length = unpack('V', stream_get_contents($buffer, 4))[1];
$port = unpack('n', stream_get_contents($buffer, 2))[1];
if ($length === 0) {
$this->logger->write("Reading special action $id\n");
yield $this->readMore($socket, $buffer, 1);
$cmd = ord(fread($buffer, 1));
$cmd = ord(stream_get_contents($buffer, 1));
var_dump($cmd);
if ($cmd === self::ACTION_DISCONNECT) {
$this->connections[$port]->close();
unset($this->connections[$port]);
unset($this->connection_out_seq_no[$port]);
unset($this->connection_in_seq_no[$port]);
unset($this->pending_payloads[$port]);
unset($this->pending_in_payloads[$port]);
} else if ($cmd === self::ACTION_CONNECT && $server) {
yield $this->readMore($socket, $buffer, 3);
$rport = unpack('n', fread($buffer, 2))[1];
$type = ord(fread($buffer, 1));
$rport = unpack('n', stream_get_contents($buffer, 2))[1];
$type = ord(stream_get_contents($buffer, 1));
switch ($type) {
case 0x03:
yield $this->readMore($socket, $buffer, 1);
$toRead = ord(fread($buffer, 1));
$toRead = ord(stream_get_contents($buffer, 1));
yield $this->readMore($socket, $buffer, $toRead);
$host = fread($buffer, $toRead);
$host = stream_get_contents($buffer, $toRead);
break;
case 0x04:
$toRead = 16;
yield $this->readMore($socket, $buffer, $toRead);
$host = '[' . inet_ntop(fread($buffer, $toRead)) . ']';
$host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']';
break;
case 0x01:
$toRead = 4;
yield $this->readMore($socket, $buffer, $toRead);
$host = inet_ntop(fread($buffer, $toRead));
$host = inet_ntop(stream_get_contents($buffer, $toRead));
break;
}
var_dump("Connecting to $host:$rport, $port");
$this->logger->write("Connecting to $host:$rport, $port\n");
try {
$this->connection_out_seq_no[$port] = 0;
$this->connection_in_seq_no[$port] = 0;
$this->pending_payloads[$port] = [];
$this->pending_in_payloads[$port] = [];
$this->connections[$port] = yield connect("tcp://$host:$rport");
foreach ($this->pending_payloads[$port] as $seqno => $payload) {
ksort($this->pending_in_payloads[$port]);
foreach ($this->pending_in_payloads[$port] as $seqno => $payload) {
if ($this->connection_in_seq_no[$port] !== $seqno) {
break;
}
var_dump("Receiving proxy => $port seqno $seqno");
var_dumP($payload);
$this->logger->write("Receiving proxy => $port seqno $seqno init $id\n");
//$this->logger->write($payload);
unset($this->pending_payloads[$port][$seqno]);
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
$this->connection_in_seq_no[$port]++;
}
@ -121,37 +128,48 @@ abstract class SharedMerger
} catch (\Exception $e) {
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));
}
} else if ($cmd > 1) {
throw new \Exception("Got unknown cmd $cmd");
}
} else {
$this->logger->write("Reading payload\n");
yield $this->readMore($socket, $buffer, $length + 2);
$seqno = unpack('n', fread($buffer, 2))[1];
$seqno = unpack('n', stream_get_contents($buffer, 2))[1];
if (isset($this->connections[$port]) && $seqno === $this->connection_in_seq_no[$port]) {
var_dump("Recieving proxy => $port seqno $seqno");
$this->connections[$port]->write($data = fread($buffer, $length));
$this->logger->write("Receiving proxy => $port seqno $seqno main $id\n");
$this->connections[$port]->write($d = stream_get_contents($buffer, $length));
if (strlen($d) != $length) {
die('Wrong length');
}
$this->connection_in_seq_no[$port]++;
var_dumP($data);
foreach ($this->pending_payloads[$port] as $seqno => $payload) {
//$this->logger->write($data);
ksort($this->pending_in_payloads[$port]);
foreach ($this->pending_in_payloads[$port] as $seqno => $payload) {
if ($this->connection_in_seq_no[$port] !== $seqno) {
break;
}
var_dump("Receiving proxy => $port seqno $seqno");
unset($this->pending_payloads[$port][$seqno]);
$this->logger->write("Receiving proxy => $port seqno $seqno subloop $id\n");
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
$this->connection_in_seq_no[$port]++;
var_dumP($payload);
//$this->logger->write($payload);
}
} else {
if (!isset($this->pending_payloads[$port])) {
$this->pending_payloads[$port] = [];
if (!isset($this->pending_in_payloads[$port])) {
$this->pending_in_payloads[$port] = [];
}
$this->logger->write("Postponing payload {$this->connection_in_seq_no[$port]} != seqno $seqno postpone $id\n");
$this->pending_payloads[$port][$seqno] = fread($buffer, $length);
$this->pending_in_payloads[$port][$seqno] = stream_get_contents($buffer, $length);
}
}
if (fstat($buffer)['size'] > 10 * 1024 * 1024) {
$this->logger->write("=============== Resetting buffer\n");
$rest = stream_get_contents($buffer);
fclose($buffer);
$buffer = fopen('php://memory', 'r+');