diff --git a/docs/src/config/sidebar.json b/docs/src/config/sidebar.json index 99c9a7d..b29c5b0 100644 --- a/docs/src/config/sidebar.json +++ b/docs/src/config/sidebar.json @@ -41,6 +41,10 @@ "label": "Creating Operators", "slug": "core-concepts/creating-operators" }, + { + "label": "Higher-Order Operators", + "slug": "core-concepts/higher-order-operators" + }, { "label": "The Problems Solves", "slug": "core-concepts/the-problem-solves" @@ -99,6 +103,10 @@ "label": "debounce", "slug": "operator-api/debounce" }, + { + "label": "exhaustMap", + "slug": "operator-api/exhaust-map" + }, { "label": "extend", "slug": "operator-api/extend" @@ -107,6 +115,10 @@ "label": "filter", "slug": "operator-api/filter" }, + { + "label": "flatMap", + "slug": "operator-api/flat-map" + }, { "label": "map", "slug": "operator-api/map" @@ -135,6 +147,10 @@ "label": "spy", "slug": "operator-api/spy" }, + { + "label": "switchMap", + "slug": "operator-api/switch-map" + }, { "label": "throttle", "slug": "operator-api/throttle" diff --git a/docs/src/content/core-concepts/higher-order-operators.mdx b/docs/src/content/core-concepts/higher-order-operators.mdx new file mode 100644 index 0000000..cb40088 --- /dev/null +++ b/docs/src/content/core-concepts/higher-order-operators.mdx @@ -0,0 +1,93 @@ +# Higher-Order Operators + +Higher-order operators transform streams where each signal creates a new "inner stream". They control how these inner streams are managed over time. + +## The Problem + +Consider a search input that fetches results on each keystroke: + +```typescript +keyboard(searchInput) + .pipe( + map((signal) => fetchResults(signal.value.key)) // Returns a Stream, not a value! + ) +``` + +This produces a `Stream>`—a stream of streams. Higher-order operators flatten this into a single stream while controlling concurrency. + +## Comparison + +| Operator | Behavior | New Signal Arrives While Active | +|----------|----------|--------------------------------| +| `flatMap` | Run all concurrently | Subscribe to new stream alongside existing | +| `switchMap` | Switch to latest | Cancel previous, subscribe to new | +| `exhaustMap` | Ignore until done | Ignore new signal completely | + +## Visual Comparison + +Given source signals `a`, `b`, `c` arriving in sequence: + +``` +Source: ──a──────b──────c──▶ + +flatMap: ────A1─A2─B1─A3─C1─B2─C2─▶ (all run together) + +switchMap: ────A1─────────C1─C2─▶ (only latest completes) + +exhaustMap: ────A1─A2─A3─────D1─▶ (ignores b, c while a runs) +``` + +## Decision Guide + +Choose based on what should happen when a new signal arrives while processing: + +``` +New signal arrives while inner stream is active + │ + ┌───────────────┼───────────────┐ + ▼ ▼ ▼ + Run both? Cancel old? Ignore new? + │ │ │ + ▼ ▼ ▼ + flatMap switchMap exhaustMap +``` + +### Use `flatMap` when: +- Order doesn't matter +- All operations should complete +- Example: Playing multiple sounds, logging events + +### Use `switchMap` when: +- Only the latest result matters +- Previous operations are obsolete +- Example: Search autocomplete, preview on hover + +### Use `exhaustMap` when: +- Current operation must complete first +- Duplicate requests should be prevented +- Example: Form submission, drag session + +## Example: Same Scenario, Different Operators + +Fetching user details on hover: + +```typescript +// flatMap: All hovers trigger fetches (may show stale data) +hover$.pipe(flatMap((s) => fetchUser(s.value.userId))) + +// switchMap: Only shows latest hover's user (recommended) +hover$.pipe(switchMap((s) => fetchUser(s.value.userId))) + +// exhaustMap: Ignores hovers until current fetch completes +hover$.pipe(exhaustMap((s) => fetchUser(s.value.userId))) +``` + +## Relation to Monad + +These operators implement the "bind" operation from monad theory, enabling composition of effectful computations. `flatMap` satisfies the three monad laws: + +1. **Left Identity**: `pure(a).flatMap(f) === f(a)` +2. **Right Identity**: `m.flatMap(pure) === m` +3. **Associativity**: `m.flatMap(f).flatMap(g) === m.flatMap(x => f(x).flatMap(g))` + +This mathematical foundation ensures predictable composition behavior. diff --git a/docs/src/content/operator-api/exhaust-map.mdx b/docs/src/content/operator-api/exhaust-map.mdx new file mode 100644 index 0000000..8c81ec1 --- /dev/null +++ b/docs/src/content/operator-api/exhaust-map.mdx @@ -0,0 +1,90 @@ +# `exhaustMap` + +Maps each signal to a Stream, ignoring new signals while an inner stream is active. + +## Signature + +```typescript +function exhaustMap( + project: (signal: T, index: number) => Stream +): Operator +``` + +## Example + +```typescript +import { domEvent } from "cereb"; +import { exhaustMap } from "cereb/operators"; + +domEvent(submitButton, "click") + .pipe( + exhaustMap(() => submitForm()) + ) + .on((result) => { + showConfirmation(result.value); + }); +``` + +## How It Works + +When a signal arrives and no inner stream is active, `exhaustMap` creates and subscribes to a new inner stream. While that inner stream is active, all incoming signals are ignored. Once the inner stream completes, the next signal will be processed. + +``` +Source: ──a────b────c────d──▶ + \ ✕ ✕ \ +Inner: ─A1─A2─▶ ─D1─▶ + +Output: ────A1─A2─────────D1─▶ +``` + +## Use Cases + +### Prevent Double Submission + +Ignore additional clicks while form is being submitted: + +```typescript +domEvent(submitButton, "click") + .pipe( + exhaustMap(() => submitFormToServer()) + ) + .on((response) => { + if (response.value.success) { + showSuccess(); + } + }); +``` + +### Drag Session Locking + +Process only one drag session at a time: + +```typescript +singlePointer(element) + .pipe( + session(), + exhaustMap((signal) => handleDragSession(signal)) + ) + .on((result) => { + applyTransform(result.value); + }); +``` + +### Rate-Limited Operations + +Process expensive operations one at a time: + +```typescript +keyboard(window) + .pipe( + filter((s) => s.value.key === "Enter"), + exhaustMap(() => performExpensiveCalculation()) + ) + .on((result) => { + displayResult(result.value); + }); +``` + +## Note + +Use `exhaustMap` when you want to complete the current operation before handling new signals. For canceling previous operations, use `switchMap`. For concurrent operations, use `flatMap`. diff --git a/docs/src/content/operator-api/flat-map.mdx b/docs/src/content/operator-api/flat-map.mdx new file mode 100644 index 0000000..1caea66 --- /dev/null +++ b/docs/src/content/operator-api/flat-map.mdx @@ -0,0 +1,74 @@ +# `flatMap` + +Maps each signal to a Stream and flattens all inner streams concurrently. + +## Signature + +```typescript +function flatMap( + project: (signal: T, index: number) => Stream +): Operator +``` + +Also exported as `mergeMap`. + +## Example + +```typescript +import { singlePointer } from "cereb"; +import { flatMap, session } from "cereb/operators"; + +singlePointer(element) + .pipe( + session(), + flatMap((signal) => fetchRelatedData(signal.value.x, signal.value.y)) + ) + .on((result) => { + console.log(result.value); + }); +``` + +## How It Works + +When a signal arrives, `flatMap` creates a new inner stream and subscribes to it immediately—without canceling any existing inner streams. All inner streams run concurrently, and their signals are merged into the output stream. + +``` +Source: ──a──────b──────c──▶ + \ \ \ +Inner: ─A1─A2─▶ ─B1─▶ ─C1─C2─▶ + +Output: ────A1─A2──B1───C1─C2─▶ +``` + +## Use Cases + +### Parallel Data Fetching + +Fetch data for each pointer session without waiting for previous requests: + +```typescript +singlePointer(element) + .pipe( + session(), + flatMap((signal) => fetchLocationData(signal.value.x, signal.value.y)) + ) + .on((data) => { + renderMarker(data.value); + }); +``` + +### Concurrent Sound Effects + +Play multiple sounds simultaneously: + +```typescript +keyboard(window) + .pipe( + flatMap((signal) => playSound(signal.value.key)) + ) + .on(() => {}); +``` + +## Note + +If you need to cancel previous operations when new signals arrive, use `switchMap` instead. If you need to ignore new signals until the current operation completes, use `exhaustMap`. diff --git a/docs/src/content/operator-api/switch-map.mdx b/docs/src/content/operator-api/switch-map.mdx new file mode 100644 index 0000000..af7e990 --- /dev/null +++ b/docs/src/content/operator-api/switch-map.mdx @@ -0,0 +1,86 @@ +# `switchMap` + +Maps each signal to a Stream and switches to the new inner stream, canceling the previous one. + +## Signature + +```typescript +function switchMap( + project: (signal: T, index: number) => Stream +): Operator +``` + +## Example + +```typescript +import { singlePointer } from "cereb"; +import { switchMap } from "cereb/operators"; + +singlePointer(element) + .pipe( + switchMap((signal) => fetchSuggestions(signal.value.x)) + ) + .on((result) => { + showSuggestions(result.value); + }); +``` + +## How It Works + +When a signal arrives, `switchMap` unsubscribes from any active inner stream before subscribing to the new one. Only the most recent inner stream is active at any time, automatically preventing race conditions. + +``` +Source: ──a──────b──────c──▶ + \ ✕ \ +Inner: ─A1─A2─╳ ─B1─╳ ─C1─C2─▶ + +Output: ────A1─────────────C1─C2─▶ +``` + +## Use Cases + +### Search Autocomplete + +Only show results for the latest search query: + +```typescript +keyboard(searchInput) + .pipe( + debounce(300), + switchMap((signal) => searchAPI(signal.value.key)) + ) + .on((results) => { + renderDropdown(results.value); + }); +``` + +### Latest Selection Preview + +When user hovers over items, show preview for the current item only: + +```typescript +singlePointer(listElement) + .pipe( + switchMap((signal) => fetchPreview(getItemAt(signal.value.x, signal.value.y))) + ) + .on((preview) => { + showPreview(preview.value); + }); +``` + +### Cancel Previous Animations + +Start new animation, canceling any in-progress one: + +```typescript +keyboard(window) + .pipe( + filter((s) => s.value.key === "ArrowRight"), + switchMap(() => animateSlide("next")) + ) + .on(() => {}); +``` + +## Note + +Use `switchMap` when you only care about the latest operation. For concurrent operations, use `flatMap`. To ignore new signals until current completes, use `exhaustMap`. diff --git a/docs/src/pages/core-concepts/higher-order-operators.astro b/docs/src/pages/core-concepts/higher-order-operators.astro new file mode 100644 index 0000000..81dc42d --- /dev/null +++ b/docs/src/pages/core-concepts/higher-order-operators.astro @@ -0,0 +1,17 @@ +--- +import DocsLayout from "@/layouts/docs-layout.astro"; +import Content from "@/content/core-concepts/higher-order-operators.mdx"; + +const headings = [ + { depth: 2, slug: "the-problem", text: "The Problem" }, + { depth: 2, slug: "comparison", text: "Comparison" }, + { depth: 2, slug: "visual-comparison", text: "Visual Comparison" }, + { depth: 2, slug: "decision-guide", text: "Decision Guide" }, + { depth: 2, slug: "example-same-scenario-different-operators", text: "Example: Same Scenario" }, + { depth: 2, slug: "relation-to-monad", text: "Relation to Monad" }, +]; +--- + + + + diff --git a/docs/src/pages/operator-api/exhaust-map.astro b/docs/src/pages/operator-api/exhaust-map.astro new file mode 100644 index 0000000..aa74b01 --- /dev/null +++ b/docs/src/pages/operator-api/exhaust-map.astro @@ -0,0 +1,16 @@ +--- +import DocsLayout from "@/layouts/docs-layout.astro"; +import Content from "@/content/operator-api/exhaust-map.mdx"; + +const headings = [ + { depth: 2, slug: "signature", text: "Signature" }, + { depth: 2, slug: "example", text: "Example" }, + { depth: 2, slug: "how-it-works", text: "How It Works" }, + { depth: 2, slug: "use-cases", text: "Use Cases" }, + { depth: 2, slug: "note", text: "Note" }, +]; +--- + + + + diff --git a/docs/src/pages/operator-api/flat-map.astro b/docs/src/pages/operator-api/flat-map.astro new file mode 100644 index 0000000..f9ef9fc --- /dev/null +++ b/docs/src/pages/operator-api/flat-map.astro @@ -0,0 +1,16 @@ +--- +import DocsLayout from "@/layouts/docs-layout.astro"; +import Content from "@/content/operator-api/flat-map.mdx"; + +const headings = [ + { depth: 2, slug: "signature", text: "Signature" }, + { depth: 2, slug: "example", text: "Example" }, + { depth: 2, slug: "how-it-works", text: "How It Works" }, + { depth: 2, slug: "use-cases", text: "Use Cases" }, + { depth: 2, slug: "note", text: "Note" }, +]; +--- + + + + diff --git a/docs/src/pages/operator-api/switch-map.astro b/docs/src/pages/operator-api/switch-map.astro new file mode 100644 index 0000000..d6ab937 --- /dev/null +++ b/docs/src/pages/operator-api/switch-map.astro @@ -0,0 +1,16 @@ +--- +import DocsLayout from "@/layouts/docs-layout.astro"; +import Content from "@/content/operator-api/switch-map.mdx"; + +const headings = [ + { depth: 2, slug: "signature", text: "Signature" }, + { depth: 2, slug: "example", text: "Example" }, + { depth: 2, slug: "how-it-works", text: "How It Works" }, + { depth: 2, slug: "use-cases", text: "Use Cases" }, + { depth: 2, slug: "note", text: "Note" }, +]; +--- + + + + diff --git a/packages/cereb/package.json b/packages/cereb/package.json index b1613cd..546b3a6 100644 --- a/packages/cereb/package.json +++ b/packages/cereb/package.json @@ -1,7 +1,7 @@ { "name": "cereb", "description": "User input modeling and orchestration with a lightweight reactive stream library.", - "version": "0.11.6", + "version": "0.12.0", "license": "MIT", "author": "devphilip21 ", "repository": { diff --git a/packages/cereb/src/operators/exhaust-map.spec.ts b/packages/cereb/src/operators/exhaust-map.spec.ts new file mode 100644 index 0000000..f3c3f62 --- /dev/null +++ b/packages/cereb/src/operators/exhaust-map.spec.ts @@ -0,0 +1,186 @@ +import { describe, expect, it, vi } from "vitest"; +import { createStream } from "../core/stream.js"; +import { createTestSignal, fromArray, ofValue, type TestSignal } from "../internal/test-utils.js"; +import { exhaustMap } from "./exhaust-map.js"; + +describe("exhaustMap", () => { + it("should map and flatten when no active stream", () => { + const values: number[] = []; + + // With sync streams, each completes before the next starts + fromArray([1, 2, 3]) + .pipe(exhaustMap((s: TestSignal) => ofValue(s.value * 10))) + .on((v) => values.push(v.value)); + + expect(values).toEqual([10, 20, 30]); + }); + + it("should ignore signals while inner stream is active", async () => { + const values: number[] = []; + const projectCalls: number[] = []; + + // Create a delayed stream that doesn't complete immediately + const delayedStream = (value: number) => + createStream>((observer) => { + projectCalls.push(value); + const timeoutId = setTimeout(() => { + observer.next(createTestSignal(value * 10)); + observer.complete?.(); + }, 50); + return () => clearTimeout(timeoutId); + }); + + // Emit signals rapidly + const source = createStream>((observer) => { + observer.next(createTestSignal(1)); + setTimeout(() => observer.next(createTestSignal(2)), 10); + setTimeout(() => observer.next(createTestSignal(3)), 20); + setTimeout(() => observer.complete?.(), 100); + return () => {}; + }); + + source + .pipe(exhaustMap((s: TestSignal) => delayedStream(s.value))) + .on((v) => values.push(v.value)); + + // Wait for all async operations + await new Promise((resolve) => setTimeout(resolve, 150)); + + // Only first signal should be processed (2 and 3 ignored while 1 is active) + expect(projectCalls).toEqual([1]); + expect(values).toEqual([10]); + }); + + it("should pass index to project function", () => { + const indices: number[] = []; + + fromArray([1, 2, 3]) + .pipe( + exhaustMap((s: TestSignal, index) => { + indices.push(index); + return ofValue(s.value); + }), + ) + .on(() => {}); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should catch errors in project function", () => { + const error = new Error("project error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + exhaustMap(() => { + throw error; + }), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should propagate errors from inner stream", () => { + const error = new Error("inner error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + exhaustMap(() => + createStream((observer) => { + observer.error?.(error); + return () => {}; + }), + ), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should reset active state on error in project", () => { + const values: number[] = []; + let callCount = 0; + + fromArray([1, 2]) + .pipe( + exhaustMap((s: TestSignal) => { + callCount++; + if (s.value === 1) { + throw new Error("first error"); + } + return ofValue(s.value * 10); + }), + ) + .on({ next: (v) => values.push(v.value), error: () => {} }); + + // After error, should be able to process next signal + expect(callCount).toBe(2); + expect(values).toEqual([20]); + }); + + it("should cleanup inner subscription on unsubscribe", () => { + let cleaned = false; + + const unsub = ofValue(1) + .pipe( + exhaustMap(() => + createStream((observer) => { + observer.next(createTestSignal(1)); + return () => { + cleaned = true; + }; + }), + ), + ) + .on(() => {}); + + unsub(); + + expect(cleaned).toBe(true); + }); + + it("should call complete when source completes and no active stream", () => { + const completeFn = vi.fn(); + + fromArray([1, 2]) + .pipe(exhaustMap((s: TestSignal) => ofValue(s.value))) + .on({ next: () => {}, complete: completeFn }); + + expect(completeFn).toHaveBeenCalledTimes(1); + }); + + it("should process new signals after inner stream completes", async () => { + const values: number[] = []; + const projectCalls: number[] = []; + + const delayedStream = (value: number, delay: number) => + createStream>((observer) => { + projectCalls.push(value); + const timeoutId = setTimeout(() => { + observer.next(createTestSignal(value * 10)); + observer.complete?.(); + }, delay); + return () => clearTimeout(timeoutId); + }); + + // Emit signals with enough time between them + const source = createStream>((observer) => { + observer.next(createTestSignal(1)); + setTimeout(() => observer.next(createTestSignal(2)), 60); // After first completes + setTimeout(() => observer.complete?.(), 150); + return () => {}; + }); + + source + .pipe(exhaustMap((s: TestSignal) => delayedStream(s.value, 30))) + .on((v) => values.push(v.value)); + + await new Promise((resolve) => setTimeout(resolve, 200)); + + // Both signals should be processed (with gap between them) + expect(projectCalls).toEqual([1, 2]); + expect(values).toEqual([10, 20]); + }); +}); diff --git a/packages/cereb/src/operators/exhaust-map.ts b/packages/cereb/src/operators/exhaust-map.ts new file mode 100644 index 0000000..e439c48 --- /dev/null +++ b/packages/cereb/src/operators/exhaust-map.ts @@ -0,0 +1,81 @@ +import type { Signal } from "../core/signal.js"; +import type { Operator, Stream, Unsubscribe } from "../core/stream.js"; +import { createStream } from "../core/stream.js"; + +/** + * Maps each signal to a Stream, but ignores new signals while an inner + * stream is still active. Only processes the next signal after the current + * inner stream completes. + * + * Use this to prevent overlapping operations (e.g., form submissions, + * preventing double-clicks). + * + * @param project - Function that maps a signal to a Stream + * @returns An operator that produces an exhausted stream + * + * @example + * // Prevent duplicate form submissions + * submitButton$.pipe( + * exhaustMap(signal => submitForm(signal.value.data)) + * ) + */ +export function exhaustMap( + project: (signal: T, index: number) => Stream, +): Operator { + return (source: Stream): Stream => + createStream((observer) => { + let currentInnerUnsub: Unsubscribe | null = null; + let isActive = false; + let index = 0; + let sourceCompleted = false; + + const checkComplete = () => { + if (sourceCompleted && !isActive) { + observer.complete?.(); + } + }; + + const sourceUnsub = source.on({ + next(signal) { + // Ignore new signals while an inner stream is active + if (isActive) return; + + try { + isActive = true; + const innerStream = project(signal, index++); + + currentInnerUnsub = innerStream.on({ + next(innerSignal) { + observer.next(innerSignal); + }, + error(err) { + observer.error?.(err); + }, + complete() { + isActive = false; + currentInnerUnsub = null; + checkComplete(); + }, + }); + } catch (err) { + isActive = false; + observer.error?.(err); + } + }, + error(err) { + observer.error?.(err); + }, + complete() { + sourceCompleted = true; + checkComplete(); + }, + }); + + return () => { + sourceUnsub(); + if (currentInnerUnsub) { + currentInnerUnsub(); + } + }; + }); +} diff --git a/packages/cereb/src/operators/flat-map.spec.ts b/packages/cereb/src/operators/flat-map.spec.ts new file mode 100644 index 0000000..c186a9f --- /dev/null +++ b/packages/cereb/src/operators/flat-map.spec.ts @@ -0,0 +1,163 @@ +import { describe, expect, it, vi } from "vitest"; +import type { Stream } from "../core/stream.js"; +import { createStream } from "../core/stream.js"; +import { createTestSignal, fromArray, ofValue, type TestSignal } from "../internal/test-utils.js"; +import { flatMap } from "./flat-map.js"; + +describe("flatMap", () => { + it("should flatten inner streams", () => { + const values: number[] = []; + + fromArray([1, 2]) + .pipe(flatMap((s: TestSignal) => fromArray([s.value * 10, s.value * 10 + 1]))) + .on((v) => values.push(v.value)); + + expect(values).toEqual([10, 11, 20, 21]); + }); + + it("should handle single value streams", () => { + const values: number[] = []; + + ofValue(5) + .pipe(flatMap((s: TestSignal) => ofValue(s.value * 2))) + .on((v) => values.push(v.value)); + + expect(values).toEqual([10]); + }); + + it("should pass index to project function", () => { + const indices: number[] = []; + + fromArray([1, 2, 3]) + .pipe( + flatMap((s: TestSignal, index) => { + indices.push(index); + return ofValue(s.value); + }), + ) + .on(() => {}); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should catch errors in project function", () => { + const error = new Error("project error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + flatMap(() => { + throw error; + }), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should propagate errors from inner stream", () => { + const error = new Error("inner error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + flatMap(() => + createStream((observer) => { + observer.error?.(error); + return () => {}; + }), + ), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should cleanup all inner subscriptions on unsubscribe", () => { + const cleanupCalls: number[] = []; + + const unsub = fromArray([1, 2, 3]) + .pipe( + flatMap((s: TestSignal) => + createStream((observer) => { + observer.next(createTestSignal(s.value)); + return () => { + cleanupCalls.push(s.value); + }; + }), + ), + ) + .on(() => {}); + + unsub(); + + expect(cleanupCalls.sort()).toEqual([1, 2, 3]); + }); + + it("should call complete when source and all inner streams complete", () => { + const completeFn = vi.fn(); + + fromArray([1, 2]) + .pipe(flatMap((s: TestSignal) => ofValue(s.value))) + .on({ next: () => {}, complete: completeFn }); + + expect(completeFn).toHaveBeenCalledTimes(1); + }); +}); + +describe("flatMap - Monad Laws", () => { + // Helper: wrap value in stream (return/pure) + const pure = (value: V): Stream> => ofValue(value); + + // 1. Left Identity: pure(a).flatMap(f) === f(a) + it("satisfies left identity", () => { + const a = 42; + const f = (s: TestSignal) => ofValue(s.value * 2); + + const left: number[] = []; + const right: number[] = []; + + pure(a) + .pipe(flatMap(f)) + .on((s) => left.push(s.value)); + f(createTestSignal(a)).on((s) => right.push(s.value)); + + expect(left).toEqual(right); + }); + + // 2. Right Identity: m.flatMap(pure) === m + it("satisfies right identity", () => { + const values = [1, 2, 3]; + + const left: number[] = []; + const right: number[] = []; + + fromArray(values) + .pipe(flatMap((s: TestSignal) => pure(s.value))) + .on((s) => left.push(s.value)); + + fromArray(values).on((s) => right.push(s.value)); + + expect(left).toEqual(right); + }); + + // 3. Associativity: m.flatMap(f).flatMap(g) === m.flatMap(x => f(x).flatMap(g)) + it("satisfies associativity", () => { + const values = [1, 2]; + const f = (s: TestSignal) => fromArray([s.value, s.value + 1]); + const g = (s: TestSignal) => pure(s.value * 10); + + const left: number[] = []; + const right: number[] = []; + + fromArray(values) + .pipe(flatMap(f), flatMap(g)) + .on((s) => left.push(s.value)); + + fromArray(values) + .pipe(flatMap((x: TestSignal) => f(x).pipe(flatMap(g)))) + .on((s) => right.push(s.value)); + + expect(left.sort()).toEqual(right.sort()); + }); +}); diff --git a/packages/cereb/src/operators/flat-map.ts b/packages/cereb/src/operators/flat-map.ts new file mode 100644 index 0000000..395e00f --- /dev/null +++ b/packages/cereb/src/operators/flat-map.ts @@ -0,0 +1,82 @@ +import type { Signal } from "../core/signal.js"; +import type { Operator, Stream, Unsubscribe } from "../core/stream.js"; +import { createStream } from "../core/stream.js"; + +/** + * Maps each signal to a Stream and flattens all inner streams concurrently. + * All inner streams remain active simultaneously (mergeMap behavior). + * + * Use this when you want to handle all inner streams without cancellation. + * + * @param project - Function that maps a signal to a Stream + * @returns An operator that produces a flattened stream + * + * @example + * // Handle multiple concurrent gesture streams + * gestureType$.pipe( + * flatMap(signal => + * signal.value.type === 'pan' ? pan(el) : pinch(el) + * ) + * ) + */ +export function flatMap( + project: (signal: T, index: number) => Stream, +): Operator { + return (source: Stream): Stream => + createStream((observer) => { + const activeSubscriptions = new Set(); + let index = 0; + let sourceCompleted = false; + + const checkComplete = () => { + if (sourceCompleted && activeSubscriptions.size === 0) { + observer.complete?.(); + } + }; + + const sourceUnsub = source.on({ + next(signal) { + try { + const innerStream = project(signal, index++); + + const innerUnsub = innerStream.on({ + next(innerSignal) { + observer.next(innerSignal); + }, + error(err) { + observer.error?.(err); + }, + complete() { + activeSubscriptions.delete(innerUnsub); + checkComplete(); + }, + }); + + activeSubscriptions.add(innerUnsub); + } catch (err) { + observer.error?.(err); + } + }, + error(err) { + observer.error?.(err); + }, + complete() { + sourceCompleted = true; + checkComplete(); + }, + }); + + return () => { + sourceUnsub(); + for (const unsub of activeSubscriptions) { + unsub(); + } + activeSubscriptions.clear(); + }; + }); +} + +/** + * Alias for flatMap (RxJS naming convention) + */ +export const mergeMap = flatMap; diff --git a/packages/cereb/src/operators/index.ts b/packages/cereb/src/operators/index.ts index 8c47eee..f27442e 100644 --- a/packages/cereb/src/operators/index.ts +++ b/packages/cereb/src/operators/index.ts @@ -1,7 +1,9 @@ export { compose } from "./compose.js"; export { debounce } from "./debounce.js"; +export { exhaustMap } from "./exhaust-map.js"; export { extend } from "./extend.js"; export { filter } from "./filter.js"; +export { flatMap, mergeMap } from "./flat-map.js"; export { map } from "./map.js"; export { merge, mergeWith } from "./merge.js"; export { offset } from "./offset.js"; @@ -9,6 +11,7 @@ export { reduce } from "./reduce.js"; export { multiPointerSession, session, singlePointerSession } from "./session.js"; export { share, shareReplay } from "./share.js"; export { spy, tap } from "./spy.js"; +export { switchMap } from "./switch-map.js"; export { throttle, throttleLast } from "./throttle.js"; export { when } from "./when.js"; export type { ZoomInput, ZoomOptions, ZoomValue } from "./zoom.js"; diff --git a/packages/cereb/src/operators/switch-map.spec.ts b/packages/cereb/src/operators/switch-map.spec.ts new file mode 100644 index 0000000..3ccaa99 --- /dev/null +++ b/packages/cereb/src/operators/switch-map.spec.ts @@ -0,0 +1,146 @@ +import { describe, expect, it, vi } from "vitest"; +import { createStream } from "../core/stream.js"; +import { createTestSignal, fromArray, ofValue, type TestSignal } from "../internal/test-utils.js"; +import { switchMap } from "./switch-map.js"; + +describe("switchMap", () => { + it("should switch to new inner stream", () => { + const values: number[] = []; + + fromArray([1, 2]) + .pipe(switchMap((s: TestSignal) => ofValue(s.value * 10))) + .on((v) => values.push(v.value)); + + // Only last stream's value for sync case + expect(values).toEqual([10, 20]); + }); + + it("should unsubscribe from previous inner stream", () => { + const cleanupCalls: number[] = []; + let emitCount = 0; + + fromArray([1, 2, 3]) + .pipe( + switchMap((s: TestSignal) => + createStream((observer) => { + emitCount++; + observer.next(createTestSignal(s.value)); + return () => { + cleanupCalls.push(s.value); + }; + }), + ), + ) + .on(() => {}); + + // Each new source signal should cleanup the previous inner stream + // For sync source, cleanup happens for all but the last + expect(cleanupCalls).toEqual([1, 2]); + expect(emitCount).toBe(3); + }); + + it("should pass index to project function", () => { + const indices: number[] = []; + + fromArray([1, 2, 3]) + .pipe( + switchMap((s: TestSignal, index) => { + indices.push(index); + return ofValue(s.value); + }), + ) + .on(() => {}); + + expect(indices).toEqual([0, 1, 2]); + }); + + it("should catch errors in project function", () => { + const error = new Error("project error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + switchMap(() => { + throw error; + }), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should propagate errors from inner stream", () => { + const error = new Error("inner error"); + const errorFn = vi.fn(); + + ofValue(1) + .pipe( + switchMap(() => + createStream((observer) => { + observer.error?.(error); + return () => {}; + }), + ), + ) + .on({ next: vi.fn(), error: errorFn }); + + expect(errorFn).toHaveBeenCalledWith(error); + }); + + it("should cleanup inner subscription on unsubscribe", () => { + let cleaned = false; + + const unsub = ofValue(1) + .pipe( + switchMap(() => + createStream((observer) => { + observer.next(createTestSignal(1)); + return () => { + cleaned = true; + }; + }), + ), + ) + .on(() => {}); + + unsub(); + + expect(cleaned).toBe(true); + }); + + it("should call complete when source and current inner stream complete", () => { + const completeFn = vi.fn(); + + fromArray([1, 2]) + .pipe(switchMap((s: TestSignal) => ofValue(s.value))) + .on({ next: () => {}, complete: completeFn }); + + expect(completeFn).toHaveBeenCalledTimes(1); + }); + + it("should handle async inner streams correctly", async () => { + const values: number[] = []; + const completeFn = vi.fn(); + + // Create a delayed stream + const delayedStream = (value: number, delay: number) => + createStream>((observer) => { + const timeoutId = setTimeout(() => { + observer.next(createTestSignal(value)); + observer.complete?.(); + }, delay); + return () => clearTimeout(timeoutId); + }); + + fromArray([1, 2, 3]) + .pipe(switchMap((s: TestSignal) => delayedStream(s.value * 10, 10))) + .on({ next: (v) => values.push(v.value), complete: completeFn }); + + // Wait for async operations + await new Promise((resolve) => setTimeout(resolve, 50)); + + // Only the last inner stream should emit (30) + expect(values).toEqual([30]); + expect(completeFn).toHaveBeenCalledTimes(1); + }); +}); diff --git a/packages/cereb/src/operators/switch-map.ts b/packages/cereb/src/operators/switch-map.ts new file mode 100644 index 0000000..ff94145 --- /dev/null +++ b/packages/cereb/src/operators/switch-map.ts @@ -0,0 +1,84 @@ +import type { Signal } from "../core/signal.js"; +import type { Operator, Stream, Unsubscribe } from "../core/stream.js"; +import { createStream } from "../core/stream.js"; + +/** + * Maps each signal to a Stream and switches to the new inner stream, + * unsubscribing from the previous one. + * Only the most recent inner stream is active at any time. + * + * Use this when you only care about the latest value and want to cancel + * previous operations (e.g., search autocomplete, latest selection). + * + * @param project - Function that maps a signal to a Stream + * @returns An operator that produces a switched stream + * + * @example + * // Only show results for the latest search query + * searchInput$.pipe( + * switchMap(signal => fetchResults(signal.value.query)) + * ) + */ +export function switchMap( + project: (signal: T, index: number) => Stream, +): Operator { + return (source: Stream): Stream => + createStream((observer) => { + let currentInnerUnsub: Unsubscribe | null = null; + let innerActive = false; + let index = 0; + let sourceCompleted = false; + + const checkComplete = () => { + if (sourceCompleted && !innerActive) { + observer.complete?.(); + } + }; + + const sourceUnsub = source.on({ + next(signal) { + try { + // Unsubscribe from previous inner stream + if (currentInnerUnsub) { + currentInnerUnsub(); + currentInnerUnsub = null; + } + innerActive = false; + + const innerStream = project(signal, index++); + innerActive = true; + + currentInnerUnsub = innerStream.on({ + next(innerSignal) { + observer.next(innerSignal); + }, + error(err) { + observer.error?.(err); + }, + complete() { + innerActive = false; + currentInnerUnsub = null; + checkComplete(); + }, + }); + } catch (err) { + observer.error?.(err); + } + }, + error(err) { + observer.error?.(err); + }, + complete() { + sourceCompleted = true; + checkComplete(); + }, + }); + + return () => { + sourceUnsub(); + if (currentInnerUnsub) { + currentInnerUnsub(); + } + }; + }); +}