Skip to content
Draft
2 changes: 1 addition & 1 deletion core/src/Stack.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { BaseDomainEvent } from "event-types/_base";
import type {
DomainEvent,
PoppedEvent,
Expand Down Expand Up @@ -29,6 +28,7 @@ export type Activity = {
id: string;
name: string;
transitionState: ActivityTransitionState;
estimatedTransitionEnd: number;
params: {
[key: string]: string | undefined;
};
Expand Down
14 changes: 8 additions & 6 deletions core/src/activity-utils/makeActivitiesReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ export function makeActivitiesReducer({
* Push new activity to activities
*/
Pushed(activities: Activity[], event: PushedEvent): Activity[] {
const isTransitionDone =
now - (resumedAt ?? event.eventDate) >= transitionDuration;
const estimatedTransitionEnd =
(resumedAt ?? event.eventDate) + transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= now;

const transitionState: ActivityTransitionState =
event.skipEnterActiveState || isTransitionDone
Expand All @@ -37,7 +38,7 @@ export function makeActivitiesReducer({

return [
...activities.slice(0, reservedIndex),
makeActivityFromEvent(event, transitionState),
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
...activities.slice(reservedIndex + 1),
];
},
Expand All @@ -46,8 +47,9 @@ export function makeActivitiesReducer({
* Replace activity at reservedIndex with new activity
*/
Replaced(activities: Activity[], event: ReplacedEvent): Activity[] {
const isTransitionDone =
now - (resumedAt ?? event.eventDate) >= transitionDuration;
const estimatedTransitionEnd =
(resumedAt ?? event.eventDate) + transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= now;

const reservedIndex = findNewActivityIndex(activities, event);

Expand All @@ -60,7 +62,7 @@ export function makeActivitiesReducer({

return [
...activities.slice(0, reservedIndex),
makeActivityFromEvent(event, transitionState),
makeActivityFromEvent(event, transitionState, estimatedTransitionEnd),
...activities.slice(reservedIndex + 1),
];
},
Expand Down
2 changes: 2 additions & 0 deletions core/src/activity-utils/makeActivityFromEvent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import type { Activity, ActivityTransitionState } from "../Stack";
export function makeActivityFromEvent(
event: PushedEvent | ReplacedEvent,
transitionState: ActivityTransitionState,
estimatedTransitionEnd: number,
): Activity {
return {
id: event.activityId,
name: event.activityName,
transitionState,
estimatedTransitionEnd,
params: event.activityParams,
context: event.activityContext,
steps: [
Expand Down
8 changes: 5 additions & 3 deletions core/src/activity-utils/makeActivityReducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ export function makeActivityReducer(context: {
...activity,
exitedBy: event,
transitionState: "exit-done",
estimatedTransitionEnd: context.resumedAt ?? event.eventDate,
}),

/**
* Change transition state to exit-done or exit-active depending on skipExitActiveState
*/
Popped: (activity: Activity, event: PoppedEvent): Activity => {
const isTransitionDone =
context.now - (context.resumedAt ?? event.eventDate) >=
context.transitionDuration;
const estimatedTransitionEnd =
(context.resumedAt ?? event.eventDate) + context.transitionDuration;
const isTransitionDone = estimatedTransitionEnd <= context.now;

const transitionState: ActivityTransitionState =
event.skipExitActiveState || isTransitionDone
Expand All @@ -49,6 +50,7 @@ export function makeActivityReducer(context: {
...activity,
exitedBy: event,
transitionState,
estimatedTransitionEnd,
params:
transitionState === "exit-done"
? activity.steps[0].params
Expand Down
1 change: 1 addition & 0 deletions core/src/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ export function aggregate(inputEvents: DomainEvent[], now: number): Stack {
id: activity.id,
name: activity.name,
transitionState: activity.transitionState,
estimatedTransitionEnd: activity.estimatedTransitionEnd,
params: activity.params,
steps,
enteredBy: activity.enteredBy,
Expand Down
11 changes: 11 additions & 0 deletions core/src/aggregators/Aggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import type { Stack } from "../Stack";

export interface Aggregator {
getStack(): Stack;
dispatchEvent(event: DomainEvent): void;
subscribeChanges: (
listener: (effects: Effect[], stack: Stack) => void,
) => () => void;
}
97 changes: 97 additions & 0 deletions core/src/aggregators/SyncAggregator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { aggregate } from "../aggregate";
import type { Effect } from "../Effect";
import type { DomainEvent } from "../event-types";
import { produceEffects } from "../produceEffects";
import type { Stack } from "../Stack";
import { delay } from "../utils/delay";
import type { Publisher } from "../utils/publishers/Publisher";
import type { Scheduler } from "../utils/schedulers/Scheduler";
import type { Aggregator } from "./Aggregator";

export class SyncAggregator implements Aggregator {
private events: DomainEvent[];
private latestStackSnapshot: Stack;
private changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
private updateScheduler: Scheduler;

constructor(options: {
initialEvents: DomainEvent[];
changePublisher: Publisher<{ effects: Effect[]; stack: Stack }>;
updateScheduler: Scheduler;
}) {
this.events = options.initialEvents;
this.latestStackSnapshot = aggregate(this.events, Date.now());
this.changePublisher = options.changePublisher;
this.updateScheduler = options.updateScheduler;
}

getStack(): Stack {
return this.latestStackSnapshot;
}

dispatchEvent(event: DomainEvent): void {
this.events.push(event);
this.updateSnapshot();
}

subscribeChanges(
listener: (effects: Effect[], stack: Stack) => void,
): () => void {
return this.changePublisher.subscribe(({ effects, stack }) => {
listener(effects, stack);
});
}

private updateSnapshot(): void {
const previousSnapshot = this.latestStackSnapshot;
const currentSnapshot = aggregate(this.events, Date.now());
const effects = produceEffects(previousSnapshot, currentSnapshot);

if (effects.length > 0) {
this.latestStackSnapshot = currentSnapshot;
this.changePublisher.publish({
effects,
stack: this.latestStackSnapshot,
});
}

const earliestUpcomingTransitionStateUpdate =
this.calculateEarliestUpcomingTransitionStateUpdate();

if (earliestUpcomingTransitionStateUpdate) {
this.updateScheduler.schedule(async (options) => {
await delay(
earliestUpcomingTransitionStateUpdate.timestamp - Date.now(),
{ signal: options?.signal },
);

if (options?.signal?.aborted) return;

this.updateSnapshot();
});
}
}

private calculateEarliestUpcomingTransitionStateUpdate(): {
event: DomainEvent;
timestamp: number;
} | null {
const activeActivities = this.latestStackSnapshot.activities.filter(
(activity) =>
activity.transitionState === "enter-active" ||
activity.transitionState === "exit-active",
);
const mostRecentlyActivatedActivity = activeActivities.sort(
(a, b) => a.estimatedTransitionEnd - b.estimatedTransitionEnd,
)[0];

return mostRecentlyActivatedActivity
? {
event:
mostRecentlyActivatedActivity.exitedBy ??
mostRecentlyActivatedActivity.enteredBy,
timestamp: mostRecentlyActivatedActivity.estimatedTransitionEnd,
}
: null;
}
}
60 changes: 19 additions & 41 deletions core/src/makeCoreStore.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import isEqual from "react-fast-compare";
import { aggregate } from "./aggregate";
import type { Aggregator } from "./aggregators/Aggregator";
import { SyncAggregator } from "./aggregators/SyncAggregator";
import type { DomainEvent, PushedEvent, StepPushedEvent } from "./event-types";
import { makeEvent } from "./event-utils";
import type { StackflowActions, StackflowPlugin } from "./interfaces";
import { produceEffects } from "./produceEffects";
import type { Stack } from "./Stack";
import { divideBy, once } from "./utils";
import { Mutex } from "./utils/Mutex";
import { makeActions } from "./utils/makeActions";
import { ScheduledPublisher } from "./utils/publishers/ScheduledPublisher";
import { SequentialScheduler } from "./utils/schedulers/SequentialScheduler";
import { SwitchScheduler } from "./utils/schedulers/SwitchScheduler";
import { triggerPostEffectHooks } from "./utils/triggerPostEffectHooks";

const SECOND = 1000;

// 60FPS
const INTERVAL_MS = SECOND / 60;

export type MakeCoreStoreOptions = {
initialEvents: DomainEvent[];
initialContext?: any;
Expand Down Expand Up @@ -76,39 +73,26 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
options.handlers?.onInitialActivityNotFound?.();
}

const events: { value: DomainEvent[] } = {
value: [...initialRemainingEvents, ...initialPushedEvents],
};
const aggregator: Aggregator = new SyncAggregator({
initialEvents: [...initialRemainingEvents, ...initialPushedEvents],
changePublisher: new ScheduledPublisher(
new SequentialScheduler(new Mutex()),
),
updateScheduler: new SwitchScheduler(new Mutex()),
});

const stack = {
value: aggregate(events.value, new Date().getTime()),
};
aggregator.subscribeChanges((effects) => {
triggerPostEffectHooks(effects, pluginInstances, actions);
});

const actions: StackflowActions = {
getStack() {
return stack.value;
return aggregator.getStack();
},
dispatchEvent(name, params) {
const newEvent = makeEvent(name, params);
const nextStackValue = aggregate(
[...events.value, newEvent],
new Date().getTime(),
);

events.value.push(newEvent);
setStackValue(nextStackValue);

const interval = setInterval(() => {
const nextStackValue = aggregate(events.value, new Date().getTime());

if (!isEqual(stack.value, nextStackValue)) {
setStackValue(nextStackValue);
}

if (nextStackValue.globalTransitionState === "idle") {
clearInterval(interval);
}
}, INTERVAL_MS);
aggregator.dispatchEvent(newEvent);
},
push: () => {},
replace: () => {},
Expand All @@ -120,12 +104,6 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
resume: () => {},
};

const setStackValue = (nextStackValue: Stack) => {
const effects = produceEffects(stack.value, nextStackValue);
stack.value = nextStackValue;
triggerPostEffectHooks(effects, pluginInstances, actions);
};

// Initialize action methods after actions object is fully created
Object.assign(
actions,
Expand All @@ -145,7 +123,7 @@ export function makeCoreStore(options: MakeCoreStoreOptions): CoreStore {
});
});
}),
pullEvents: () => events.value,
pullEvents: () => aggregator.getStack().events,
subscribe(listener) {
storeListeners.push(listener);

Expand Down
12 changes: 6 additions & 6 deletions core/src/produceEffects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import { omit } from "./utils";
export function produceEffects(prevOutput: Stack, nextOutput: Stack): Effect[] {
const output: Effect[] = [];

const somethingChanged = !isEqual(prevOutput, nextOutput);

if (somethingChanged) {
output.push({
_TAG: "%SOMETHING_CHANGED%",
});
if (isEqual(prevOutput, nextOutput)) {
return [];
}

output.push({
_TAG: "%SOMETHING_CHANGED%",
});

const isPaused =
prevOutput.globalTransitionState !== "paused" &&
nextOutput.globalTransitionState === "paused";
Expand Down
56 changes: 56 additions & 0 deletions core/src/utils/Mutex.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { getAbortReason } from "./getAbortReason";

export class Mutex {
private lockWaitQueue: ((lockHandle: LockHandle) => void)[] = [];
private waitQueueFlushTask: Promise<void> | null = null;

acquire(options?: { signal?: AbortSignal }): Promise<LockHandle> {
return new Promise((resolve, reject) => {
const signal = options?.signal;
const abortHandler = () => {
if (!signal) return;

this.lockWaitQueue = this.lockWaitQueue.filter((h) => h !== resolve);

reject(getAbortReason(signal));
};
const lockWaiter = (lockHandle: LockHandle) => {
if (signal?.aborted) {
reject(getAbortReason(signal));
lockHandle.release();
} else {
resolve(lockHandle);
}

signal?.removeEventListener("abort", abortHandler);
};

if (signal?.aborted) throw getAbortReason(signal);

signal?.addEventListener("abort", abortHandler, { once: true });

this.lockWaitQueue.push(lockWaiter);
this.scheduleWaitQueueFlush();
});
}

private scheduleWaitQueueFlush(): void {
if (this.waitQueueFlushTask) return;

this.waitQueueFlushTask = Promise.resolve().then(async () => {
do {
const nextWaiter = this.lockWaitQueue.shift();

if (!nextWaiter) break;

await new Promise<void>((resolve) => nextWaiter({ release: resolve }));
} while (this.lockWaitQueue.length > 0);

this.waitQueueFlushTask = null;
});
}
}

export interface LockHandle {
release: () => void;
}
Loading
Loading