1
0
mirror of https://github.com/danog/Merger.git synced 2024-11-30 04:19:10 +01:00

1gbps, it wasn't that difficult after all

This commit is contained in:
Daniil Gentili 2019-04-03 17:30:03 +02:00
parent 11105f8f58
commit 79e31108ec
6 changed files with 40 additions and 111 deletions

View File

@ -92,9 +92,6 @@ abstract class SharedMerger
$this->logger->write("Exception {$e->getMessage()} in $host:$rport, {$port}\n");
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT));
}
} elseif ($cmd === Settings::ACTION_SYNC) {
$this->logger->write("SYNCING FOR {$port} on $writerId\n");
$this->connections[$port]->sync($writerId);
} else {
throw new \Exception("Got unknown cmd $cmd");
}

View File

@ -62,7 +62,11 @@ class Merger extends SharedMerger
$context = (new ClientConnectContext())->withBindTo($bindto);
$id = $y++;
$this->writers[$id] = new SequentialSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context), $id);
$this->writers[$id]->write(pack('n', $id));
$this->writers[$id]->write($s = pack('n', $id));
yield $this->writers[$id]->read(2);
if (fread($this->writers[$id]->getBuffer(), 2) !== $s) {
throw new Exception('Wrong reply');
}
ksort($this->writers);
asyncCall([$this, 'sharedLoop'], $id);
}
@ -158,7 +162,7 @@ class Merger extends SharedMerger
while (yield $socket->read()) {
yield $this->commonWrite($socksInit);
}
yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
$this->close();
};
}
}

View File

@ -58,7 +58,6 @@ class MergerServer extends SharedMerger
$server = listen($this->settings->getTunnelEndpoint());
while ($socket = yield $server->accept()) {
//list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true));
$socket = new SequentialSocket($socket);
yield $socket->read(2);
$id = unpack('n', fread($socket->getBuffer(), 2))[1];
@ -66,6 +65,7 @@ class MergerServer extends SharedMerger
$this->writers[$id] = $socket;
ksort($this->writers);
asyncCall([$this, 'sharedLoop'], $id);
yield $socket->write(pack('n', $id));
};
}
@ -76,14 +76,10 @@ class MergerServer extends SharedMerger
$socket = $this->_socket;
$buffer = $socket->getBuffer();
while (null !== $chunk = yield $socket->read()) {
fwrite($buffer, $chunk);
fseek($buffer, 0);
while (yield $socket->read()) {
yield $this->commonWrite($buffer);
}
$this->_logger->write("Closing {$this->_port}\n");
yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
$this->close();
};
}
}

View File

@ -64,13 +64,16 @@ class MergerWorker
$this->_logger = $logger;
$this->_callback = $callback->bindTo($this, get_class($this));
$this->_sharedStats = Stats::getInstance();
$this->_pause = new Deferred;
$this->_connectionInSubSeqNo = array_fill_keys(array_keys($this->_writers), 0);
}
public function loop($socket)
{
$this->_socket = $socket;
$this->parsePending();
if ($this->_pause) {
$pause = $this->_pause;
$this->_pause = null;
$pause->resolve();
}
asyncCall($this->_callback);
}
public function handleSharedReadAsync($writerId, $buffer, $length)
@ -80,55 +83,18 @@ class MergerWorker
yield $socket->read($length + 2);
$seqno = unpack('n', stream_get_contents($buffer, 2))[1];
if ($this->_socket && $seqno === $this->_connectionInSeqNo && !$this->_connectionInSubSeqNo[$writerId]) {
$this->_logger->write("Receiving payload with seqno $seqno main $writerId\n");
$this->_socket->write(stream_get_contents($buffer, $length));
$this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF;
if ($this->_connectionInSeqNo === 0) {
foreach ($this->_connectionInSubSeqNo as &$sseqno) {
$sseqno--;
}
$this->_pendingPayloads = $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : [];
//ksort($this->_pendingPayloads);
/*
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
*/
while (!$this->_socket || $seqno !== $this->_connectionInSeqNo) {
if (!$this->_pause) {
$this->_pause = new Deferred;
}
$this->parsePending();
} else {
if (!$this->_connectionInSubSeqNo[$writerId]) {
$this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) postpone $writerId\n");
$this->_pendingPayloads[$seqno] = stream_get_contents($buffer, $length);
//ksort($this->_pendingPayloads);
/*
if ($seqno - $this->_connectionInSeqNo > 200) {
$this->_logger->write("Pausing {$this->_port} - $writerId\n");
$this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo;
yield $this->_pause->promise();
$this->_logger->write("Resuming {$this->_port} - $writerId\n");
}*/
} else {
$this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) - {$this->_connectionInSubSeqNo[$writerId]} postpone $writerId\n");
$this->_pendingSubPayloads[$this->_connectionInSubSeqNo[$writerId]][$seqno] = stream_get_contents($buffer, $length);
/*
if ($this->_connectionInSubSeqNo[$writerId] > 1 || (0xFFFF + $seqno) - $this->_connectionInSeqNo > 200) {
$this->_logger->write("Pausing {$this->_port} - $writerId\n");
$this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo;
yield $this->_pause->promise();
$this->_logger->write("Resuming {$this->_port} - $writerId\n");
}*/
}
}
}
public function sync($writerId)
{
$seqno = ++$this->_connectionInSubSeqNo[$writerId];
if (!isset($this->_pendingSubPayloads[$seqno])) {
$this->_pendingSubPayloads[$seqno] = [];
$this->_socket->write(stream_get_contents($buffer, $length));
$this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF;
if ($this->_pause) {
$pause = $this->_pause;
$this->_pause = null;
$pause->resolve();
}
}
@ -139,17 +105,12 @@ class MergerWorker
$length = fstat($chunk)['size'] - ftell($chunk);
foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) {
if ($bytes <= 0) {
$this->_logger->write("Skipping $bytes\n");
continue;
}
$seqno = $this->_connectionOutSeqNo;
$this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF;
if ($this->_connectionOutSeqNo === 0) {
foreach ($this->_writers as $writer) {
$writer->write(pack('VnC', 0, $this->_port, Settings::ACTION_SYNC));
}
}
$this->_logger->write("Still sending {$this->_port} seqno $seqno length $bytes\n");
$this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
function ($error = null, $result = null) use (&$shared_deferred) {
@ -167,43 +128,16 @@ class MergerWorker
ftruncate($chunk, 0);
return $promise;
}
public function parsePending()
{
for ($seqno = $this->_connectionInSeqNo; $seqno < 0xFFFF && isset($this->_pendingPayloads[$seqno]); $seqno++) {
$payload = $this->_pendingPayloads[$seqno];
$this->_logger->write("Receiving proxy => {$this->_port} seqno $seqno post\n");
unset($this->_pendingPayloads[$seqno]);
$this->_socket->write($payload);
$this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF;
if ($this->_connectionInSeqNo === 0) {
foreach ($this->_connectionInSubSeqNo as &$sseqno) {
$sseqno--;
}
$this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : [];
//ksort($this->_pendingPayloads);
/*
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
*/
$this->parsePending();
}
}
/*
if ($this->_minPauseSeqno && $this->_connectionInSeqNo > $this->_minPauseSeqno) {
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
}*/
}
public function close()
{
if (!$this->_socket) {
return;
}
return $this->_socket->close();
$socket = $this->_socket;
$this->_socket = null;
$this->_logger->write("Closing {$this->_port}\n");
$socket->close();
$this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
}
public function handleSharedRead($writerId, $buffer, $length)

View File

@ -19,7 +19,6 @@ class Settings
{
const ACTION_CONNECT = 0;
const ACTION_DISCONNECT = 1;
const ACTION_SYNC = 2;
/**
* Addresses from which to connect

View File

@ -72,15 +72,6 @@ class Stats
}
private $speeds = [];
public function __construct()
{
Loop::repeat(1000, (function () {
foreach ($this->speeds as $elem) {
$elem->unshift((1024*1024 * 8) / 1);
$elem->pop();
}
})->bindTo($this, get_class($this)));
}
public function allocate($ID)
{
$this->speeds[$ID] = new \Ds\Deque();
@ -103,10 +94,17 @@ class Stats
$sum = 0;
$result = [];
$maxk = 0;
$maxv = 0;
foreach ($this->speeds as $last_key => $elem) {
$ret = $elem->sum();
$sum += $ret;
if ($ret > $maxv) {
$maxv = $ret;
$maxk = $last_key;
}
$result[$last_key] = $ret;
}
@ -114,15 +112,16 @@ class Stats
$sum = 0;
foreach ($result as &$elem) {
foreach ($result as $key => &$elem) {
$elem = (int) ($elem * $per_bytes);
if (!$elem) {
$this->speeds[$key]->unshift(1000000);
$this->speeds[$key]->pop();
$elem += 2;
}
$sum += $elem;
}
$result[$last_key] -= $sum - $bytes;
$result[$maxk] -= $sum - $bytes;
return $result;
}
public function getSpeeds($powerOf = 6)