-
Notifications
You must be signed in to change notification settings - Fork 2
✨ server: add queue on user creation #619
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| "@exactly/server": patch | ||
| --- | ||
|
|
||
| ✨ add queue on user creation |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| import { addBreadcrumb, captureException, startSpan, type Span } from "@sentry/node"; | ||
| import { Queue, Worker, type Job } from "bullmq"; | ||
| import Redis from "ioredis"; | ||
|
|
||
| import { AlchemyJob, QueueName } from "./constants"; | ||
| import { headers } from "../utils/alchemy"; | ||
|
|
||
| const ALCHEMY_WEBHOOK_URL = "https://dashboard.alchemy.com/api/update-webhook-addresses"; | ||
| const disableWorkers = process.env.DISABLE_WORKERS === "true"; | ||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| const SENTRY_SPAN_ERROR_CODE = 2; | ||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /** | ||
| * interface representing the data payload for alchemy background jobs. | ||
| */ | ||
| export type AlchemyJobData = { | ||
| /** the likely ethereum address of the account to subscribe. */ | ||
| account: string; | ||
| /** the alchemy webhook id to update. */ | ||
| webhookId: string; | ||
| }; | ||
|
|
||
| if (!process.env.REDIS_URL) throw new Error("REDIS_URL environment variable is not set"); | ||
| const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null }); | ||
|
|
||
| /** | ||
| * bullmq queue for managing alchemy-related background tasks. | ||
| * used primarily for offloading webhook subscription updates to avoid blocking the main thread | ||
| * and to allow for retries on api failures. | ||
| */ | ||
| export const alchemyQueue = new Queue(QueueName.ALCHEMY, { connection }); | ||
|
|
||
| /** | ||
| * processor function for the alchemy worker. | ||
| * handles 'add-subscriber' jobs by calling the alchemy api. | ||
| * | ||
| * @param job - the bullmq job containing the subscription details. | ||
| */ | ||
| export async function processor(job: Job<AlchemyJobData>) { | ||
| return startSpan( | ||
| { name: "alchemy.processor", op: "queue.process", attributes: { job: job.name, ...job.data } }, | ||
| async (span: Span) => { | ||
| switch (job.name) { | ||
| case AlchemyJob.ADD_SUBSCRIBER: { | ||
| const { account, webhookId } = job.data; | ||
| const response = await fetch(ALCHEMY_WEBHOOK_URL, { | ||
| method: "PATCH", | ||
| headers, | ||
| body: JSON.stringify({ webhook_id: webhookId, addresses_to_add: [account], addresses_to_remove: [] }), | ||
| }); | ||
| if (!response.ok) { | ||
| const text = await response.text(); | ||
| span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message: text }); | ||
| throw new Error(`${response.status} ${text}`); | ||
| } | ||
| break; | ||
| } | ||
| default: { | ||
| const message = `Unknown job name: ${job.name}`; | ||
| span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message }); | ||
| throw new Error(message); | ||
| } | ||
| } | ||
| }, | ||
| ); | ||
| } | ||
|
|
||
| export let alchemyWorker: undefined | Worker; | ||
|
|
||
| // this logic here is to prevent certain build code from trying | ||
| // to initialize the worker and redis | ||
| if (!disableWorkers) { | ||
| alchemyWorker = new Worker(QueueName.ALCHEMY, processor, { | ||
| connection, | ||
| limiter: { max: 10, duration: 1000 }, | ||
| // cspell:ignore autorun | ||
| autorun: true, | ||
| }); | ||
|
|
||
| alchemyWorker | ||
| .on("failed", (job: Job<AlchemyJobData> | undefined, error: Error) => { | ||
| captureException(error, { extra: { job: job?.data } }); | ||
| }) | ||
| .on("completed", (job: Job<AlchemyJobData>) => { | ||
| addBreadcrumb({ | ||
| category: "queue", | ||
| message: `Job ${job.id} completed`, | ||
| level: "info", | ||
| data: { job: job.data }, | ||
| }); | ||
| }) | ||
| .on("active", (job: Job<AlchemyJobData>) => { | ||
| addBreadcrumb({ | ||
| category: "queue", | ||
| message: `Job ${job.id} active`, | ||
| level: "info", | ||
| data: { job: job.data }, | ||
| }); | ||
| }) | ||
| .on("error", (error: Error) => { | ||
| captureException(error, { tags: { queue: QueueName.ALCHEMY } }); | ||
| }); | ||
| } | ||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| export async function close() { | ||
| await Promise.all([connection.quit(), alchemyWorker?.close() ?? Promise.resolve(), alchemyQueue.close()]); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| export const QueueName = { | ||
| ALCHEMY: "alchemy", | ||
| } as const; | ||
|
|
||
| export type QueueNameEnum = (typeof QueueName)[keyof typeof QueueName]; | ||
|
|
||
| export const AlchemyJob = { | ||
| ADD_SUBSCRIBER: "add-subscriber", | ||
| } as const; | ||
|
|
||
| export type AlchemyJobEnum = (typeof AlchemyJob)[keyof typeof AlchemyJob]; |
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,89 @@ | ||||||
| import { startSpan } from "@sentry/node"; | ||||||
| import type { Job } from "bullmq"; | ||||||
| import { beforeEach, describe, expect, it, vi } from "vitest"; | ||||||
|
|
||||||
| import { type AlchemyJobData, processor } from "../../queues/alchemyQueue"; | ||||||
| import { AlchemyJob } from "../../queues/constants"; | ||||||
|
|
||||||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| // Mock dependencies | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use lowercase comment text to match the style guide. ♻️ Proposed fix-// Mock dependencies
+// mock dependencies📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||
| vi.mock("../../utils/alchemy", () => ({ | ||||||
| headers: { "X-Alchemy-Token": "mock-token" }, | ||||||
| })); | ||||||
|
|
||||||
| vi.mock("../../hooks/activity", () => ({ | ||||||
| webhookId: "hook-123", | ||||||
| })); | ||||||
|
|
||||||
| vi.mock("@sentry/node", () => ({ | ||||||
| captureException: vi.fn<(...args: unknown[]) => unknown>(), | ||||||
| startSpan: vi | ||||||
| .fn<(context: unknown, callback: (span: unknown) => unknown) => unknown>() | ||||||
| .mockImplementation((_context: unknown, callback: (span: unknown) => unknown) => | ||||||
| callback({ setStatus: vi.fn<() => unknown>() }), | ||||||
| ), | ||||||
| addBreadcrumb: vi.fn<(...args: unknown[]) => unknown>(), | ||||||
| })); | ||||||
|
|
||||||
| vi.spyOn(globalThis, "fetch").mockResolvedValue({ | ||||||
| ok: true, | ||||||
| json: () => Promise.resolve({}), | ||||||
| text: () => Promise.resolve(""), | ||||||
| } as Response); | ||||||
|
|
||||||
| describe("alchemyQueue worker processor", () => { | ||||||
| beforeEach(() => { | ||||||
| vi.clearAllMocks(); | ||||||
| }); | ||||||
|
|
||||||
| it("should call Alchemy API to update webhook addresses", async () => { | ||||||
| const job = { | ||||||
| name: AlchemyJob.ADD_SUBSCRIBER, | ||||||
| data: { | ||||||
| account: "0x123", | ||||||
| webhookId: "hook-123", | ||||||
| }, | ||||||
| } as unknown as Job<AlchemyJobData>; | ||||||
|
|
||||||
| await processor(job); | ||||||
|
|
||||||
| expect(fetch).toHaveBeenCalledWith( | ||||||
| "https://dashboard.alchemy.com/api/update-webhook-addresses", | ||||||
| expect.objectContaining({ | ||||||
| method: "PATCH", | ||||||
| headers: expect.objectContaining({ "X-Alchemy-Token": "mock-token" }) as Record<string, string>, | ||||||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| body: JSON.stringify({ | ||||||
| webhook_id: "hook-123", | ||||||
| addresses_to_add: ["0x123"], | ||||||
| addresses_to_remove: [], | ||||||
| }), | ||||||
| }), | ||||||
| ); | ||||||
| expect(startSpan).toHaveBeenCalledWith( | ||||||
| expect.objectContaining({ name: "alchemy.processor", op: "queue.process" }), | ||||||
| expect.any(Function), | ||||||
| ); | ||||||
| }); | ||||||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
|
||||||
| it("should throw an error for unknown job names", async () => { | ||||||
| const job = { name: "unknown", data: {} } as unknown as Job<AlchemyJobData>; | ||||||
| await expect(processor(job)).rejects.toThrow("Unknown job name: unknown"); | ||||||
| }); | ||||||
|
|
||||||
| it("should throw an error if Alchemy API call fails", async () => { | ||||||
| vi.spyOn(globalThis, "fetch").mockResolvedValueOnce({ | ||||||
| ok: false, | ||||||
| status: 500, | ||||||
| text: () => Promise.resolve("Internal Server Error"), | ||||||
| } as Response); | ||||||
|
|
||||||
| const job = { | ||||||
| name: AlchemyJob.ADD_SUBSCRIBER, | ||||||
| data: { | ||||||
| account: "0x123", | ||||||
| webhookId: "hook-123", | ||||||
| }, | ||||||
| } as unknown as Job<AlchemyJobData>; | ||||||
|
|
||||||
| await expect(processor(job)).rejects.toThrow("500 Internal Server Error"); | ||||||
| }); | ||||||
| }); | ||||||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,114 @@ | ||
| import { captureException } from "@sentry/core"; | ||
| import { beforeEach, describe, expect, it, vi } from "vitest"; | ||
|
|
||
| import { AlchemyJob } from "../../queues/constants"; | ||
| import createCredential from "../../utils/createCredential"; | ||
|
|
||
| import type { Context } from "hono"; | ||
|
|
||
| const mocks = vi.hoisted(() => ({ | ||
| webhookId: { value: "webhook-id" as string | undefined }, | ||
| addJob: vi.fn().mockResolvedValue({}), | ||
| })); | ||
|
|
||
| // Mock dependencies | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use lowercase comment text to match the style guide. ♻️ Proposed fix-// Mock dependencies
+// mock dependencies-// Mock global fetch to avoid actual network calls
+// mock global fetch to avoid actual network callsAlso applies to: 55-55 🤖 Prompt for AI Agents |
||
| vi.mock("../../queues/alchemyQueue", () => ({ | ||
| alchemyQueue: { | ||
| add: mocks.addJob, | ||
| }, | ||
| })); | ||
|
|
||
| vi.mock("../../database", () => ({ | ||
| default: { | ||
| insert: vi.fn().mockReturnValue({ values: vi.fn() }), | ||
| }, | ||
| credentials: {}, | ||
| })); | ||
|
|
||
| vi.mock("hono/cookie", () => ({ | ||
| setSignedCookie: vi.fn(), | ||
| })); | ||
|
|
||
| vi.mock("@sentry/core", () => ({ | ||
| setUser: vi.fn(), | ||
| captureException: vi.fn(), | ||
| })); | ||
|
|
||
| vi.mock("@sentry/node", { spy: true }); | ||
|
|
||
| vi.mock("../../utils/segment", () => ({ | ||
| identify: vi.fn(), | ||
| })); | ||
|
|
||
| vi.mock("../../utils/authSecret", () => ({ | ||
| default: "secret", | ||
| })); | ||
|
|
||
| vi.mock("../../hooks/activity", () => ({ | ||
| get webhookId() { | ||
| return mocks.webhookId.value; | ||
| }, | ||
| })); | ||
|
|
||
| vi.mock("../../utils/sardine", () => ({ | ||
| customer: vi.fn().mockResolvedValue({}), | ||
| })); | ||
|
|
||
| // Mock global fetch to avoid actual network calls | ||
| vi.spyOn(globalThis, "fetch").mockResolvedValue({ | ||
| ok: true, | ||
| json: () => Promise.resolve({}), | ||
| text: () => Promise.resolve(""), | ||
| } as Response); | ||
|
|
||
| describe("createCredential - job queue", () => { | ||
| const mockContext = { req: {}, json: vi.fn<(data: unknown) => Response>() } as unknown as Context; | ||
|
|
||
| beforeEach(() => { | ||
| vi.clearAllMocks(); | ||
| mocks.webhookId.value = "webhook-id"; | ||
| }); | ||
|
|
||
| it("should add a job to alchemyQueue when credential is created", async () => { | ||
| const credentialId = "0x1234567890123456789012345678901234567890"; | ||
|
|
||
| await createCredential(mockContext, credentialId); | ||
|
|
||
| expect(mocks.addJob).toHaveBeenCalledWith( | ||
| AlchemyJob.ADD_SUBSCRIBER, | ||
| expect.objectContaining({ | ||
| account: expect.stringMatching(/^0x/) as string, | ||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| webhookId: "webhook-id", | ||
| }), | ||
| ); | ||
| }); | ||
|
|
||
| it("should capture exception when alchemyQueue.add fails", async () => { | ||
| const credentialId = "0x1234567890123456789012345678901234567890"; | ||
| const error = new Error("queue error"); | ||
| mocks.addJob.mockRejectedValueOnce(error); | ||
|
|
||
| await createCredential(mockContext, credentialId); | ||
|
|
||
| expect(vi.mocked(captureException)).toHaveBeenCalledWith( | ||
| error, | ||
| expect.objectContaining({ | ||
| level: "error", | ||
| extra: expect.objectContaining({ | ||
| job: AlchemyJob.ADD_SUBSCRIBER, | ||
| webhookId: "webhook-id", | ||
| }) as Record<string, unknown>, | ||
aguxez marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| }), | ||
| ); | ||
| }); | ||
|
|
||
| it("should not add job to alchemyQueue if webhookId is undefined", async () => { | ||
| const credentialId = "0x1234567890123456789012345678901234567890"; | ||
| mocks.webhookId.value = undefined; | ||
|
|
||
| await createCredential(mockContext, credentialId); | ||
|
|
||
| expect(mocks.addJob).not.toHaveBeenCalled(); | ||
| expect(vi.mocked(captureException)).not.toHaveBeenCalled(); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,18 +8,18 @@ import deriveAddress from "@exactly/common/deriveAddress"; | |
| import domain from "@exactly/common/domain"; | ||
| import { exaAccountFactoryAddress } from "@exactly/common/generated/chain"; | ||
| import { Address } from "@exactly/common/validation"; | ||
| import type { WebAuthnCredential } from "@simplewebauthn/server"; | ||
| import type { Context } from "hono"; | ||
|
|
||
| import { updateWebhookAddresses } from "./alchemy"; | ||
| import database from "../database"; | ||
| import { credentials } from "../database/schema"; | ||
| import { webhookId } from "../hooks/activity"; | ||
| import { alchemyQueue } from "../queues/alchemyQueue"; | ||
| import { AlchemyJob } from "../queues/constants"; | ||
| import authSecret from "./authSecret"; | ||
| import decodePublicKey from "./decodePublicKey"; | ||
| import { customer } from "./sardine"; | ||
| import { identify } from "./segment"; | ||
| import database from "../database"; | ||
| import { credentials } from "../database/schema"; | ||
| import { webhookId } from "../hooks/activity"; | ||
|
|
||
| import type { WebAuthnCredential } from "@simplewebauthn/server"; | ||
| import type { Context } from "hono"; | ||
|
|
||
| export default async function createCredential<C extends string>( | ||
| c: Context, | ||
|
|
@@ -52,7 +52,13 @@ export default async function createCredential<C extends string>( | |
| ? { sameSite: "lax", secure: false } | ||
| : { domain, sameSite: "none", secure: true, partitioned: true }), | ||
| }), | ||
| updateWebhookAddresses(webhookId, [account]).catch((error: unknown) => captureException(error)), | ||
| webhookId | ||
| ? alchemyQueue | ||
| .add(AlchemyJob.ADD_SUBSCRIBER, { account, webhookId }) | ||
| .catch((error: unknown) => | ||
| captureException(error, { level: "error", extra: { job: AlchemyJob.ADD_SUBSCRIBER, account, webhookId } }), | ||
| ) | ||
| : undefined, | ||
|
Comment on lines
+55
to
+61
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check if there's any retry or reconciliation logic for missed webhook subscriptions
echo "=== Checking for retry/reconciliation mechanisms ==="
rg -n -B 2 -A 5 "webhookId|ADD_SUBSCRIBER" server/queues/ server/hooks/activity.ts 2>/dev/null || echo "(No matches or files not found)"
echo -e "\n=== Check if webhookId is ever retried or reconciled ==="
rg -n "retry|reconcile|missing.*webhook|sync.*subscriber" server/ 2>/dev/null || echo "(No matches)"
echo -e "\n=== Check createCredential.ts context around webhookId handling ==="
cat -n server/utils/createCredential.ts 2>/dev/null | head -80Repository: exactly/exa Length of output: 7191 Conditional guard prevents errors, but silent skip during startup race causes missed subscriptions. The No retry or reconciliation logic exists to handle missed subscriptions. Consider:
🤖 Prompt for AI Agents |
||
| customer({ flow: { name: "signup", type: "signup" }, customer: { id: credentialId } }).catch((error: unknown) => | ||
| captureException(error, { level: "error" }), | ||
| ), | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.