async

bun add @stopcock/async

Task<A, E> is a lazy, cancellable async computation. Nothing runs until you call .run().
Constructing a Task just describes the work. You can compose, retry, add timeouts, set concurrency limits, and chain multiple Tasks together before anything actually executes. Every Task accepts an AbortSignal, so cancellation propagates through the whole chain.
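To make the laziness and signal propagation concrete, here is a minimal sketch that models the idea with a locally defined Task type. It mirrors the shape documented under "Task type" but is not the library's source; `makeTask` and `chain` are hypothetical stand-ins for a constructor and for flatMap.

```typescript
// Local model of the documented Task shape (an assumption, not the library's code).
type Task<A> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

// Hypothetical constructor: wraps a signal-aware function without calling it.
const makeTask = <A>(run: (signal?: AbortSignal) => Promise<A>): Task<A> => ({ _tag: 'Task', run })

// Hypothetical flatMap: runs the first Task, then the next, handing both the same signal.
const chain = <A, B>(task: Task<A>, f: (a: A) => Task<B>): Task<B> =>
  makeTask(async signal => f(await task.run(signal)).run(signal))

// Records what each step observed, so laziness and propagation are visible.
const log: string[] = []

const step1 = makeTask(async signal => {
  log.push(`step1 aborted=${signal?.aborted ?? false}`)
  return 1
})

const program = chain(step1, n =>
  makeTask(async signal => {
    log.push(`step2 aborted=${signal?.aborted ?? false}`)
    return n + 1
  }),
)
// `log` is still empty here: building `program` has not run anything.
```

Only a call such as `program.run(controller.signal)` starts the work. Both steps receive that one signal, so aborting the controller is visible to every step in the chain.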
import { pipe } from '@stopcock/fp'
import { fromPromise, map, flatMap, retry, timeout, mapAsync, run } from '@stopcock/async'

const fetchUser = pipe(
  fromPromise(() => fetch('/api/user')),
  flatMap(res => fromPromise(() => res.json())),
  map(data => data.name),
  retry({ attempts: 3 }),
  timeout(5000),
)

const name = await run(fetchUser)

Task type
type Task<A, E = never> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

Constructors
of<A, E>(run: (signal?: AbortSignal) => Promise<A>): Task<A, E>
resolve<A>(value: A): Task<A, never>
reject<E>(error: E): Task<never, E>
fromPromise<A>(f: () => Promise<A>): Task<A, unknown>
fromResult<A, E>(result: Result<A, E>): Task<A, E>
fromOption<A>(option: Option<A>, onNone: () => Error): Task<A, Error>
delay(ms: number): Task<void, never>
never: Task<never, never>

fromPromise wraps a thunk. The promise isn't created until .run() is called.
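The difference between a thunk and an already-created Promise can be seen in a small sketch. `fromThunk` below is a hypothetical stand-in for fromPromise, written locally so the example is self-contained; `job` and the `started` counter are also illustrative only.

```typescript
// Local model of the documented Task shape (an assumption, not the library's code).
type Task<A> = { readonly _tag: 'Task'; readonly run: (signal?: AbortSignal) => Promise<A> }

// Hypothetical stand-in for fromPromise: stores the thunk and calls it only inside run().
const fromThunk = <A>(f: () => Promise<A>): Task<A> => ({ _tag: 'Task', run: () => f() })

// Counts how many times the async job has actually been started.
let started = 0
const job = (): Promise<string> => {
  started += 1
  return Promise.resolve('done')
}

const eager = job()         // raw Promise: the job starts on this line
const lazy = fromThunk(job) // Task: the thunk is stored, nothing starts yet
```

After these two lines `started` is 1, and the increment came from `eager` alone. It becomes 2 only once `lazy.run()` is called.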
vs. raw Promises
// Raw Promise: manual retry loop, easy to get wrong
async function fetchWithRetry(url: string, attempts = 3) {
  for (let i = 0; i < attempts; i++) {
    try {
      return await fetch(url)
    } catch (e) {
      if (i === attempts - 1) throw e
      await new Promise(r => setTimeout(r, 100 * Math.pow(2, i)))
    }
  }
}

// stopcock: declarative, composable, cancellable
const fetchWithRetry = (url: string) =>
  pipe(
    fromPromise(() => fetch(url)),
    retry({ attempts: 3, backoff: 'exponential' }),
  )

// Raw: p-limit or manual semaphore
import pLimit from 'p-limit'
const limit = pLimit(4)
const results = await Promise.all(urls.map(url => limit(() => fetch(url))))

// stopcock: built-in, returns a Task you can compose further
const results = await pipe(
  urls,
  mapAsync(url => fetch(url), 4),
  run,
)

// Raw: AbortController boilerplate
const controller = new AbortController()
const promise = fetch(url, { signal: controller.signal })
setTimeout(() => controller.abort(), 5000)
// stopcock: cancellation is built into the Task model
// `of` receives the Task's signal, so it can be handed to fetch
const [promise, cancel] = runWithCancel(
  pipe(of(signal => fetch(url, { signal })), timeout(5000)),
)
// cancel() aborts immediately; the signal is passed on to fetch

Real-world patterns
Paginated API fetch

const fetchAllPages = (baseUrl: string) => {
  const fetchPage = (page: number) =>
    pipe(
      fromPromise(() => fetch(`${baseUrl}?page=${page}`)),
      flatMap(res => fromPromise(() => res.json())),
    )

  return pipe(
    of(async signal => {
      const results = []
      let page = 1
      while (true) {
        // Forward the signal so cancellation reaches each page Task
        const data = await fetchPage(page).run(signal)
        results.push(...data.items)
        if (!data.hasMore) break
        page++
      }
      return results
    }),
    timeout(30000),
  )
}

Health check with fallback
const checkPrimary = pipe(
  fromPromise(() => fetch('https://api.primary.com/health')),
  map(res => res.ok),
  timeout(2000),
)

const checkFallback = pipe(
  fromPromise(() => fetch('https://api.fallback.com/health')),
  map(res => res.ok),
  timeout(2000),
)

const isHealthy = await pipe(
  fallback(checkPrimary, checkFallback),
  catchError(() => false),
  run,
)

Rate-limited API client
const callApi = rateLimit(10, 1000, async (endpoint: string) => {
  const res = await fetch(`https://api.example.com${endpoint}`)
  return res.json()
})

// First 10 calls go through immediately, then queued at 10/sec
const users = await Promise.all(
  ids.map(id => callApi(`/users/${id}`)),
)

Combinators
All dual-form: data-first and data-last for pipe.
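As an illustration of what "dual-form" means, the sketch below implements a dual-form `map` over plain arrays by dispatching on the number of arguments. This is one common way to build such functions, not necessarily how the library does it; `pipe2` is a hypothetical two-argument pipe used only to show the data-last call.

```typescript
// Hypothetical dual-form map over arrays (the library's version works on Tasks).
function map<A, B>(data: readonly A[], f: (a: A) => B): B[]
function map<A, B>(f: (a: A) => B): (data: readonly A[]) => B[]
function map<A, B>(
  ...args: [readonly A[], (a: A) => B] | [(a: A) => B]
): B[] | ((data: readonly A[]) => B[]) {
  if (args.length === 2) {
    // Data-first: both the data and the function were supplied.
    const [data, f] = args
    return data.map(f)
  }
  // Data-last: only the function was supplied, so return a function awaiting the data.
  const [f] = args
  return (data: readonly A[]) => data.map(f)
}

// Hypothetical two-step pipe: feeds a value into a single function.
const pipe2 = <A, B>(value: A, f: (a: A) => B): B => f(value)

const dataFirst = map([1, 2, 3], n => n * 2)
const dataLast = pipe2([1, 2, 3], map((n: number) => n * 2))
```

Both calls produce the same array. The data-last form exists so the function can sit in the middle of a pipe without naming its input.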
map<A, B, E>(task: Task<A, E>, f: (a: A) => B): Task<B, E>
flatMap<A, B, E, E2>(task: Task<A, E>, f: (a: A) => Task<B, E2>): Task<B, E | E2>
tap<A, E>(task: Task<A, E>, f: (a: A) => void): Task<A, E>
mapError<A, E, E2>(task: Task<A, E>, f: (e: E) => E2): Task<A, E2>
catchError<A, E>(task: Task<A, E>, f: (e: E) => A): Task<A, never>
flatMapError<A, E, E2>(task: Task<A, E>, f: (e: E) => Task<A, E2>): Task<A, E2>
match<A, E, B>(task: Task<A, E>, cases: { ok: (a: A) => B, err: (e: E) => B }): Task<B, never>

Running
run<A>(task: Task<A, any>): Promise<A>
runSafe<A, E>(task: Task<A, E>): Promise<Result<A, E>>
runWithCancel<A>(task: Task<A, any>): [Promise<A>, () => void]

runSafe catches errors into a Result instead of throwing. runWithCancel returns the promise together with a cancel function that aborts the signal.
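A rough sketch of how these two runners could be built on top of `.run()` is shown here. Everything in it is an assumption for illustration: the `Result` shape is invented (the library's Result may look different), and `runSafeSketch`, `runWithCancelSketch`, and `sleep` are hypothetical names, not library exports.

```typescript
// Local model of the documented Task shape (an assumption, not the library's code).
type Task<A> = { readonly _tag: 'Task'; readonly run: (signal?: AbortSignal) => Promise<A> }

// Assumed Result shape, for illustration only.
type Result<A, E> = { ok: true; value: A } | { ok: false; error: E }

// Sketch of runSafe: turn a rejection into a value instead of letting it throw.
const runSafeSketch = <A>(task: Task<A>): Promise<Result<A, unknown>> =>
  task.run().then(
    (value): Result<A, unknown> => ({ ok: true, value }),
    (error: unknown): Result<A, unknown> => ({ ok: false, error }),
  )

// Sketch of runWithCancel: own the AbortController, expose only its abort().
const runWithCancelSketch = <A>(task: Task<A>): [Promise<A>, () => void] => {
  const controller = new AbortController()
  return [task.run(controller.signal), () => controller.abort()]
}

// Hypothetical Task that resolves after `ms` but rejects as soon as its signal aborts.
const sleep = (ms: number): Task<string> => ({
  _tag: 'Task',
  run: signal =>
    new Promise<string>((resolve, reject) => {
      if (signal?.aborted) {
        reject(new Error('cancelled'))
        return
      }
      const timer = setTimeout(() => resolve('finished'), ms)
      signal?.addEventListener('abort', () => {
        clearTimeout(timer)
        reject(new Error('cancelled'))
      })
    }),
})
```

With these definitions, `const [promise, cancel] = runWithCancelSketch(sleep(10000))` followed by `cancel()` rejects `promise` right away, and wrapping a failing Task in `runSafeSketch` yields an `{ ok: false, error }` value rather than a thrown error.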
Concurrency
all<A, E>(tasks: readonly Task<A, E>[]): Task<A[], E>
allSettled<A, E>(tasks: readonly Task<A, E>[]): Task<Result<A, E>[], never>
race<A, E>(tasks: readonly Task<A, E>[]): Task<A, E>
any<A, E>(tasks: readonly Task<A, E>[]): Task<A, E[]>
parallel(concurrency: number): <A, E>(tasks: readonly Task<A, E>[]) => Task<A[], E>
sequential<A, E>(tasks: readonly Task<A, E>[]): Task<A[], E>

parallel(n) returns a function. Use it to run up to n tasks at a time:

const results = await pipe(
  tasks,
  parallel(4),
  run,
)

Resilience
retry(opts: RetryOptions): <A, E>(task: Task<A, E>) => Task<A, E>
timeout(ms: number): <A, E>(task: Task<A, E>) => Task<A, E | TimeoutError>
fallback<A, E>(task: Task<A, E>, alt: Task<A, E>): Task<A, E>

interface RetryOptions {
  attempts: number
  delay?: number // default 100ms
  backoff?: 'constant' | 'linear' | 'exponential' // default exponential
  jitter?: boolean // default true
  maxDelay?: number // default 30000ms
  retryIf?: (error: unknown, attempt: number) => boolean
}

Flow control
throttle<T extends (...args: any[]) => any>(ms: number, fn: T): T
debounce<T extends (...args: any[]) => any>(ms: number, fn: T): T & { cancel: () => void }
rateLimit<T extends (...args: any[]) => any>(count: number, windowMs: number, fn: T): T

Array operations
All return a Task. mapAsync, filterAsync, and forEachAsync also accept an optional concurrency limit.

mapAsync<A, B>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<B>, concurrency?: number): Task<B[], unknown>
filterAsync<A>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<boolean>, concurrency?: number): Task<A[], unknown>
forEachAsync<A>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<void>, concurrency?: number): Task<void, unknown>
reduceAsync<A, B>(arr: A[], fn: (acc: B, a: A, signal?: AbortSignal) => Promise<B>, init: B): Task<B, unknown>
collectAsync<A>(iter: AsyncIterable<A>): Task<A[], unknown>

All dual-form. Data-last works in pipe:

const results = await pipe(
  urls,
  mapAsync(url => fetch(url).then(r => r.json()), 4),
  run,
)
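For intuition about what a concurrency limit does, here is a minimal worker-pool sketch. `mapLimited` is a hypothetical helper written for this example; it is not the library's mapAsync, returns a plain Promise rather than a Task, and ignores AbortSignal for brevity.

```typescript
// Hypothetical helper: maps `items` through `fn` with at most `limit` calls in flight,
// and returns results in input order.
async function mapLimited<A, B>(
  items: readonly A[],
  fn: (a: A) => Promise<B>,
  limit: number,
): Promise<B[]> {
  const results = new Array<B>(items.length)
  let next = 0

  // Each worker repeatedly claims the next unprocessed index until none remain.
  const worker = async (): Promise<void> => {
    while (next < items.length) {
      const index = next++
      results[index] = await fn(items[index] as A)
    }
  }

  // Never start more workers than there are items, and always start at least one.
  const workerCount = Math.max(1, Math.min(limit, items.length))
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```

Calling `mapLimited(urls, url => fetch(url).then(r => r.json()), 4)` keeps at most four requests open at once. In this sketch the first rejection rejects the returned Promise, while calls already in flight are left to finish.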