async

bun add @stopcock/async

Task<A, E> is a lazy, cancellable async computation. Nothing runs until you call .run().

Constructing a Task just describes the work. You can compose, retry, add timeouts, set concurrency limits, and chain multiple Tasks together before anything actually executes. Every Task accepts an AbortSignal, so cancellation propagates through the whole chain.

import { pipe } from '@stopcock/fp'
import { fromPromise, map, flatMap, retry, timeout, mapAsync, run } from '@stopcock/async'

const fetchUser = pipe(
  fromPromise(() => fetch('/api/user')),
  flatMap(res => fromPromise(() => res.json())),
  map(data => data.name),
  retry({ attempts: 3 }),
  timeout(5000),
)

const name = await run(fetchUser)

type Task<A, E = never> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

of<A, E>(run: (signal?: AbortSignal) => Promise<A>): Task<A, E>
resolve<A>(value: A): Task<A, never>
reject<E>(error: E): Task<never, E>
fromPromise<A>(f: () => Promise<A>): Task<A, unknown>
fromResult<A, E>(result: Result<A, E>): Task<A, E>
fromOption<A>(option: Option<A>, onNone: () => Error): Task<A, Error>
delay(ms: number): Task<void, never>
never: Task<never, never>

fromPromise wraps a thunk. The promise isn’t created until .run() is called.
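
To make the laziness concrete, here is a minimal sketch built on a local stand-in for fromPromise. The Task shape and the fromPromise body below are simplified assumptions for illustration, not the library's source; the counter only shows that the thunk is not invoked until .run() is called.

```typescript
// Simplified stand-in for the Task shape (assumption: the error parameter is omitted).
type Task<A> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

// Hypothetical fromPromise: stores the thunk and calls it only inside run().
function fromPromise<A>(f: () => Promise<A>): Task<A> {
  return { _tag: 'Task', run: () => f() }
}

let calls = 0
const task = fromPromise(() => {
  calls++
  return Promise.resolve(42)
})

// Constructing the task has not invoked the thunk yet.
const callsBeforeRun = calls

// run() invokes the thunk and returns its promise.
task.run().then(value => console.log(value)) // logs 42
const callsAfterRun = calls
```

Here callsBeforeRun is 0 and callsAfterRun is 1: describing the work and executing it are separate steps.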

// Raw Promise: manual retry loop, easy to get wrong
async function fetchWithRetry(url: string, attempts = 3) {
  for (let i = 0; i < attempts; i++) {
    try { return await fetch(url) }
    catch (e) {
      if (i === attempts - 1) throw e
      await new Promise(r => setTimeout(r, 100 * Math.pow(2, i)))
    }
  }
}

// stopcock: declarative, composable, cancellable
const fetchWithRetryTask = (url: string) => pipe(
  fromPromise(() => fetch(url)),
  retry({ attempts: 3, backoff: 'exponential' }),
)

const fetchAllPages = (baseUrl: string) => {
  const fetchPage = (page: number) => pipe(
    fromPromise(() => fetch(`${baseUrl}?page=${page}`)),
    flatMap(res => fromPromise(() => res.json())),
  )
  return pipe(
    // Forward the signal so cancellation propagates to each page task
    of(async (signal) => {
      const results = []
      let page = 1
      while (true) {
        const data = await fetchPage(page).run(signal)
        results.push(...data.items)
        if (!data.hasMore) break
        page++
      }
      return results
    }),
    timeout(30000),
  )
}

const checkPrimary = pipe(
  fromPromise(() => fetch('https://api.primary.com/health')),
  map(res => res.ok),
  timeout(2000),
)
const checkFallback = pipe(
  fromPromise(() => fetch('https://api.fallback.com/health')),
  map(res => res.ok),
  timeout(2000),
)
const isHealthy = await pipe(
  fallback(checkPrimary, checkFallback),
  catchError(() => false),
  run,
)

const callApi = rateLimit(10, 1000, async (endpoint: string) => {
  const res = await fetch(`https://api.example.com${endpoint}`)
  return res.json()
})
// First 10 calls go through immediately, then queued at 10/sec
const users = await Promise.all(
  ids.map(id => callApi(`/users/${id}`))
)

All are dual-form: call them data-first with the task as the first argument, or data-last inside pipe.

map<A, B, E>(task: Task<A, E>, f: (a: A) => B): Task<B, E>
flatMap<A, B, E, E2>(task: Task<A, E>, f: (a: A) => Task<B, E2>): Task<B, E | E2>
tap<A, E>(task: Task<A, E>, f: (a: A) => void): Task<A, E>
mapError<A, E, E2>(task: Task<A, E>, f: (e: E) => E2): Task<A, E2>
catchError<A, E>(task: Task<A, E>, f: (e: E) => A): Task<A, never>
flatMapError<A, E, E2>(task: Task<A, E>, f: (e: E) => Task<A, E2>): Task<A, E2>
match<A, E, B>(task: Task<A, E>, cases: { ok: (a: A) => B, err: (e: E) => B }): Task<B, never>
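
As an illustration of what dual-form means, the sketch below implements a local map with two overloads. It is a simplified assumption about how such a function could be written (the Task stand-in omits the error parameter), not the library's actual implementation.

```typescript
// Simplified stand-in for the Task shape; an assumption for illustration only.
type Task<A> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

// Data-first takes the task up front; data-last returns a function for pipe.
function map<A, B>(task: Task<A>, f: (a: A) => B): Task<B>
function map<A, B>(f: (a: A) => B): (task: Task<A>) => Task<B>
function map<A, B>(
  taskOrF: Task<A> | ((a: A) => B),
  f?: (a: A) => B,
): Task<B> | ((task: Task<A>) => Task<B>) {
  if (typeof taskOrF === 'function') {
    // Data-last: only the mapper was supplied, so wait for the task.
    const mapper = taskOrF
    return (task: Task<A>) => map(task, mapper)
  }
  const source = taskOrF
  const mapper = f as (a: A) => B
  // Still lazy: the source only runs when the mapped task runs.
  const mapped: Task<B> = {
    _tag: 'Task',
    run: signal => source.run(signal).then(mapper),
  }
  return mapped
}

const source: Task<number> = { _tag: 'Task', run: () => Promise.resolve(2) }
const double = (n: number) => n * 2

const dataFirst = map(source, double) // task supplied first
const dataLast = map(double)(source)  // mapper first, task later (pipe order)
```

Both calls describe the same computation; the data-last form is the one that slots into pipe.
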

run<A>(task: Task<A, any>): Promise<A>
runSafe<A, E>(task: Task<A, E>): Promise<Result<A, E>>
runWithCancel<A>(task: Task<A, any>): [Promise<A>, () => void]

runSafe catches errors into a Result instead of throwing. runWithCancel returns the promise together with a cancel function that aborts the task's signal.
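
The cancel half of that pair can be sketched with a standard AbortController. The runWithCancel below is a hypothetical re-implementation over a simplified Task stand-in, written only to show the mechanism; the library's own version may differ.

```typescript
// Simplified stand-in for the Task shape; an assumption, not the library's source.
type Task<A> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

// Hypothetical sketch: start the task with a fresh signal and
// hand back a function that aborts it.
function runWithCancel<A>(task: Task<A>): [Promise<A>, () => void] {
  const controller = new AbortController()
  return [task.run(controller.signal), () => controller.abort()]
}

const log: string[] = []

// A task that never settles on its own and rejects once its signal aborts.
const waitForever: Task<never> = {
  _tag: 'Task',
  run: signal =>
    new Promise<never>((_resolve, reject) => {
      signal?.addEventListener('abort', () => {
        log.push('aborted')
        reject(new Error('cancelled'))
      })
    }),
}

const [pending, cancel] = runWithCancel(waitForever)
pending.catch(() => log.push('rejected')) // handle the rejection caused by cancel()

cancel() // dispatches 'abort' on the signal, which runs the listener above
```

Because the task receives the signal through run, any work it starts can listen for the abort and stop early.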

all<A, E>(tasks: readonly Task<A, E>[]): Task<A[], E>
allSettled<A, E>(tasks: readonly Task<A, E>[]): Task<Result<A, E>[], never>
race<A, E>(tasks: readonly Task<A, E>[]): Task<A, E>
any<A, E>(tasks: readonly Task<A, E>[]): Task<A, E[]>
parallel(concurrency: number): <A, E>(tasks: readonly Task<A, E>[]) => Task<A[], E>
sequential<A, E>(tasks: readonly Task<A, E>[]): Task<A[], E>

parallel(n) returns a function. Use it to run up to n tasks at a time:

const results = await pipe(
  tasks,
  parallel(4),
  run,
)
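
One way a limiter like this can work is a small worker pool: n workers each pull the next unclaimed task until none remain. The sketch below is an assumption about the general technique, using a simplified Task stand-in; it is not taken from the library, and tracked is a hypothetical helper for observing concurrency.

```typescript
// Simplified stand-in for the Task shape; an assumption for illustration only.
type Task<A> = {
  readonly _tag: 'Task'
  readonly run: (signal?: AbortSignal) => Promise<A>
}

// Hypothetical worker-pool limiter; results keep the input order.
function parallel(concurrency: number) {
  return <A>(tasks: readonly Task<A>[]): Task<A[]> => ({
    _tag: 'Task',
    run: async signal => {
      const results = new Array<A>(tasks.length)
      let next = 0
      // Each worker claims the next index, runs that task, then repeats.
      const worker = async () => {
        while (next < tasks.length) {
          const index = next++
          results[index] = await tasks[index].run(signal)
        }
      }
      const workerCount = Math.min(concurrency, tasks.length)
      await Promise.all(Array.from({ length: workerCount }, worker))
      return results
    },
  })
}

// Hypothetical helper that records how many tasks are in flight at once.
let inFlight = 0
let maxInFlight = 0
const tracked = (value: number): Task<number> => ({
  _tag: 'Task',
  run: async () => {
    inFlight++
    maxInFlight = Math.max(maxInFlight, inFlight)
    await new Promise(resolve => setTimeout(resolve, 5))
    inFlight--
    return value
  },
})

const limited = parallel(2)([1, 2, 3, 4, 5].map(tracked))
```

Running limited resolves to the values in input order, with at most two of the five tasks in flight at any moment.
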

retry(opts: RetryOptions): <A, E>(task: Task<A, E>) => Task<A, E>
timeout(ms: number): <A, E>(task: Task<A, E>) => Task<A, E | TimeoutError>
fallback<A, E>(task: Task<A, E>, alt: Task<A, E>): Task<A, E>

interface RetryOptions {
  attempts: number
  delay?: number // default 100ms
  backoff?: 'constant' | 'linear' | 'exponential' // default exponential
  jitter?: boolean // default true
  maxDelay?: number // default 30000ms
  retryIf?: (error: unknown, attempt: number) => boolean
}
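
To give a feel for how delay, backoff, and maxDelay might interact, here is a hedged sketch of a delay schedule. The formulas are assumptions based on the usual meaning of each strategy (the library may compute delays differently), jitter is left out so the numbers are deterministic, and backoffDelay and DelayOptions are hypothetical names.

```typescript
type Backoff = 'constant' | 'linear' | 'exponential'

// Hypothetical subset of RetryOptions, using the defaults documented above.
interface DelayOptions {
  delay?: number
  backoff?: Backoff
  maxDelay?: number
}

// Delay in ms before retry number `attempt` (1-based), without jitter.
// Assumed formulas: constant = delay, linear = delay * attempt,
// exponential = delay * 2^(attempt - 1), each capped at maxDelay.
function backoffDelay(attempt: number, opts: DelayOptions = {}): number {
  const { delay = 100, backoff = 'exponential', maxDelay = 30000 } = opts
  const raw =
    backoff === 'constant' ? delay
    : backoff === 'linear' ? delay * attempt
    : delay * Math.pow(2, attempt - 1)
  return Math.min(raw, maxDelay)
}

const schedule = [1, 2, 3, 4].map(n => backoffDelay(n)) // [100, 200, 400, 800]
const capped = backoffDelay(20)                          // 30000, limited by maxDelay
```

With jitter enabled, a real implementation would typically randomize each of these values, so treat the schedule as an upper-level shape rather than exact timings.
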

throttle<T extends (...args: any[]) => any>(ms: number, fn: T): T
debounce<T extends (...args: any[]) => any>(ms: number, fn: T): T & { cancel: () => void }
rateLimit<T extends (...args: any[]) => any>(count: number, windowMs: number, fn: T): T

The collection helpers below all return a Task. mapAsync, filterAsync, and forEachAsync also accept an optional concurrency limit.

mapAsync<A, B>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<B>, concurrency?: number): Task<B[], unknown>
filterAsync<A>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<boolean>, concurrency?: number): Task<A[], unknown>
forEachAsync<A>(arr: A[], fn: (a: A, signal?: AbortSignal) => Promise<void>, concurrency?: number): Task<void, unknown>
reduceAsync<A, B>(arr: A[], fn: (acc: B, a: A, signal?: AbortSignal) => Promise<B>, init: B): Task<B, unknown>
collectAsync<A>(iter: AsyncIterable<A>): Task<A[], unknown>

All dual-form. Data-last works in pipe:

const results = await pipe(
  urls,
  mapAsync(url => fetch(url).then(r => r.json()), 4),
  run,
)