mirror of
https://github.com/danog/Merger.git
synced 2024-11-30 04:19:10 +01:00
More fixes
This commit is contained in:
parent
3e0e584512
commit
4d4de73f41
@ -6,7 +6,7 @@ use danog\Merger\Merger;
|
||||
use danog\Merger\Settings;
|
||||
|
||||
$settings = new Settings();
|
||||
$settings->setTunnelEndpoint('127.0.0.1', 4444);
|
||||
$settings->setTunnelEndpoint('manuel.giuseppem99.cf', 4444);
|
||||
$settings->addConnectAddress('192.168.1.236');
|
||||
$client = new Merger($settings);
|
||||
$client->loop();
|
@ -18,8 +18,8 @@ namespace danog\Merger\Abstr;
|
||||
use danog\Merger\MergerWorker;
|
||||
use danog\Merger\SequentialSocket;
|
||||
use danog\Merger\Settings;
|
||||
use function Amp\Socket\connect;
|
||||
use function Amp\asyncCall;
|
||||
use function Amp\Socket\connect;
|
||||
|
||||
/**
|
||||
* Abstract class shared merger
|
||||
@ -94,7 +94,7 @@ abstract class SharedMerger
|
||||
$this->logger->write("Connected to $host:$rport, {$port}\n");
|
||||
} catch (\Exception $e) {
|
||||
$this->logger->write("Exception {$e->getMessage()} in $host:$rport, {$port}\n");
|
||||
$this->writers[key($this->writers)]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT));
|
||||
$this->writers[0]->write(pack('VnC', 0, $port, Settings::ACTION_DISCONNECT));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
@ -86,50 +86,51 @@ class Merger extends SharedMerger
|
||||
public function getReadLoop(): callable
|
||||
{
|
||||
return function () {
|
||||
$this->_logger->write("New {$this->_port}\n");
|
||||
$socket = $this->_socket;
|
||||
$socksInit = $socket->getBuffer();
|
||||
try {
|
||||
$this->_logger->write("New {$this->_port}\n");
|
||||
$socket = $this->_socket;
|
||||
$socksInit = $socket->getBuffer();
|
||||
|
||||
yield $socket->read(2);
|
||||
yield $socket->read(2);
|
||||
|
||||
if (fread($socksInit, 1) !== chr(5)) {
|
||||
throw new \Exception('Wrong socks5 init ');
|
||||
}
|
||||
yield $socket->write(chr(5));
|
||||
$auth = null;
|
||||
for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) {
|
||||
yield $socket->read(1);
|
||||
$type = ord(fread($socksInit, 1));
|
||||
if ($type === 0) {
|
||||
$auth = false;
|
||||
} else if ($type === 2) {
|
||||
$auth = true;
|
||||
if (fread($socksInit, 1) !== chr(5)) {
|
||||
throw new \Exception('Wrong socks5 init ');
|
||||
}
|
||||
}
|
||||
if ($auth === null) {
|
||||
throw new \Exception('No socks5 method');
|
||||
}
|
||||
$authchr = chr($auth ? 2 : 0);
|
||||
yield $socket->write($authchr);
|
||||
yield $socket->write(chr(5));
|
||||
$auth = null;
|
||||
for ($x = 0; $x < ord(fread($socksInit, 1)); $x++) {
|
||||
yield $socket->read(1);
|
||||
$type = ord(fread($socksInit, 1));
|
||||
if ($type === 0) {
|
||||
$auth = false;
|
||||
} elseif ($type === 2) {
|
||||
$auth = true;
|
||||
}
|
||||
}
|
||||
if ($auth === null) {
|
||||
throw new \Exception('No socks5 method');
|
||||
}
|
||||
$authchr = chr($auth ? 2 : 0);
|
||||
yield $socket->write($authchr);
|
||||
|
||||
yield $socket->read(3);
|
||||
if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) {
|
||||
throw new \Exception('Wrong socks5 ack');
|
||||
}
|
||||
if ($auth) {
|
||||
yield $socket->read(1);
|
||||
$ulen = ord(fread(1));
|
||||
yield $socket->read($ulen);
|
||||
$username = fread($socksInit, $ulen);
|
||||
yield $socket->read(3);
|
||||
if (fread($socksInit, 3) !== chr(5) . chr(1) . $authchr) {
|
||||
throw new \Exception('Wrong socks5 ack');
|
||||
}
|
||||
if ($auth) {
|
||||
yield $socket->read(1);
|
||||
$ulen = ord(fread(1));
|
||||
yield $socket->read($ulen);
|
||||
$username = fread($socksInit, $ulen);
|
||||
|
||||
yield $socket->read(1);
|
||||
$plen = ord(fread(1));
|
||||
yield $socket->read($plen);
|
||||
$password = fread($socksInit, $plen);
|
||||
}
|
||||
yield $socket->read(1);
|
||||
$plen = ord(fread(1));
|
||||
yield $socket->read($plen);
|
||||
$password = fread($socksInit, $plen);
|
||||
}
|
||||
yield $socket->read(1);
|
||||
$payload = fread($socksInit, 1);
|
||||
switch (ord($payload)) {
|
||||
$payload = fread($socksInit, 1);
|
||||
switch (ord($payload)) {
|
||||
case 0x03:
|
||||
yield $socket->read(1);
|
||||
$payload .= fread($socksInit, 1);
|
||||
@ -148,20 +149,24 @@ class Merger extends SharedMerger
|
||||
$payload .= fread($socksInit, $toRead);
|
||||
break;
|
||||
}
|
||||
yield $socket->read(2);
|
||||
$rport = unpack('n', fread($socksInit, 2))[1];
|
||||
yield $socket->read(2);
|
||||
$rport = unpack('n', fread($socksInit, 2))[1];
|
||||
|
||||
yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0));
|
||||
yield $socket->write(chr(5) . chr(0) . chr(0) . chr(1) . pack('Vn', 0, 0));
|
||||
|
||||
$this->_logger->write("================================ SENDING CONNECT ================================\n");
|
||||
yield $this->_writers[key($this->_writers)]->write(pack('VnCn', 0, $this->_port, Settings::ACTION_CONNECT, $rport) . $payload);
|
||||
$this->_logger->write("================================ SENDING CONNECT ================================\n");
|
||||
yield $this->_writers[0]->write(pack('VnCn', 0, $this->_port, Settings::ACTION_CONNECT, $rport) . $payload);
|
||||
|
||||
if (fstat($socksInit)['size'] - ftell($socksInit)) {
|
||||
yield $this->commonWrite($socksInit);
|
||||
}
|
||||
while (yield $socket->read()) {
|
||||
yield $this->commonWrite($socksInit);
|
||||
if (fstat($socksInit)['size'] - ftell($socksInit)) {
|
||||
yield $this->commonWrite($socksInit);
|
||||
}
|
||||
while (yield $socket->read()) {
|
||||
yield $this->commonWrite($socksInit);
|
||||
}
|
||||
} catch (\Exception $e) {
|
||||
$this->_logger->write($e);
|
||||
}
|
||||
|
||||
$this->close();
|
||||
};
|
||||
}
|
||||
|
@ -58,14 +58,18 @@ class MergerServer extends SharedMerger
|
||||
$server = listen($this->settings->getTunnelEndpoint());
|
||||
|
||||
while ($socket = yield $server->accept()) {
|
||||
$socket = new SequentialSocket($socket);
|
||||
yield $socket->read(2);
|
||||
$id = unpack('n', fread($socket->getBuffer(), 2))[1];
|
||||
$socket->setId($id);
|
||||
$this->writers[$id] = $socket;
|
||||
ksort($this->writers);
|
||||
asyncCall([$this, 'sharedLoop'], $id);
|
||||
yield $socket->write(pack('n', $id));
|
||||
try {
|
||||
$socket = new SequentialSocket($socket);
|
||||
yield $socket->read(2);
|
||||
$id = unpack('n', fread($socket->getBuffer(), 2))[1];
|
||||
$socket->setId($id);
|
||||
$this->writers[$id] = $socket;
|
||||
ksort($this->writers);
|
||||
asyncCall([$this, 'sharedLoop'], $id);
|
||||
yield $socket->write(pack('n', $id));
|
||||
} catch (\Exception $e) {
|
||||
$this->logger->write($e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -112,19 +112,20 @@ class MergerWorker
|
||||
|
||||
public function commonWrite($chunk)
|
||||
{
|
||||
$shared_deferred = new Deferred();
|
||||
$promise = $shared_deferred->promise();
|
||||
$length = fstat($chunk)['size'] - ftell($chunk);
|
||||
foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) {
|
||||
if ($bytes <= 0) {
|
||||
$this->_logger->write("Skipping $bytes\n");
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
$shared_deferred = new Deferred();
|
||||
$promise = $shared_deferred->promise();
|
||||
$length = fstat($chunk)['size'] - ftell($chunk);
|
||||
foreach ($this->_sharedStats->balance($length) as $writerId => $bytes) {
|
||||
if ($bytes <= 0) {
|
||||
$this->_logger->write("Skipping $bytes\n");
|
||||
continue;
|
||||
}
|
||||
|
||||
$seqno = $this->_connectionOutSeqNo;
|
||||
$this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF;
|
||||
$seqno = $this->_connectionOutSeqNo;
|
||||
$this->_connectionOutSeqNo = ($this->_connectionOutSeqNo + 1) % 0xFFFF;
|
||||
|
||||
$this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
|
||||
$this->_writers[$writerId]->writeSequential(pack('Vnn', $bytes, $this->_port, $seqno) . stream_get_contents($chunk, $bytes))->onResolve(
|
||||
function ($error = null, $result = null) use (&$shared_deferred) {
|
||||
if ($error) {
|
||||
throw $error;
|
||||
@ -135,10 +136,13 @@ class MergerWorker
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
fseek($chunk, 0);
|
||||
ftruncate($chunk, 0);
|
||||
return $promise;
|
||||
} catch (\Exception $e) {
|
||||
return $this->_logger->write($e);
|
||||
}
|
||||
fseek($chunk, 0);
|
||||
ftruncate($chunk, 0);
|
||||
return $promise;
|
||||
}
|
||||
public function close()
|
||||
{
|
||||
@ -149,7 +153,7 @@ class MergerWorker
|
||||
$this->_socket = null;
|
||||
$this->_logger->write("Closing {$this->_port}\n");
|
||||
$socket->close();
|
||||
$this->_writers[key($this->_writers)]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
|
||||
$this->_writers[0]->write(pack('VnC', 0, $this->_port, Settings::ACTION_DISCONNECT));
|
||||
}
|
||||
|
||||
public function handleSharedRead($writerId, $buffer, $length)
|
||||
|
@ -72,6 +72,15 @@ class Stats
|
||||
}
|
||||
|
||||
private $speeds = [];
|
||||
public function __construct()
|
||||
{
|
||||
Loop::repeat(1000, (function () {
|
||||
foreach ($this->speeds as $elem) {
|
||||
$elem->pop();
|
||||
$elem->unshift(0);
|
||||
}
|
||||
})->bindTo($this, get_class($this)));
|
||||
}
|
||||
public function allocate($ID)
|
||||
{
|
||||
$this->speeds[$ID] = new \Ds\Deque();
|
||||
@ -81,8 +90,8 @@ class Stats
|
||||
public function stopSending($ID, $started, $sent)
|
||||
{
|
||||
$time = microtime(true) - $started;
|
||||
$this->speeds[$ID]->unshift(($sent * 8) / $time);
|
||||
$this->speeds[$ID]->pop();
|
||||
$this->speeds[$ID]->unshift(($sent * 8) / $time);
|
||||
|
||||
}
|
||||
public function getSpeed($ID, $powerOf = 6)
|
||||
@ -115,8 +124,8 @@ class Stats
|
||||
foreach ($result as $key => &$elem) {
|
||||
$elem = (int) ($elem * $per_bytes);
|
||||
if (!$elem) {
|
||||
$this->speeds[$key]->unshift(1000000);
|
||||
$this->speeds[$key]->pop();
|
||||
$this->speeds[$key]->unshift(1000000);
|
||||
$elem += 2;
|
||||
}
|
||||
$sum += $elem;
|
||||
|
Loading…
Reference in New Issue
Block a user