diff --git a/examples/001_basic_run_func.php b/examples/001_basic_run.php similarity index 88% rename from examples/001_basic_run_func.php rename to examples/001_basic_run.php index 6e9dcf5..c88cbae 100644 --- a/examples/001_basic_run_func.php +++ b/examples/001_basic_run.php @@ -20,12 +20,12 @@ require __DIR__ . '/../vendor/autoload.php'; define('RUN_TIME', 10); printf("Each line you type will be echoed back for the next %d seconds ...\n\n", RUN_TIME); -Amp\run(function() { +Amp\run(function($reactor) { // Set the STDIN stream to "non-blocking" mode stream_set_blocking(STDIN, false); // Echo back the line each time there is readable data on STDIN - Amp\onReadable(STDIN, function() { + $reactor->onReadable(STDIN, function() { if ($line = fgets(STDIN)) { echo "INPUT> ", $line, "\n"; } @@ -33,11 +33,11 @@ Amp\run(function() { // Countdown RUN_TIME seconds then end the event loop $secondsRemaining = RUN_TIME; - Amp\repeat(function() use (&$secondsRemaining) { + $reactor->repeat(function() use (&$secondsRemaining) { if (--$secondsRemaining > 0) { echo "$secondsRemaining seconds to shutdown\n"; } else { - Amp\stop(); // <-- explicitly stop the loop + $reactor->stop(); // <-- explicitly stop the loop } }, $msInterval = 1000); }); diff --git a/examples/002_basic_run_obj.php b/examples/002_basic_run_obj.php deleted file mode 100644 index 7d80876..0000000 --- a/examples/002_basic_run_obj.php +++ /dev/null @@ -1,50 +0,0 @@ -onReadable(STDIN, function() { - if ($line = fgets(STDIN)) { - echo "INPUT> ", $line, "\n"; - } - }); - - // Countdown RUN_TIME seconds then end the event loop - $secondsRemaining = RUN_TIME; - $reactor->repeat(function() use (&$secondsRemaining, $reactor) { - if (--$secondsRemaining > 0) { - echo "$secondsRemaining seconds to shutdown\n"; - } else { - $reactor->stop(); - } - }, $msInterval = 1000); -}); diff --git a/examples/003_scheduling_func.php b/examples/002_scheduling.php similarity index 87% rename from examples/003_scheduling_func.php rename to examples/002_scheduling.php index 8ced293..83a8a7a 100644 --- a/examples/003_scheduling_func.php +++ b/examples/002_scheduling.php @@ -2,7 +2,7 @@ require __DIR__ . '/../vendor/autoload.php'; -Amp\run(function() { +Amp\run(function($reactor) { $ticker = function() { $now = time(); $vowel = ($now % 2) ? 'i' : 'o'; @@ -12,15 +12,15 @@ Amp\run(function() { // Execute the specified callback ASAP in the next event loop iteration. There is no // need to clear an "immediately" watcher after execution. The Reactor will automatically // garbage collect resources associated with one-time events after they finish executing. - Amp\immediately($ticker); + $reactor->immediately($ticker); // Execute every $msInterval milliseconds until the resulting $watcherId is canceled. // At some point in the future we need to cancel this watcher or our program will never end. $repeatingWatcherId = Amp\repeat($ticker, $msInterval = 1000); // Five seconds from now let's cancel the repeating ticker we just registered - Amp\once(function() use ($repeatingWatcherId) { - Amp\cancel($repeatingWatcherId); + $reactor->once(function() use ($repeatingWatcherId) { + $reactor->cancel($repeatingWatcherId); echo "Cancelled repeating ticker\n"; }, $msDelay = 5000); diff --git a/examples/005_tcp_server_obj.php b/examples/003_tcp_server.php similarity index 98% rename from examples/005_tcp_server_obj.php rename to examples/003_tcp_server.php index afede47..853e90b 100644 --- a/examples/005_tcp_server_obj.php +++ b/examples/003_tcp_server.php @@ -27,7 +27,7 @@ class Server { private $ioGranularity = 8192; public function __construct(Amp\Reactor $reactor = null) { - $this->reactor = $reactor ?: Amp\getReactor(); + $this->reactor = $reactor ?: Amp\reactor(); } public function start($address) { diff --git a/examples/006_libuv_signal_handling.php b/examples/004_libuv_signal_handling.php similarity index 100% rename from examples/006_libuv_signal_handling.php rename to examples/004_libuv_signal_handling.php diff --git a/examples/004_scheduling_obj.php b/examples/004_scheduling_obj.php deleted file mode 100644 index 90a0534..0000000 --- a/examples/004_scheduling_obj.php +++ /dev/null @@ -1,33 +0,0 @@ -immediately($ticker); - - // Execute every $msInterval milliseconds until the resulting $watcherId is cancelled. - // At some point in the future we need to cancel this watcher or our program will never end. - $repeatingWatcherId = $reactor->repeat($ticker, $msInterval = 1000); - - // Five seconds from now let's cancel the repeating ticker we just registered - $reactor->once(function() use ($repeatingWatcherId, $reactor) { - $reactor->cancel($repeatingWatcherId); - echo "Cancelled repeating ticker\n"; - }, $msDelay = 5000); - - // After about five seconds the program will exit on its own. Why? This happens because in - // that time frame we will have canceled the repeating watcher we registered using repeat() - // and the two one-off events (immediately() + once()) are automatically garbage collected - // by the Reactor after they execute. -}); diff --git a/examples/007_broadcast_server.php b/examples/007_broadcast_server.php deleted file mode 100644 index 15c6d46..0000000 --- a/examples/007_broadcast_server.php +++ /dev/null @@ -1,153 +0,0 @@ -reactor = $reactor ?: Amp\getReactor(); - } - - public function start($address) { - if (!$server = @stream_socket_server($address, $errno, $errstr)) { - throw new RuntimeException( - sprintf('Failed binding server on %s; [%d] %s', $address, $errno, $errstr) - ); - } - - printf("Server socket listening on %s\n", $address); - - stream_set_blocking($server, false); - - $this->reactor->onReadable($server, function() use ($server) { - $this->acceptClients($server); - }); - - // Release the hounds! - $this->reactor->run(); - } - - private function acceptClients($server) { - while ($socket = @stream_socket_accept($server, $timeout=0, $name)) { - $client = new Client; - $client->id = (int) $socket; - $client->socket = $socket; - - // What to do when the client socket is readable - $client->readWatcher = $this->reactor->onReadable($socket, function() use ($client) { - $this->readFromClient($client); - }); - - // What to do when the socket is writable - $client->writeWatcher = $this->reactor->onWritable($socket, function() use ($client) { - $this->writeToClient($client); - }); - - // Buffer something to send to the client. The writability watcher we just enabled - // above will take care of sending this data automatically. - $message = "--- Welcome to the example server! ---\n\n"; - - printf("Client socket accepted: %s\n", $name); - - // Store the client using its integer ID - if (0 === sizeof($this->clients)) { - $message .= "Hello! Looks like you are alone here.\nOpen another connection and start typing something…\n"; - } else { - $message .= "{$client->id} joined\n"; - } - - $this->clients[$client->id] = $client; - $this->broadcast($client, $message, true); - } - } - - private function broadcast(Client $sender, $data, $ignoreSender = false) { - foreach ($this->clients as $client) { - if ($ignoreSender || $client->id !== $sender->id) { - $client->outputBuffer = $data; - $this->reactor->enable($client->writeWatcher); - } - } - } - - private function readFromClient(Client $client) { - $data = @fread($client->socket, $this->ioGranularity); - - // This only happens on EOF. If the socket has died we need to unload it. - if ($data == '' && $this->isSocketDead($client->socket)) { - $this->unloadClient($client); - } else { - printf("Data received from client %d: %s\n", $client->id, $data); - $this->broadcast($client, "{$client->id} said: {$data}\n"); - } - } - - private function isSocketDead($socket) { - return (!is_resource($socket) || feof($socket)); - } - - private function writeToClient(Client $client) { - $bytesWritten = @fwrite($client->socket, $client->outputBuffer); - - if ($bytesWritten === strlen($client->outputBuffer)) { - // All data written. Disable the writability watcher. Sockets are essentially "always" - // writable, so it's important to disable write watchers when you don't have any data - // remaining to write. Otherwise you'll just hammer your CPU. - $client->outputBuffer = ''; - $this->reactor->disable($client->writeWatcher); - } elseif ($bytesWritten > 0) { - // Data was partially written -- truncate the buffer - $client->outputBuffer = substr($client->outputBuffer, $bytesWritten); - } elseif ($this->isSocketDead($client->socket)) { - // Otherwise the client is dead and we just unload it - $this->unloadClient($client); - } - } - - /** - * We have to clean up after ourselves or we'll create memory leaks. Always be sure to cancel - * any stream IO watchers or repeating timer events once they're no longer needed! - */ - private function unloadClient(Client $client) { - $this->reactor->cancel($client->readWatcher); - $this->reactor->cancel($client->writeWatcher); - if (is_resource($client->socket)) { - @fclose($client->socket); - } - unset($this->clients[$client->id]); - - printf("Client %d disconnected\n", $client->id); - $this->broadcast($client, "{$client->id} left\n"); - } -} - -(new Server)->start(SERVER_ADDRESS);