1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Fix whitespace

This commit is contained in:
Niklas Keller 2016-12-11 16:17:51 +01:00
parent 2ba73e90fc
commit e4d97caad7
2 changed files with 34 additions and 34 deletions

View File

@ -17,7 +17,7 @@ use Interop\Async\Promise;
class Observer {
/** @var \Amp\Observable */
private $observable;
/** @var mixed[] */
private $values = [];
@ -44,7 +44,7 @@ class Observer {
*/
public function __construct(Observable $observable) {
$this->observable = $observable;
$deferred = &$this->deferred;
$values = &$this->values;
$deferreds = &$this->deferreds;
@ -94,12 +94,12 @@ class Observer {
*/
public function __destruct() {
$this->resolved = true;
foreach ($this->deferreds as $deferred) {
$deferred->resolve();
}
}
/**
* @return \Amp\Observable The observable being observed.
*/
@ -179,7 +179,7 @@ class Observer {
return $this->result;
}
/**
* Returns an array of values that were not consumed by the Observer before the Observable completed.
*
@ -191,17 +191,17 @@ class Observer {
if (!$this->resolved) {
throw new \Error("The observable has not resolved");
}
$values = $this->values;
$this->values = [];
$this->position = -1;
$deferreds = $this->deferreds;
$this->deferreds = [];
foreach ($deferreds as $deferred) {
$deferred->resolve();
}
return $values;
}
}

View File

@ -98,7 +98,7 @@ function onSignal(int $signo, callable $callback, $data = null): string {
/**
* Defer the execution of a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
*
* @see \Interop\Async\Loop::defer()
*
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
@ -118,7 +118,7 @@ function defer(callable $callback, $data = null): string {
/**
* Delay the execution of a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
*
* @see \Interop\Async\Loop::delay()
*
* @param int $time
@ -139,7 +139,7 @@ function delay(int $time, callable $callback, $data = null): string {
/**
* Repeatedly execute a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
*
* @see \Interop\Async\Loop::repeat()
*
* @param int $time
@ -265,7 +265,7 @@ function coroutine(callable $worker): callable {
if ($result instanceof \Generator) {
return new Coroutine($result);
}
if (!$result instanceof Promise) {
return new Success($result);
}
@ -748,7 +748,7 @@ function map(callable $callback, array ...$promises): array {
function each(Observable $observable, callable $onNext, callable $onComplete = null): Observable {
$postponed = new Postponed;
$pending = true;
$observable->subscribe(function ($value) use (&$pending, $postponed, $onNext) {
if ($pending) {
try {
@ -760,30 +760,30 @@ function each(Observable $observable, callable $onNext, callable $onComplete = n
}
return null;
});
$observable->when(function ($exception, $value) use (&$pending, $postponed, $onComplete) {
if (!$pending) {
return;
}
$pending = false;
if ($exception) {
$postponed->fail($exception);
return;
}
if ($onComplete === null) {
$postponed->resolve($value);
return;
}
try {
$postponed->resolve($onComplete($value));
} catch (\Throwable $exception) {
$postponed->fail($exception);
}
});
return $postponed->observe();
}
@ -796,7 +796,7 @@ function each(Observable $observable, callable $onNext, callable $onComplete = n
function filter(Observable $observable, callable $filter): Observable {
$postponed = new Postponed;
$pending = true;
$observable->subscribe(function ($value) use (&$pending, $postponed, $filter) {
if ($pending) {
try {
@ -811,21 +811,21 @@ function filter(Observable $observable, callable $filter): Observable {
}
return null;
});
$observable->when(function ($exception, $value) use (&$pending, $postponed) {
if (!$pending) {
return;
}
$pending = false;
if ($exception) {
$postponed->fail($exception);
return;
}
$postponed->resolve($value);
});
return $postponed->observe();
}
@ -838,23 +838,23 @@ function filter(Observable $observable, callable $filter): Observable {
*/
function merge(array $observables): Observable {
$postponed = new Postponed;
foreach ($observables as $observable) {
if (!$observable instanceof Observable) {
throw new \Error("Non-observable provided");
}
$observable->subscribe([$postponed, 'emit']);
}
all($observables)->when(function ($exception, array $values) use ($postponed) {
if ($exception) {
$postponed->fail($exception);
return;
}
$postponed->resolve($values);
});
return $postponed->observe();
}
@ -875,7 +875,7 @@ function stream(array $promises): Observable {
throw new \Error("Non-promise provided");
}
}
return new Emitter(function (callable $emit) use ($promises) {
$emits = [];
foreach ($promises as $promise) {
@ -900,12 +900,12 @@ function concat(array $observables): Observable {
throw new \Error("Non-observable provided");
}
}
$postponed = new Postponed;
$subscriptions = [];
$previous = [];
$promise = all($previous);
foreach ($observables as $observable) {
$subscriptions[] = $observable->subscribe(coroutine(function ($value) use ($postponed, $promise) {
try {
@ -913,22 +913,22 @@ function concat(array $observables): Observable {
} catch (\Throwable $exception) {
// Ignore exception in this context.
}
return yield $postponed->emit($value);
}));
$previous[] = $observable;
$promise = all($previous);
}
$promise->when(function ($exception, array $values) use ($postponed) {
if ($exception) {
$postponed->fail($exception);
return;
}
$postponed->resolve($values);
});
return $postponed->observe();
}