Skip to content

stream

Terminal window
bun add @stopcock/stream

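Streams are lazy: nothing runs until you pull values out with a terminal operation such as toArray or reduce. Because of that, infinite sequences just work, as long as you bound them (for example with take) or finish with a terminal operation that stops early (for example find) before consuming them.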

import { pipe } from '@stopcock/fp'
import { range, map, filter, take, toArray } from '@stopcock/stream'
const squares = pipe(
range(1, Infinity),
map(n => n * n),
filter(n => n % 2 === 0),
take(5),
toArray,
)
// [4, 16, 36, 64, 100]
// Array: 3 eager passes, each allocating a new array (2 intermediates + the result)
const result = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter(n => n % 2 === 0)
.map(n => n * n)
.slice(0, 3)
// Stream: single lazy pass, stops after 3 results
const result = pipe(
from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
filter(n => n % 2 === 0),
map(n => n * n),
take(3),
toArray,
)
const lines = pipe(
from(file.split('\n')),
drop(1), // skip header
map(line => line.split(',')),
filter(cols => cols[2] !== ''), // skip empty emails
map(([name, age, email]) => ({ name, age: +age, email })),
take(1000), // first 1000 valid rows
toArray,
)
const fibs = pipe(
iterate(([a, b]) => [b, a + b] as [number, number], [0, 1] as [number, number]),
map(([a]) => a),
)
pipe(fibs, take(10), toArray) // [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
pipe(fibs, find(n => n > 1000)) // 1597
const ids = range(1, 10001) // 10,000 IDs
pipe(
ids,
chunk(100), // groups of 100
forEach(batch => sendBatchRequest(toArray(batch))),
)
interface Stream<A> {
[Symbol.iterator](): Iterator<A>
}

Streams implement Iterable, so they work with for...of and spread.

from<A>(iterable: Iterable<A>): Stream<A>
range(start: number, end: number): Stream<number>
iterate<A>(f: (a: A) => A, seed: A): Stream<A>
repeat<A>(value: A): Stream<A>
empty<A = never>(): Stream<A>

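iterate produces an infinite stream that starts with seed and then repeatedly applies f to the previous value: iterate(n => n * 2, 1) yields 1, 2, 4, 8, ... range excludes end: range(1, 10001) yields 1 through 10000.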

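All transforms (map through intersperse below) are dual-form: call them data-first with the stream as the first argument, e.g. map(stream, f), or omit the stream to get a data-last function for use in pipe, e.g. map(f).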

map<A, B>(stream: Stream<A>, f: (a: A) => B): Stream<B>
filter<A>(stream: Stream<A>, pred: (a: A) => boolean): Stream<A>
flatMap<A, B>(stream: Stream<A>, f: (a: A) => Stream<B>): Stream<B>
take<A>(stream: Stream<A>, n: number): Stream<A>
drop<A>(stream: Stream<A>, n: number): Stream<A>
takeWhile<A>(stream: Stream<A>, pred: (a: A) => boolean): Stream<A>
dropWhile<A>(stream: Stream<A>, pred: (a: A) => boolean): Stream<A>
chunk<A>(stream: Stream<A>, n: number): Stream<A[]>
scan<A, B>(stream: Stream<A>, f: (acc: B, a: A) => B, init: B): Stream<B>
zip<A, B>(stream: Stream<A>, other: Stream<B>): Stream<[A, B]>
concat<A>(stream: Stream<A>, other: Stream<A>): Stream<A>
distinct<A>(stream: Stream<A>): Stream<A>
distinctN<A>(stream: Stream<A>, maxSize: number): Stream<A>
intersperse<A>(stream: Stream<A>, sep: A): Stream<A>
toArray<A>(stream: Stream<A>): A[]
collect<A>(stream: Stream<A>): A[] // alias for toArray
reduce<A, B>(stream: Stream<A>, f: (acc: B, a: A) => B, init: B): B
first<A>(stream: Stream<A>): A | undefined
last<A>(stream: Stream<A>): A | undefined
count<A>(stream: Stream<A>): number
every<A>(stream: Stream<A>, pred: (a: A) => boolean): boolean
some<A>(stream: Stream<A>, pred: (a: A) => boolean): boolean
find<A>(stream: Stream<A>, pred: (a: A) => boolean): A | undefined
forEach<A>(stream: Stream<A>, f: (a: A) => void): void

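Of the terminal operations above, reduce, every, some, find, and forEach are dual-form, so they can be called data-first, e.g. find(stream, pred), or data-last in pipe, e.g. find(pred). The others (toArray, collect, first, last, count) take only the stream, so they can be passed to pipe directly.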