1
0
mirror of https://github.com/danog/Merger.git synced 2024-11-30 04:19:10 +01:00
This commit is contained in:
Daniil Gentili 2019-03-24 13:04:33 +00:00
parent 397691f340
commit f4e5da6f22
5 changed files with 34 additions and 31 deletions

2
.gitignore vendored
View File

@ -1 +1,3 @@
/vendor/
composer.lock
*.swp

View File

@ -30,7 +30,7 @@ class Merger extends SharedMerger
protected $connections = [];
protected $stats = [];
protected $shared_stats = [];
protected $connection_seqno = 0;
protected $connection_seqno = 3;
const STATE_HEADER = 0;
const STATE_DATA = 1;
@ -151,20 +151,18 @@ class Merger extends SharedMerger
var_Dump("================================ SENDING CONNECT ================================");
yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload);
$buffer = $socksInit;
if (fstat($buffer)['size'] - ftell($buffer)) {
$buffer = fopen('php://memory', 'r+');
if (fstat($socksInit)['size'] - ftell($socksInit)) {
fwrite($buffer, stream_get_contents($socksInit));
fseek($buffer, 0);
yield $this->commonWrite($port, $buffer);
}
fclose($socksInit);
while (null !== $chunk = yield $socket->read()) {
//var_dumP("Sending $port => proxy\n");
$pos = ftell($buffer);
fwrite($buffer, $chunk);
fseek($buffer, $pos);
fseek($buffer, 0);
yield $this->commonWrite($port, $buffer);
if (fstat($buffer)['size'] > 10 * 1024 * 1024) {
$buffer = fopen('php://memory', 'r+');
}
}
yield $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));
}

View File

@ -71,17 +71,11 @@ class MergerServer extends SharedMerger
$buffer = fopen('php://memory', 'r+');
while (null !== $chunk = yield $socket->read()) {
$this->logger->write("Sending $port => proxy\n");
$pos = ftell($buffer);
//$this->logger->write("Sending $port => proxy\n");
fwrite($buffer, $chunk);
fseek($buffer, $pos);
fseek($buffer, 0);
yield $this->commonWrite($port, $buffer);
if (fstat($buffer)['size'] > 10 * 1024 * 1024) {
$buffer = fopen('php://memory', 'r+');
}
}
$this->logger->write("Closing $port\n");
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));

View File

@ -36,10 +36,12 @@ abstract class SharedMerger
public function readMoreAsync($socket, $buffer, $length)
{
$read = true;
while (fstat($buffer)['size'] - ($pos = ftell($buffer)) < $length && ($read = yield $socket->read()) !== null) {
$pos = ftell($buffer);
fseek($buffer, 0, SEEK_END);
while (fstat($buffer)['size'] - $pos < $length && ($read = yield $socket->read()) !== null) {
fwrite($buffer, $read);
fseek($buffer, $pos);
}
fseek($buffer, $pos);
return $read !== null;
}
public function commonWrite($port, $chunk)
@ -52,10 +54,10 @@ abstract class SharedMerger
$seqno = $this->connection_out_seq_no[$port];
$this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port]+1) % 0xFFFF;
$this->logger->write("Still sending $port seqno $seqno\n");
$this->logger->write("Still sending $port seqno $seqno length $bytes\n");
$stats->startSending();
$this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
function ($error = null, $result = null) use ($stats, &$deferred, $port) {
function ($error = null, $result = null) use ($stats, &$deferred, $port, $bytes) {
if ($error) {
throw $error;
}
@ -67,6 +69,8 @@ abstract class SharedMerger
}
);
}
ftruncate($chunk, 0);
fseek($chunk, 0);
return $promise;
}
@ -77,7 +81,6 @@ abstract class SharedMerger
$buffer = fopen('php://memory', 'r+');
while (true) {
$this->logger->write("Reading length\n");
if (!yield $this->readMore($socket, $buffer, 6)) {
$this->logger->write("Breaking out of $id\n");
break;
@ -86,6 +89,7 @@ abstract class SharedMerger
$length = unpack('V', stream_get_contents($buffer, 4))[1];
$port = unpack('n', stream_get_contents($buffer, 2))[1];
$this->logger->write("Reading length $length port $port\n");
if ($length === 0) {
$this->logger->write("Reading special action $id\n");
@ -132,7 +136,6 @@ abstract class SharedMerger
break;
}
$this->logger->write("Receiving proxy => $port seqno $seqno init $id\n");
//$this->logger->write($payload);
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
@ -158,7 +161,6 @@ abstract class SharedMerger
die('Wrong length');
}
$this->connection_in_seq_no[$port]++;
//$this->logger->write($data);
ksort($this->pending_in_payloads[$port]);
foreach ($this->pending_in_payloads[$port] as $seqno => $payload) {
if ($this->connection_in_seq_no[$port] !== $seqno) {
@ -168,7 +170,6 @@ abstract class SharedMerger
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
$this->connection_in_seq_no[$port]++;
//$this->logger->write($payload);
}
} else {
@ -182,11 +183,9 @@ abstract class SharedMerger
}
if (fstat($buffer)['size'] > 10 * 1024 * 1024) {
$this->logger->write("=============== Resetting buffer\n");
$rest = stream_get_contents($buffer);
fclose($buffer);
$buffer = fopen('php://memory', 'r+');
ftruncate($buffer, strlen($rest));
fseek($buffer, 0);
fwrite($buffer, $rest);
fseek($buffer, 0);
}

View File

@ -78,22 +78,32 @@ class Stats
private $speeds = [];
private $temp = [];
private $needs_starting = [];
public function allocate($ID)
{
$this->speeds[$ID] = new \Ds\Deque();
$this->speeds[$ID]->allocate(self::MEAN_COUNT);
$this->speeds[$ID]->push(...array_fill(0, $this->speeds[$ID]->capacity(), 1000));
$this->temp[$ID] = 0;
}
public function startSending($ID)
{
$this->temp[$ID] = microtime(true);
if (!isset($this->temp[$ID])) {
$this->temp[$ID] = microtime(true);
} else {
$this->needs_starting[$ID] = true;
}
}
public function stopSending($ID, $sent)
{
$this->speeds[$ID]->push(($sent * 8) / (microtime(true) - $this->temp[$ID]));
$this->speeds[$ID]->unshift(($sent * 8) / (microtime(true) - $this->temp[$ID]));
$this->speeds[$ID]->pop();
unset($this->temp[$ID]);
if (isset($this->needs_starting[$ID])) {
unset($this->needs_starting[$ID]);
$this->startSending($ID);
}
}
public function getSpeed($ID, $powerOf = 6)
{