← All libraries

CandyAsync

🍬 CandyAsync

Shared async vocabulary for ReactPHP

pioneering (no upstream) async cancellation reactphp foundation

Shared async vocabulary unifying ReactPHP usage across the SugarCraft monorepo. CancellationToken (owner/source pattern), Subscription interface, Subscriptions::compose() for atomic disposal, and AsyncOps static helpers for common async patterns.

Install

composer require sugarcraft/candy-async

Quickstart

use SugarCraft\Async\{AsyncOps, CancellationSource};

$source = CancellationSource::new();

// Attach a cancellation callback
$source->token()->onCancel(fn() => echo "Cancelled!\n");

$source->cancel(); // prints "Cancelled!"

// Timeout wrapper
$loop = \React\EventLoop\Loop::get();
$promise = AsyncOps::withTimeout($loop, $somePromise, 5.0);

// Retry with backoff
$promise = AsyncOps::retry(
    fn() => $httpClient->request('GET', 'https://example.com'),
    attempts: 3,
    baseBackoffSeconds: 0.5,
);

What's in the box

CancellationTokenRead-only view of a cancellation state. Consumers can observe isCancelled() and register onCancel() callbacks but cannot trigger cancellation.
CancellationSourceOwns the mutable cancellation flag. Create via CancellationSource::new(). Call cancel() to flip the flag and fire all registered callbacks.
Cancellable interfaceContract for components that can be cancelled. Implemented by CancellationSource. Callbacks fire exactly once, even on repeated cancel() calls.
Subscription interfaceDisposal handle returned by subscribe()-style APIs. unsubscribe() disposes the subscription; isActive() reports whether it is still live.
Subscriptions::compose()Combine multiple subscriptions into a single atomic handle. Calling unsubscribe() on the composite disposes all underlying subscriptions.
AsyncOps::withTimeout()Wrap a promise with a timeout. Rejects with TimeoutException if the timeout fires before the promise settles.
AsyncOps::retry()Retry a failed operation up to N times with exponential backoff. Respects CancellationToken — cancelling aborts remaining retries.
AsyncOps::debounce()Wrap a callable so only the last call within the window fires, after silence.
AsyncOps::throttle()Wrap a callable so it fires at most once per interval, ignoring excess calls.

Use it for

Source & demos

Try the quickstart →

API

ClassMethodDescription
CancellationSourcenew(): selfFactory: create a new source with its own token
CancellationSourcetoken(): CancellationTokenReturns the read-only token for this source
CancellationSourcecancel(): voidRequest cancellation. Idempotent.
CancellationSourceisCancelled(): boolReturns true if cancel() has been called
CancellationTokenisCancelled(): boolReturns true if the source has been cancelled
CancellationTokenonCancel(callable): voidRegister a callback; fires immediately if already cancelled
Subscriptionscompose(Subscription ...): selfCombine multiple subscriptions into one atomic handle
Subscriptionsunsubscribe(): voidDispose all underlying subscriptions
SubscriptionsisActive(): boolReturns false after unsubscribe()
AsyncOpswithTimeout(LoopInterface, PromiseInterface, float): PromiseInterfaceWrap a promise with a timeout; rejects with TimeoutException
AsyncOpsretry(callable, int, float, ?CancellationToken): PromiseInterfaceRetry with exponential backoff; respects cancellation
AsyncOpsdebounce(callable, float, ?LoopInterface): callableDebounce wrapper; only last call fires after window
AsyncOpsthrottle(callable, float, ?LoopInterface): callableThrottle wrapper; fires at most once per interval