Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/short-cars-return.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@exactly/server": patch
---

✨ add queue on user creation
3 changes: 2 additions & 1 deletion server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import block from "./hooks/block";
import manteca from "./hooks/manteca";
import panda from "./hooks/panda";
import persona from "./hooks/persona";
import { close as closeAlchemyQueue } from "./queues/alchemyQueue";
import androidFingerprints from "./utils/android/fingerprints";
import appOrigin from "./utils/appOrigin";
import { closeAndFlush as closeSegment } from "./utils/segment";
Expand Down Expand Up @@ -286,7 +287,7 @@ const server = serve(app);
export async function close() {
return new Promise((resolve, reject) => {
server.close((error) => {
Promise.allSettled([closeSentry(), closeSegment(), database.$client.end()])
Promise.allSettled([closeSentry(), closeSegment(), database.$client.end(), closeAlchemyQueue()])
.then((results) => {
if (error) reject(error);
else if (results.some((result) => result.status === "rejected")) reject(new Error("closing services failed"));
Expand Down
106 changes: 106 additions & 0 deletions server/queues/alchemyQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { addBreadcrumb, captureException, startSpan, type Span } from "@sentry/node";
import { Queue, Worker, type Job } from "bullmq";
import Redis from "ioredis";

import { AlchemyJob, QueueName } from "./constants";
import { headers } from "../utils/alchemy";

const ALCHEMY_WEBHOOK_URL = "https://dashboard.alchemy.com/api/update-webhook-addresses";
const disableWorkers = process.env.DISABLE_WORKERS === "true";
const SENTRY_SPAN_ERROR_CODE = 2;

/**
* interface representing the data payload for alchemy background jobs.
*/
export type AlchemyJobData = {
/** the likely ethereum address of the account to subscribe. */
account: string;
/** the alchemy webhook id to update. */
webhookId: string;
};

if (!process.env.REDIS_URL) throw new Error("REDIS_URL environment variable is not set");
const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null });

/**
* bullmq queue for managing alchemy-related background tasks.
* used primarily for offloading webhook subscription updates to avoid blocking the main thread
* and to allow for retries on api failures.
*/
export const alchemyQueue = new Queue(QueueName.ALCHEMY, { connection });

/**
* processor function for the alchemy worker.
* handles 'add-subscriber' jobs by calling the alchemy api.
*
* @param job - the bullmq job containing the subscription details.
*/
export async function processor(job: Job<AlchemyJobData>) {
return startSpan(
{ name: "alchemy.processor", op: "queue.process", attributes: { job: job.name, ...job.data } },
async (span: Span) => {
switch (job.name) {
case AlchemyJob.ADD_SUBSCRIBER: {
const { account, webhookId } = job.data;
const response = await fetch(ALCHEMY_WEBHOOK_URL, {
method: "PATCH",
headers,
body: JSON.stringify({ webhook_id: webhookId, addresses_to_add: [account], addresses_to_remove: [] }),
});
if (!response.ok) {
const text = await response.text();
span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message: text });
throw new Error(`${response.status} ${text}`);
}
break;
}
default: {
const message = `Unknown job name: ${job.name}`;
span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message });
throw new Error(message);
}
}
},
);
}

export let alchemyWorker: undefined | Worker;

// this logic here is to prevent certain build code from trying
// to initialize the worker and redis
if (!disableWorkers) {
alchemyWorker = new Worker(QueueName.ALCHEMY, processor, {
connection,
limiter: { max: 10, duration: 1000 },
// cspell:ignore autorun
autorun: true,
});

alchemyWorker
.on("failed", (job: Job<AlchemyJobData> | undefined, error: Error) => {
captureException(error, { extra: { job: job?.data } });
})
.on("completed", (job: Job<AlchemyJobData>) => {
addBreadcrumb({
category: "queue",
message: `Job ${job.id} completed`,
level: "info",
data: { job: job.data },
});
})
.on("active", (job: Job<AlchemyJobData>) => {
addBreadcrumb({
category: "queue",
message: `Job ${job.id} active`,
level: "info",
data: { job: job.data },
});
})
.on("error", (error: Error) => {
captureException(error, { tags: { queue: QueueName.ALCHEMY } });
});
}

export async function close() {
await Promise.all([connection.quit(), alchemyWorker?.close() ?? Promise.resolve(), alchemyQueue.close()]);
}
11 changes: 11 additions & 0 deletions server/queues/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export const QueueName = {
ALCHEMY: "alchemy",
} as const;

export type QueueNameEnum = (typeof QueueName)[keyof typeof QueueName];

export const AlchemyJob = {
ADD_SUBSCRIBER: "add-subscriber",
} as const;

export type AlchemyJobEnum = (typeof AlchemyJob)[keyof typeof AlchemyJob];
3 changes: 2 additions & 1 deletion server/script/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ process.env.PERSONA_API_KEY = "persona";
process.env.PERSONA_URL = "https://persona.test";
process.env.PERSONA_WEBHOOK_SECRET = "persona";
process.env.POSTGRES_URL = "postgres";
process.env.REDIS_URL = "redis";
process.env.REDIS_URL = "redis://127.0.0.1:6379";
process.env.DISABLE_WORKERS = "true";
process.env.SARDINE_API_KEY = "sardine";
process.env.SARDINE_API_URL = "https://api.sardine.ai";
process.env.SEGMENT_WRITE_KEY = "segment";
Expand Down
89 changes: 89 additions & 0 deletions server/test/queues/alchemyQueue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { startSpan } from "@sentry/node";
import type { Job } from "bullmq";
import { beforeEach, describe, expect, it, vi } from "vitest";

import { type AlchemyJobData, processor } from "../../queues/alchemyQueue";
import { AlchemyJob } from "../../queues/constants";

// Mock dependencies
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use lowercase comment text to match the style guide.
As per coding guidelines.

♻️ Proposed fix
-// Mock dependencies
+// mock dependencies
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Mock dependencies
// mock dependencies
🤖 Prompt for AI Agents
In `@server/test/queues/alchemyQueue.test.ts` at line 8, Change the comment "//
Mock dependencies" to lowercase to follow the style guide: replace it with "mock
dependencies" in the alchemyQueue.test.ts file (look for the top-of-file comment
string "// Mock dependencies") so the test file's comment text uses lowercase
styling.

vi.mock("../../utils/alchemy", () => ({
headers: { "X-Alchemy-Token": "mock-token" },
}));

vi.mock("../../hooks/activity", () => ({
webhookId: "hook-123",
}));

vi.mock("@sentry/node", () => ({
captureException: vi.fn<(...args: unknown[]) => unknown>(),
startSpan: vi
.fn<(context: unknown, callback: (span: unknown) => unknown) => unknown>()
.mockImplementation((_context: unknown, callback: (span: unknown) => unknown) =>
callback({ setStatus: vi.fn<() => unknown>() }),
),
addBreadcrumb: vi.fn<(...args: unknown[]) => unknown>(),
}));

vi.spyOn(globalThis, "fetch").mockResolvedValue({
ok: true,
json: () => Promise.resolve({}),
text: () => Promise.resolve(""),
} as Response);

describe("alchemyQueue worker processor", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("should call Alchemy API to update webhook addresses", async () => {
const job = {
name: AlchemyJob.ADD_SUBSCRIBER,
data: {
account: "0x123",
webhookId: "hook-123",
},
} as unknown as Job<AlchemyJobData>;

await processor(job);

expect(fetch).toHaveBeenCalledWith(
"https://dashboard.alchemy.com/api/update-webhook-addresses",
expect.objectContaining({
method: "PATCH",
headers: expect.objectContaining({ "X-Alchemy-Token": "mock-token" }) as Record<string, string>,
body: JSON.stringify({
webhook_id: "hook-123",
addresses_to_add: ["0x123"],
addresses_to_remove: [],
}),
}),
);
expect(startSpan).toHaveBeenCalledWith(
expect.objectContaining({ name: "alchemy.processor", op: "queue.process" }),
expect.any(Function),
);
});

it("should throw an error for unknown job names", async () => {
const job = { name: "unknown", data: {} } as unknown as Job<AlchemyJobData>;
await expect(processor(job)).rejects.toThrow("Unknown job name: unknown");
});

it("should throw an error if Alchemy API call fails", async () => {
vi.spyOn(globalThis, "fetch").mockResolvedValueOnce({
ok: false,
status: 500,
text: () => Promise.resolve("Internal Server Error"),
} as Response);

const job = {
name: AlchemyJob.ADD_SUBSCRIBER,
data: {
account: "0x123",
webhookId: "hook-123",
},
} as unknown as Job<AlchemyJobData>;

await expect(processor(job)).rejects.toThrow("500 Internal Server Error");
});
});
114 changes: 114 additions & 0 deletions server/test/utils/createCredential.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { captureException } from "@sentry/core";
import { beforeEach, describe, expect, it, vi } from "vitest";

import { AlchemyJob } from "../../queues/constants";
import createCredential from "../../utils/createCredential";

import type { Context } from "hono";

const mocks = vi.hoisted(() => ({
webhookId: { value: "webhook-id" as string | undefined },
addJob: vi.fn().mockResolvedValue({}),
}));

// Mock dependencies
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Use lowercase comment text to match the style guide.
As per coding guidelines.

♻️ Proposed fix
-// Mock dependencies
+// mock dependencies
-// Mock global fetch to avoid actual network calls
+// mock global fetch to avoid actual network calls

Also applies to: 55-55

🤖 Prompt for AI Agents
In `@server/test/utils/createCredential.test.ts` at line 13, Replace the comment
string "// Mock dependencies" with a lowercase version "// mock dependencies"
wherever it appears in the test (both occurrences flagged in the review) so the
comment text matches the project's style guide; search for the exact comment
token and update it to the lowercase form.

vi.mock("../../queues/alchemyQueue", () => ({
alchemyQueue: {
add: mocks.addJob,
},
}));

vi.mock("../../database", () => ({
default: {
insert: vi.fn().mockReturnValue({ values: vi.fn() }),
},
credentials: {},
}));

vi.mock("hono/cookie", () => ({
setSignedCookie: vi.fn(),
}));

vi.mock("@sentry/core", () => ({
setUser: vi.fn(),
captureException: vi.fn(),
}));

vi.mock("@sentry/node", { spy: true });

vi.mock("../../utils/segment", () => ({
identify: vi.fn(),
}));

vi.mock("../../utils/authSecret", () => ({
default: "secret",
}));

vi.mock("../../hooks/activity", () => ({
get webhookId() {
return mocks.webhookId.value;
},
}));

vi.mock("../../utils/sardine", () => ({
customer: vi.fn().mockResolvedValue({}),
}));

// Mock global fetch to avoid actual network calls
vi.spyOn(globalThis, "fetch").mockResolvedValue({
ok: true,
json: () => Promise.resolve({}),
text: () => Promise.resolve(""),
} as Response);

describe("createCredential - job queue", () => {
const mockContext = { req: {}, json: vi.fn<(data: unknown) => Response>() } as unknown as Context;

beforeEach(() => {
vi.clearAllMocks();
mocks.webhookId.value = "webhook-id";
});

it("should add a job to alchemyQueue when credential is created", async () => {
const credentialId = "0x1234567890123456789012345678901234567890";

await createCredential(mockContext, credentialId);

expect(mocks.addJob).toHaveBeenCalledWith(
AlchemyJob.ADD_SUBSCRIBER,
expect.objectContaining({
account: expect.stringMatching(/^0x/) as string,
webhookId: "webhook-id",
}),
);
});

it("should capture exception when alchemyQueue.add fails", async () => {
const credentialId = "0x1234567890123456789012345678901234567890";
const error = new Error("queue error");
mocks.addJob.mockRejectedValueOnce(error);

await createCredential(mockContext, credentialId);

expect(vi.mocked(captureException)).toHaveBeenCalledWith(
error,
expect.objectContaining({
level: "error",
extra: expect.objectContaining({
job: AlchemyJob.ADD_SUBSCRIBER,
webhookId: "webhook-id",
}) as Record<string, unknown>,
}),
);
});

it("should not add job to alchemyQueue if webhookId is undefined", async () => {
const credentialId = "0x1234567890123456789012345678901234567890";
mocks.webhookId.value = undefined;

await createCredential(mockContext, credentialId);

expect(mocks.addJob).not.toHaveBeenCalled();
expect(vi.mocked(captureException)).not.toHaveBeenCalled();
});
});
22 changes: 14 additions & 8 deletions server/utils/createCredential.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import deriveAddress from "@exactly/common/deriveAddress";
import domain from "@exactly/common/domain";
import { exaAccountFactoryAddress } from "@exactly/common/generated/chain";
import { Address } from "@exactly/common/validation";
import type { WebAuthnCredential } from "@simplewebauthn/server";
import type { Context } from "hono";

import { updateWebhookAddresses } from "./alchemy";
import database from "../database";
import { credentials } from "../database/schema";
import { webhookId } from "../hooks/activity";
import { alchemyQueue } from "../queues/alchemyQueue";
import { AlchemyJob } from "../queues/constants";
import authSecret from "./authSecret";
import decodePublicKey from "./decodePublicKey";
import { customer } from "./sardine";
import { identify } from "./segment";
import database from "../database";
import { credentials } from "../database/schema";
import { webhookId } from "../hooks/activity";

import type { WebAuthnCredential } from "@simplewebauthn/server";
import type { Context } from "hono";

export default async function createCredential<C extends string>(
c: Context,
Expand Down Expand Up @@ -52,7 +52,13 @@ export default async function createCredential<C extends string>(
? { sameSite: "lax", secure: false }
: { domain, sameSite: "none", secure: true, partitioned: true }),
}),
updateWebhookAddresses(webhookId, [account]).catch((error: unknown) => captureException(error)),
webhookId
? alchemyQueue
.add(AlchemyJob.ADD_SUBSCRIBER, { account, webhookId })
.catch((error: unknown) =>
captureException(error, { level: "error", extra: { job: AlchemyJob.ADD_SUBSCRIBER, account, webhookId } }),
)
: undefined,
Comment on lines +55 to +61
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if there's any retry or reconciliation logic for missed webhook subscriptions
echo "=== Checking for retry/reconciliation mechanisms ==="
rg -n -B 2 -A 5 "webhookId|ADD_SUBSCRIBER" server/queues/ server/hooks/activity.ts 2>/dev/null || echo "(No matches or files not found)"

echo -e "\n=== Check if webhookId is ever retried or reconciled ==="
rg -n "retry|reconcile|missing.*webhook|sync.*subscriber" server/ 2>/dev/null || echo "(No matches)"

echo -e "\n=== Check createCredential.ts context around webhookId handling ==="
cat -n server/utils/createCredential.ts 2>/dev/null | head -80

Repository: exactly/exa

Length of output: 7191


Conditional guard prevents errors, but silent skip during startup race causes missed subscriptions.

The webhookId check properly prevents queuing jobs with undefined IDs and error handling includes useful context. However, verification reveals a critical gap: webhookId is a module-level variable initialized asynchronously in activity.ts (lines 248–252). Credentials created during server startup—before the webhook is fetched/created—will silently skip enqueuing ADD_SUBSCRIBER jobs. Unlike early failures after webhookId is set, these early subscriptions have no recovery mechanism.

No retry or reconciliation logic exists to handle missed subscriptions. Consider:

  • Retrying ADD_SUBSCRIBER for credentials created before webhookId initialization completes
  • Or deferring credential creation until webhook is ready
🤖 Prompt for AI Agents
In `@server/utils/createCredential.ts` around lines 55 - 61, The current
conditional in createCredential.ts skips enqueuing AlchemyJob.ADD_SUBSCRIBER
when the module-level webhookId is still undefined (set asynchronously in
activity.ts), causing missed subscriptions during startup; modify
createCredential logic so that when webhookId is undefined it does not silently
drop the job but instead either (A) persist a pending-subscription record (e.g.,
push the credential id/account into a temporary pending list or DB table) and
expose a function like processPendingAlchemySubscriptions() that activity.ts
calls once webhookId is initialized to enqueue AlchemyJob.ADD_SUBSCRIBER for
each pending item, or (B) throw a retriable error to defer credential creation
until webhookId is ready; implement with references to webhookId,
alchemyQueue.add(AlchemyJob.ADD_SUBSCRIBER,...), and activity.ts initialization
to ensure missed ADD_SUBSCRIBER jobs are reconciled after webhookId is set.

customer({ flow: { name: "signup", type: "signup" }, customer: { id: credentialId } }).catch((error: unknown) =>
captureException(error, { level: "error" }),
),
Expand Down