1
0
mirror of https://github.com/danog/Merger.git synced 2024-12-02 09:17:44 +01:00

Total rewrite

This commit is contained in:
Daniil Gentili 2019-04-03 16:43:54 +02:00
parent f86d931c3d
commit 11105f8f58
9 changed files with 555 additions and 311 deletions

View File

@ -0,0 +1,114 @@
<?php
/**
* Merger client
*
* This file is part of Merger.
* Merger is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* Merger is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with Merger.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*/
namespace danog\Merger\Abstr;
use function Amp\Socket\connect;
use danog\Merger\MergerWorker;
use danog\Merger\SequentialSocket;
use danog\Merger\Settings;
/**
* Abstract class shared merger
*/
abstract class SharedMerger
{
/**
* Get read loop
*
* @return callable
*/
abstract public function getReadLoop(): callable;
/**
* Shared socket loop
*
* @param int $writerId Writer ID
* @return void
*/
public function sharedLoop($writerId)
{
$socket = $this->writers[$writerId];
$buffer = $socket->getBuffer();
$loop_callback = $this->getReadLoop();
while (true) {
if (!yield $socket->read(6)) {
$this->logger->write("Breaking out of $writerId\n");
break;
}
$length = unpack('V', stream_get_contents($buffer, 4))[1];
$port = unpack('n', stream_get_contents($buffer, 2))[1];
if ($length === 0) {
yield $socket->read(1);
$cmd = ord(stream_get_contents($buffer, 1));
$this->logger->write("Reading special action $cmd $writerId\n");
if ($cmd === Settings::ACTION_DISCONNECT) {
$this->connections[$port]->close();
unset($this->connections[$port]);
} elseif ($cmd === Settings::ACTION_CONNECT && $this->server) {
yield $socket->read(3);
$rport = unpack('n', stream_get_contents($buffer, 2))[1];
$type = ord(stream_get_contents($buffer, 1));
switch ($type) {
case 0x03:
yield $socket->read(1);
$toRead = ord(stream_get_contents($buffer, 1));
yield $socket->read($toRead);
$host = stream_get_contents($buffer, $toRead);
break;
case 0x04:
$toRead = 16;
yield $socket->read($toRead);
$host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']';
break;
case 0x01:
$toRead = 4;
yield $socket->read($toRead);
$host = inet_ntop(stream_get_contents($buffer, $toRead));
break;
}
$this->logger->write("Connecting to $host:$rport, {$port}\n");
try {
$this->connections[$port] = new MergerWorker($port, $loop_callback, $this->logger, $this->writers);
$this->connections[$port]->loop(new SequentialSocket(yield connect("tcp://$host:$rport")));
$this->logger->write("Connected to $host:$rport, {$port}\n");
} catch (\Exception $e) {
$this->logger->write("Exception {$e->getMessage()} in $host:$rport, {$port}\n");
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT));
}
} elseif ($cmd === Settings::ACTION_SYNC) {
$this->logger->write("SYNCING FOR {$port} on $writerId\n");
$this->connections[$port]->sync($writerId);
} else {
throw new \Exception("Got unknown cmd $cmd");
}
} else {
yield $this->connections[$port]->handleSharedRead($writerId, $buffer, $length);
}
if (fstat($buffer)['size'] > 1 * 1024 * 1024) {
$rest = stream_get_contents($buffer);
ftruncate($buffer, strlen($rest));
fseek($buffer, 0);
fwrite($buffer, $rest);
fseek($buffer, 0);
}
}
}
}

View File

@ -0,0 +1,29 @@
<?php
namespace danog\Merger;
class Exception extends \Exception
{
public function __construct($message = null, $code = 0, self $previous = null, $file = null, $line = null)
{
if ($file !== null) {
$this->file = $file;
}
if ($line !== null) {
$this->line = $line;
}
parent::__construct($message, $code, $previous);
}
/**
* ExceptionErrorHandler.
*
* Error handler
*/
public static function ExceptionErrorHandler($errno = 0, $errstr = null, $errfile = null, $errline = null)
{
// If error is suppressed with @, don't throw an exception
if (error_reporting() === 0 || strpos($errstr, 'headers already sent') || ($errfile && strpos($errfile, 'vendor/amphp') !== false)) {
return false;
}
throw new self($errstr, $errno, null, $errfile, $errline);
}
}

View File

@ -22,19 +22,17 @@ use Amp\Socket\ClientConnectContext;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\Socket\connect; use function Amp\Socket\connect;
use function Amp\Socket\listen; use function Amp\Socket\listen;
use danog\Merger\Abstr\SharedMerger;
class Merger extends SharedMerger class Merger extends SharedMerger
{ {
protected $settings; protected $settings;
protected $writers = []; protected $writers = [];
protected $connections = []; protected $connections = [];
protected $stats = []; protected $shared_stats;
protected $shared_stats = []; protected $server = false;
protected $connection_seqno = 3; protected $connection_seqno = 3;
const STATE_HEADER = 0;
const STATE_DATA = 1;
/** /**
* Constructor * Constructor
* *
@ -58,112 +56,109 @@ class Merger extends SharedMerger
} }
public function loopAsync() public function loopAsync()
{ {
$y = 0;
foreach ($this->settings->getConnectFromAddresses() as $bindto) { foreach ($this->settings->getConnectFromAddresses() as $bindto) {
for ($x = 0; $x < $this->settings->getConnectionCount(); $x++) { for ($x = 0; $x < $this->settings->getConnectionCount(); $x++) {
$context = (new ClientConnectContext())->withBindTo($bindto); $context = (new ClientConnectContext())->withBindTo($bindto);
$this->writers[$bindto . '-' . $x] = new SharedSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context)); $id = $y++;
$this->stats[$bindto . '-' . $x] = Stats::getInstance($bindto . '-' . $x); $this->writers[$id] = new SequentialSocket(yield connect('tcp://' . $this->settings->getTunnelEndpoint(), $context), $id);
$this->pending_out_payloads[$bindto . '-' . $x] = new \SplQueue; $this->writers[$id]->write(pack('n', $id));
asyncCall([$this, 'handleSharedReads'], $bindto . '-' . $x, false); ksort($this->writers);
asyncCall([$this, 'sharedLoop'], $id);
} }
} }
$server = listen("127.0.0.1:55555"); $server = listen("127.0.0.1:55555");
$loop_callback = $this->getReadLoop();
while ($socket = yield $server->accept()) { while ($socket = yield $server->accept()) {
$port = explode(':', stream_socket_get_name($socket->getResource(), true))[1]; $port = explode(':', stream_socket_get_name($socket->getResource(), true))[1];
$port = $this->connection_seqno++; $port = $this->connection_seqno++;
$this->connections[$port] = $socket; $this->connections[$port] = new MergerWorker($port, $loop_callback, $this->logger, $this->writers);
$this->connection_out_seq_no[$port] = 0; $this->connections[$port]->loop(new SequentialSocket($socket));
$this->connection_in_seq_no[$port] = 0;
$this->pending_in_payloads[$port] = [];
asyncCall([$this, 'handleClientReads'], $port);
}; };
} }
public function handleClientReads($port) public function getReadLoop(): callable
{ {
$this->logger->write("New $port\n"); return function () {
$socket = $this->connections[$port]; $this->_logger->write("New {$this->_port}\n");
$socket = $this->_socket;
$socksInit = $socket->getBuffer();
$socksInit = fopen('php://memory', 'r+'); yield $socket->read(2);
yield $this->readMore($socket, $socksInit, 2);
if (fread($socksInit, 1) !== chr(5)) { if (fread($socksInit, 1) !== chr(5)) {
throw new \Exception('Wrong socks5 init '); throw new \Exception('Wrong socks5 init ');
}
yield $socket->write(chr(5));
$auth = null;
for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) {
yield $this->readMore($socket, $socksInit, 1);
$type = ord(fread($socksInit, 1));
if ($type === 0) {
$auth = false;
} else if ($type === 2) {
$auth = true;
} }
} yield $socket->write(chr(5));
if ($auth === null) { $auth = null;
throw new \Exception('No socks5 method'); for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) {
} yield $socket->read(1);
$authchr = chr($auth ? 2 : 0); $type = ord(fread($socksInit, 1));
yield $socket->write($authchr); if ($type === 0) {
$auth = false;
} else if ($type === 2) {
$auth = true;
}
}
if ($auth === null) {
throw new \Exception('No socks5 method');
}
$authchr = chr($auth ? 2 : 0);
yield $socket->write($authchr);
yield $this->readMore($socket, $socksInit, 4); yield $socket->read(3);
if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) { if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) {
throw new \Exception('Wrong socks5 ack'); throw new \Exception('Wrong socks5 ack');
} }
if ($auth) { if ($auth) {
yield $this->readMore($socket, $socksInit, 2); yield $socket->read(1);
$ulen = ord(fread(2)[1]); $ulen = ord(fread(1));
yield $this->readMore($socket, $socksInit, $ulen); yield $socket->read($ulen);
$username = fread($socksInit, $ulen); $username = fread($socksInit, $ulen);
$plen = ord(fread(1)); yield $socket->read(1);
yield $this->readMore($socket, $socksInit, $plen); $plen = ord(fread(1));
$password = fread($socksInit, $plen); yield $socket->read($plen);
} $password = fread($socksInit, $plen);
$payload = fread($socksInit, 1); }
switch (ord($payload[0])) { yield $socket->read(1);
case 0x03: $payload = fread($socksInit, 1);
yield $this->readMore($socket, $socksInit, 1); switch (ord($payload)) {
$payload .= fread($socksInit, 1); case 0x03:
$toRead = ord($payload[1]); yield $socket->read(1);
yield $this->readMore($socket, $socksInit, $toRead); $payload .= fread($socksInit, 1);
$payload .= fread($socksInit, $toRead); $toRead = ord($payload[1]);
break; yield $socket->read($toRead);
case 0x04: $payload .= fread($socksInit, $toRead);
$toRead = 16; break;
yield $this->readMore($socket, $socksInit, $toRead); case 0x04:
$payload .= fread($socksInit, $toRead); $toRead = 16;
break; yield $socket->read($toRead);
case 0x01: $payload .= fread($socksInit, $toRead);
$toRead = 4; break;
yield $this->readMore($socket, $socksInit, $toRead); case 0x01:
$payload .= fread($socksInit, $toRead); $toRead = 4;
break; yield $socket->read($toRead);
} $payload .= fread($socksInit, $toRead);
yield $this->readMore($socket, $socksInit, 2); break;
$rport = unpack('n', fread($socksInit, 2))[1]; }
yield $socket->read(2);
$rport = unpack('n', fread($socksInit, 2))[1];
yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0)); yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0));
$this->logger->write("================================ SENDING CONNECT ================================"); $this->_logger->write("================================ SENDING CONNECT ================================\n");
yield $this->writers[key($this->writers)]->write(pack('VnCn', 0, $port, self::ACTION_CONNECT, $rport) . $payload); yield $this->_writers[key($this->_writers)]->write(pack('VnCn', 0, $this->_port, Settings::ACTION_CONNECT, $rport) . $payload);
$buffer = fopen('php://memory', 'r+'); if (fstat($socksInit)['size'] - ftell($socksInit)) {
if (fstat($socksInit)['size'] - ftell($socksInit)) { yield $this->commonWrite($socksInit);
fwrite($buffer, stream_get_contents($socksInit)); }
fseek($buffer, 0); while (yield $socket->read()) {
yield $this->commonWrite($port, $buffer); yield $this->commonWrite($socksInit);
} }
fclose($socksInit); yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
while (null !== $chunk = yield $socket->read()) { };
//$this->logger->write("Sending $port => proxy\n");
fwrite($buffer, $chunk);
fseek($buffer, 0);
yield $this->commonWrite($port, $buffer);
}
yield $this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));
} }
} }

View File

@ -16,17 +16,17 @@
namespace danog\Merger; namespace danog\Merger;
use function Amp\asyncCall; use function Amp\asyncCall;
use function Amp\Socket\connect;
use function Amp\Socket\listen; use function Amp\Socket\listen;
use Amp\Loop; use Amp\Loop;
use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\ResourceOutputStream;
use danog\Merger\Abstr\SharedMerger;
class MergerServer extends SharedMerger class MergerServer extends SharedMerger
{ {
protected $settings; protected $settings;
protected $writers = []; protected $writers = [];
protected $connections = []; protected $connections = [];
protected $stats = []; protected $server = true;
const STATE_HEADER = 0; const STATE_HEADER = 0;
const STATE_HEADER_CMD = 1; const STATE_HEADER_CMD = 1;
@ -58,30 +58,32 @@ class MergerServer extends SharedMerger
$server = listen($this->settings->getTunnelEndpoint()); $server = listen($this->settings->getTunnelEndpoint());
while ($socket = yield $server->accept()) { while ($socket = yield $server->accept()) {
list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true)); //list($address, $port) = explode(':', stream_socket_get_name($socket->getResource(), true));
$this->writers[$address . '-' . $port] = new SharedSocket($socket); $socket = new SequentialSocket($socket);
$this->stats[$address . '-' . $port] = Stats::getInstance($address . '-' . $port); yield $socket->read(2);
$this->pending_out_payloads[$address . '-' . $port] = new \SplQueue; $id = unpack('n', fread($socket->getBuffer(), 2))[1];
$socket->setId($id);
asyncCall([$this, 'handleSharedReads'], $address . '-' . $port, true); $this->writers[$id] = $socket;
ksort($this->writers);
asyncCall([$this, 'sharedLoop'], $id);
}; };
} }
public function handleClientReads($port) public function getReadLoop(): callable
{ {
$this->logger->write("New $port\n"); return function () {
$socket = $this->connections[$port]; $this->_logger->write("New {$this->_port}\n");
$socket = $this->_socket;
$buffer = fopen('php://memory', 'r+'); $buffer = $socket->getBuffer();
while (null !== $chunk = yield $socket->read()) { while (null !== $chunk = yield $socket->read()) {
//$this->logger->write("Sending $port => proxy\n"); fwrite($buffer, $chunk);
fwrite($buffer, $chunk); fseek($buffer, 0);
fseek($buffer, 0);
yield $this->commonWrite($port, $buffer); yield $this->commonWrite($buffer);
} }
$this->logger->write("Closing $port\n"); $this->_logger->write("Closing {$this->_port}\n");
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT)); yield $this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
};
} }
} }

View File

@ -0,0 +1,213 @@
<?php
namespace danog\Merger;
use Amp\Deferred;
use function Amp\asyncCall;
use function Amp\call;
class MergerWorker
{
/**
* Shared writers
*
* @var array
*/
private $_writers;
/**
* Shared stats instance
*
* @var [type]
*/
private $_sharedStats;
/**
* Main socket
*
* @var [type]
*/
private $_socket;
/**
* Connection ID
*
* @var [type]
*/
private $_port;
/**
* Logger instance
*
* @var [type]
*/
private $_logger;
private $_connectionOutSeqNo = 0;
private $_connectionInSeqNo = 0;
private $_pendingPayloads = [];
private $_connectionInSubSeqNo = [];
private $_pendingSubPayloads = [];
private $_pause;
private $_minPauseSeqno = 0;
/**
* Construct
*
* @param [type] $port
* @param [type] $callback
* @param [type] $logger
* @param [type] $writers
*/
public function __construct($port, $callback, $logger, &$writers)
{
$this->_port = $port;
$this->_writers = $writers;
$this->_logger = $logger;
$this->_callback = $callback->bindTo($this, get_class($this));
$this->_sharedStats = Stats::getInstance();
$this->_pause = new Deferred;
$this->_connectionInSubSeqNo = array_fill_keys(array_keys($this->_writers), 0);
}
public function loop($socket)
{
$this->_socket = $socket;
$this->parsePending();
asyncCall($this->_callback);
}
public function handleSharedReadAsync($writerId, $buffer, $length)
{
$socket = $this->_writers[$writerId];
yield $socket->read($length + 2);
$seqno = unpack('n', stream_get_contents($buffer, 2))[1];
if ($this->_socket && $seqno === $this->_connectionInSeqNo && !$this->_connectionInSubSeqNo[$writerId]) {
$this->_logger->write("Receiving payload with seqno $seqno main $writerId\n");
$this->_socket->write(stream_get_contents($buffer, $length));
$this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF;
if ($this->_connectionInSeqNo === 0) {
foreach ($this->_connectionInSubSeqNo as &$sseqno) {
$sseqno--;
}
$this->_pendingPayloads = $this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : [];
//ksort($this->_pendingPayloads);
/*
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
*/
}
$this->parsePending();
} else {
if (!$this->_connectionInSubSeqNo[$writerId]) {
$this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) postpone $writerId\n");
$this->_pendingPayloads[$seqno] = stream_get_contents($buffer, $length);
//ksort($this->_pendingPayloads);
/*
if ($seqno - $this->_connectionInSeqNo > 200) {
$this->_logger->write("Pausing {$this->_port} - $writerId\n");
$this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo;
yield $this->_pause->promise();
$this->_logger->write("Resuming {$this->_port} - $writerId\n");
}*/
} else {
$this->_logger->write("Postponing payload with seqno $seqno (curseq {$this->_connectionInSeqNo}) - {$this->_connectionInSubSeqNo[$writerId]} postpone $writerId\n");
$this->_pendingSubPayloads[$this->_connectionInSubSeqNo[$writerId]][$seqno] = stream_get_contents($buffer, $length);
/*
if ($this->_connectionInSubSeqNo[$writerId] > 1 || (0xFFFF + $seqno) - $this->_connectionInSeqNo > 200) {
$this->_logger->write("Pausing {$this->_port} - $writerId\n");
$this->_minPauseSeqno = $this->_minPauseSeqno ? min($this->_minPauseSeqno, $this->_connectionInSeqNo) : $this->_connectionInSeqNo;
yield $this->_pause->promise();
$this->_logger->write("Resuming {$this->_port} - $writerId\n");
}*/
}
}
}
public function sync($writerId)
{
$seqno = ++$this->_connectionInSubSeqNo[$writerId];
if (!isset($this->_pendingSubPayloads[$seqno])) {
$this->_pendingSubPayloads[$seqno] = [];
}
}
public function commonWrite($chunk)
{
$shared_deferred = new Deferred();
$promise = $shared_deferred->promise();
$length = fstat($chunk)['size'] - ftell($chunk);
foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) {
if ($bytes <= 0) {
continue;
}
$seqno = $this->_connectionOutSeqNo;
$this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF;
if ($this->_connectionOutSeqNo === 0) {
foreach ($this->_writers as $writer) {
$writer->write(pack('VnC', 0, $this->_port, Settings::ACTION_SYNC));
}
}
$this->_logger->write("Still sending {$this->_port} seqno $seqno length $bytes\n");
$this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
function ($error = null, $result = null) use (&$shared_deferred) {
if ($error) {
throw $error;
}
if ($shared_deferred) {
$shared_deferred->resolve();
$shared_deferred = null;
}
}
);
}
fseek($chunk, 0);
ftruncate($chunk, 0);
return $promise;
}
public function parsePending()
{
for ($seqno = $this->_connectionInSeqNo; $seqno < 0xFFFF && isset($this->_pendingPayloads[$seqno]); $seqno++) {
$payload = $this->_pendingPayloads[$seqno];
$this->_logger->write("Receiving proxy => {$this->_port} seqno $seqno post\n");
unset($this->_pendingPayloads[$seqno]);
$this->_socket->write($payload);
$this->_connectionInSeqNo = ($this->_connectionInSeqNo + 1) % 0xFFFF;
if ($this->_connectionInSeqNo === 0) {
foreach ($this->_connectionInSubSeqNo as &$sseqno) {
$sseqno--;
}
$this->_pendingSubPayloads ? array_shift($this->_pendingSubPayloads) : [];
//ksort($this->_pendingPayloads);
/*
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
*/
$this->parsePending();
}
}
/*
if ($this->_minPauseSeqno && $this->_connectionInSeqNo > $this->_minPauseSeqno) {
$this->_pause->resolve();
$this->_pause = new Deferred;
$this->_minPauseSeqno = 0;
}*/
}
public function close()
{
if (!$this->_socket) {
return;
}
return $this->_socket->close();
}
public function handleSharedRead($writerId, $buffer, $length)
{
return call([$this, 'handleSharedReadAsync'], $writerId, $buffer, $length);
}
}

View File

@ -0,0 +1,71 @@
<?php
namespace danog\Merger;
use Amp\Success;
use function Amp\call;
class SequentialSocket
{
private $socket;
private $buffer;
private $last_write;
private $stats;
public function __construct($socket, $id = null)
{
$this->socket = $socket;
$this->buffer = fopen('php://memory', 'r+');
$this->last_write = new Success();
$this->stats = Stats::getInstance($id);
}
public function setId($id)
{
$this->stats = Stats::getInstance($id);
}
public function getBuffer()
{
return $this->buffer;
}
public function read($length = 0)
{
return call([$this, 'readAsync'], $length);
}
public function readAsync($length)
{
if (!$length) {
$pos = ftell($this->buffer);
$read = yield $this->socket->read();
fwrite($this->buffer, $read);
fseek($this->buffer, $pos);
return $read !== null;
}
$read = true;
$pos = ftell($this->buffer);
fseek($this->buffer, 0, SEEK_END);
while (fstat($this->buffer)['size'] - $pos < $length && ($read = yield $this->socket->read()) !== null) {
fwrite($this->buffer, $read);
}
fseek($this->buffer, $pos);
return $read !== null;
}
public function write($data)
{
return $this->socket->write($data);
}
public function writeSequential($data)
{
return call([$this, 'writeAsync'], $data);
}
public function writeAsync($data)
{
yield $this->last_write;
$started = microtime(true);
$wrote = yield $this->last_write = $this->socket->write($data);
$this->stats->stopSending($started, $wrote);
return $wrote;
}
public function close()
{
return $this->socket->close();
}
}

View File

@ -17,6 +17,10 @@ namespace danog\Merger;
class Settings class Settings
{ {
const ACTION_CONNECT = 0;
const ACTION_DISCONNECT = 1;
const ACTION_SYNC = 2;
/** /**
* Addresses from which to connect * Addresses from which to connect
* *

View File

@ -1,193 +0,0 @@
<?php
/**
* Merger client
*
* This file is part of Merger.
* Merger is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* Merger is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with Merger.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2019 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
*/
namespace danog\Merger;
use Amp\Deferred;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\Socket\connect;
abstract class SharedMerger
{
protected $pending_in_payloads = [];
protected $pending_out_payloads = [];
protected $connection_out_seq_no = [];
protected $connection_in_seq_no = [];
const ACTION_CONNECT = 0;
const ACTION_DISCONNECT = 1;
public function readMore($socket, $buffer, $length)
{
return call([$this, 'readMoreAsync'], $socket, $buffer, $length);
}
public function readMoreAsync($socket, $buffer, $length)
{
$read = true;
$pos = ftell($buffer);
fseek($buffer, 0, SEEK_END);
while (fstat($buffer)['size'] - $pos < $length && ($read = yield $socket->read()) !== null) {
fwrite($buffer, $read);
}
fseek($buffer, $pos);
return $read !== null;
}
public function commonWrite($port, $chunk)
{
$shared_deferred = new Deferred();
$promise = $shared_deferred->promise();
$length = fstat($chunk)['size'] - ftell($chunk);
foreach ($this->shared_stats->balance($length) as $id => $bytes) {
if ($bytes === 0) continue;
$stats = $this->stats[$id];
$seqno = $this->connection_out_seq_no[$port];
$this->connection_out_seq_no[$port] = ($this->connection_out_seq_no[$port] + 1) % 0xFFFF;
$this->logger->write("Still sending $port seqno $seqno length $bytes\n");
$this->writers[$id]->write(pack('Vnn', $bytes, $port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
function ($error = null, $result = null) use ($stats, &$shared_deferred, $bytes) {
if ($error) {
throw $error;
}
if ($shared_deferred) {
$shared_deferred->resolve();
$shared_deferred = null;
}
$stats->stopSending($result, $bytes);
}
);
}
ftruncate($chunk, 0);
fseek($chunk, 0);
return $promise;
}
public function handleSharedReads($id, $server)
{
$socket = $this->writers[$id];
$buffer = $socket->getBuffer();
while (true) {
if (!yield $socket->read(6)) {
$this->logger->write("Breaking out of $id\n");
break;
}
$length = unpack('V', stream_get_contents($buffer, 4))[1];
$port = unpack('n', stream_get_contents($buffer, 2))[1];
$this->logger->write("Reading length $length port $port\n");
if ($length === 0) {
yield $socket->read(1);
$cmd = ord(stream_get_contents($buffer, 1));
$this->logger->write("Reading special action $cmd $id\n");
if ($cmd === self::ACTION_DISCONNECT) {
$this->connections[$port]->close();
unset($this->connections[$port]);
unset($this->connection_out_seq_no[$port]);
unset($this->connection_in_seq_no[$port]);
unset($this->pending_in_payloads[$port]);
} else if ($cmd === self::ACTION_CONNECT && $server) {
yield $socket->read(3);
$rport = unpack('n', stream_get_contents($buffer, 2))[1];
$type = ord(stream_get_contents($buffer, 1));
switch ($type) {
case 0x03:
yield $socket->read(1);
$toRead = ord(stream_get_contents($buffer, 1));
yield $socket->read($toRead);
$host = stream_get_contents($buffer, $toRead);
break;
case 0x04:
$toRead = 16;
yield $socket->read($toRead);
$host = '[' . inet_ntop(stream_get_contents($buffer, $toRead)) . ']';
break;
case 0x01:
$toRead = 4;
yield $socket->read($toRead);
$host = inet_ntop(stream_get_contents($buffer, $toRead));
break;
}
$this->logger->write("Connecting to $host:$rport, $port\n");
try {
$this->connection_out_seq_no[$port] = 0;
$this->connection_in_seq_no[$port] = 0;
$this->pending_in_payloads[$port] = [];
$this->connections[$port] = yield connect("tcp://$host:$rport");
ksort($this->pending_in_payloads[$port]);
foreach ($this->pending_in_payloads[$port] as $seqno => $payload) {
if ($this->connection_in_seq_no[$port] !== $seqno) {
break;
}
$this->logger->write("Receiving proxy => $port seqno $seqno init $id\n");
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
$this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF;
}
asyncCall([$this, 'handleClientReads'], $port);
} catch (\Exception $e) {
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, self::ACTION_DISCONNECT));
}
} else if ($cmd > 1) {
throw new \Exception("Got unknown cmd $cmd");
}
} else {
$this->logger->write("Reading payload\n");
yield $socket->read($length + 2);
$seqno = unpack('n', stream_get_contents($buffer, 2))[1];
if (isset($this->connections[$port]) && $seqno === $this->connection_in_seq_no[$port]) {
$this->logger->write("Receiving proxy => $port seqno $seqno main $id\n");
$this->connections[$port]->write($d = stream_get_contents($buffer, $length));
if (strlen($d) != $length) {
die('Wrong length');
}
$this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF;
ksort($this->pending_in_payloads[$port]);
foreach ($this->pending_in_payloads[$port] as $seqno => $payload) {
if ($this->connection_in_seq_no[$port] !== $seqno) {
break;
}
$this->logger->write("Receiving proxy => $port seqno $seqno subloop $id\n");
unset($this->pending_in_payloads[$port][$seqno]);
$this->connections[$port]->write($payload);
$this->connection_in_seq_no[$port] = ($this->connection_in_seq_no[$port] + 1) % 0xFFFF;
}
} else {
if (!isset($this->pending_in_payloads[$port])) {
$this->pending_in_payloads[$port] = [];
}
$this->logger->write("Postponing payload {$this->connection_in_seq_no[$port]} != seqno $seqno postpone $id\n");
$this->pending_in_payloads[$port][$seqno] = stream_get_contents($buffer, $length);
}
}
if (fstat($buffer)['size'] > 1 * 1024 * 1024) {
$rest = stream_get_contents($buffer);
ftruncate($buffer, strlen($rest));
fseek($buffer, 0);
fwrite($buffer, $rest);
fseek($buffer, 0);
}
}
}
}

View File

@ -15,6 +15,9 @@
*/ */
namespace danog\Merger; namespace danog\Merger;
use Amp\Loop;
class Stats class Stats
{ {
const MEAN_COUNT = 10; const MEAN_COUNT = 10;
@ -69,8 +72,15 @@ class Stats
} }
private $speeds = []; private $speeds = [];
private $needs_starting = []; public function __construct()
{
Loop::repeat(1000, (function () {
foreach ($this->speeds as $elem) {
$elem->unshift((1024*1024 * 8) / 1);
$elem->pop();
}
})->bindTo($this, get_class($this)));
}
public function allocate($ID) public function allocate($ID)
{ {
$this->speeds[$ID] = new \Ds\Deque(); $this->speeds[$ID] = new \Ds\Deque();
@ -82,11 +92,7 @@ class Stats
$time = microtime(true) - $started; $time = microtime(true) - $started;
$this->speeds[$ID]->unshift(($sent * 8) / $time); $this->speeds[$ID]->unshift(($sent * 8) / $time);
$this->speeds[$ID]->pop(); $this->speeds[$ID]->pop();
if (isset($this->needs_starting[$ID])) {
echo "Re-start sending $ID\n";
unset($this->needs_starting[$ID]);
$this->startSending($ID);
}
} }
public function getSpeed($ID, $powerOf = 6) public function getSpeed($ID, $powerOf = 6)
{ {
@ -110,6 +116,9 @@ class Stats
foreach ($result as &$elem) { foreach ($result as &$elem) {
$elem = (int) ($elem * $per_bytes); $elem = (int) ($elem * $per_bytes);
if (!$elem) {
$elem += 2;
}
$sum += $elem; $sum += $elem;
} }