diff --git a/.changeset/short-cars-return.md b/.changeset/short-cars-return.md new file mode 100644 index 000000000..caeb50425 --- /dev/null +++ b/.changeset/short-cars-return.md @@ -0,0 +1,5 @@ +--- +"@exactly/server": patch +--- + +✨ add queue on user creation diff --git a/server/index.ts b/server/index.ts index 101d82029..258f1e726 100644 --- a/server/index.ts +++ b/server/index.ts @@ -15,6 +15,7 @@ import block from "./hooks/block"; import manteca from "./hooks/manteca"; import panda from "./hooks/panda"; import persona from "./hooks/persona"; +import { close as closeAlchemyQueue } from "./queues/alchemyQueue"; import androidFingerprints from "./utils/android/fingerprints"; import appOrigin from "./utils/appOrigin"; import { closeAndFlush as closeSegment } from "./utils/segment"; @@ -286,7 +287,7 @@ const server = serve(app); export async function close() { return new Promise((resolve, reject) => { server.close((error) => { - Promise.allSettled([closeSentry(), closeSegment(), database.$client.end()]) + Promise.allSettled([closeSentry(), closeSegment(), database.$client.end(), closeAlchemyQueue()]) .then((results) => { if (error) reject(error); else if (results.some((result) => result.status === "rejected")) reject(new Error("closing services failed")); diff --git a/server/queues/alchemyQueue.ts b/server/queues/alchemyQueue.ts new file mode 100644 index 000000000..882644bfb --- /dev/null +++ b/server/queues/alchemyQueue.ts @@ -0,0 +1,106 @@ +import { addBreadcrumb, captureException, startSpan, type Span } from "@sentry/node"; +import { Queue, Worker, type Job } from "bullmq"; +import Redis from "ioredis"; + +import { AlchemyJob, QueueName } from "./constants"; +import { headers } from "../utils/alchemy"; + +const ALCHEMY_WEBHOOK_URL = "https://dashboard.alchemy.com/api/update-webhook-addresses"; +const disableWorkers = process.env.DISABLE_WORKERS === "true"; +const SENTRY_SPAN_ERROR_CODE = 2; + +/** + * interface representing the data payload for alchemy background jobs. + */ +export type AlchemyJobData = { + /** the likely ethereum address of the account to subscribe. */ + account: string; + /** the alchemy webhook id to update. */ + webhookId: string; +}; + +if (!process.env.REDIS_URL) throw new Error("REDIS_URL environment variable is not set"); +const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null }); + +/** + * bullmq queue for managing alchemy-related background tasks. + * used primarily for offloading webhook subscription updates to avoid blocking the main thread + * and to allow for retries on api failures. + */ +export const alchemyQueue = new Queue(QueueName.ALCHEMY, { connection }); + +/** + * processor function for the alchemy worker. + * handles 'add-subscriber' jobs by calling the alchemy api. + * + * @param job - the bullmq job containing the subscription details. + */ +export async function processor(job: Job) { + return startSpan( + { name: "alchemy.processor", op: "queue.process", attributes: { job: job.name, ...job.data } }, + async (span: Span) => { + switch (job.name) { + case AlchemyJob.ADD_SUBSCRIBER: { + const { account, webhookId } = job.data; + const response = await fetch(ALCHEMY_WEBHOOK_URL, { + method: "PATCH", + headers, + body: JSON.stringify({ webhook_id: webhookId, addresses_to_add: [account], addresses_to_remove: [] }), + }); + if (!response.ok) { + const text = await response.text(); + span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message: text }); + throw new Error(`${response.status} ${text}`); + } + break; + } + default: { + const message = `Unknown job name: ${job.name}`; + span.setStatus({ code: SENTRY_SPAN_ERROR_CODE, message }); + throw new Error(message); + } + } + }, + ); +} + +export let alchemyWorker: undefined | Worker; + +// this logic here is to prevent certain build code from trying +// to initialize the worker and redis +if (!disableWorkers) { + alchemyWorker = new Worker(QueueName.ALCHEMY, processor, { + connection, + limiter: { max: 10, duration: 1000 }, + // cspell:ignore autorun + autorun: true, + }); + + alchemyWorker + .on("failed", (job: Job | undefined, error: Error) => { + captureException(error, { extra: { job: job?.data } }); + }) + .on("completed", (job: Job) => { + addBreadcrumb({ + category: "queue", + message: `Job ${job.id} completed`, + level: "info", + data: { job: job.data }, + }); + }) + .on("active", (job: Job) => { + addBreadcrumb({ + category: "queue", + message: `Job ${job.id} active`, + level: "info", + data: { job: job.data }, + }); + }) + .on("error", (error: Error) => { + captureException(error, { tags: { queue: QueueName.ALCHEMY } }); + }); +} + +export async function close() { + await Promise.all([connection.quit(), alchemyWorker?.close() ?? Promise.resolve(), alchemyQueue.close()]); +} diff --git a/server/queues/constants.ts b/server/queues/constants.ts new file mode 100644 index 000000000..37ff5f82e --- /dev/null +++ b/server/queues/constants.ts @@ -0,0 +1,11 @@ +export const QueueName = { + ALCHEMY: "alchemy", +} as const; + +export type QueueNameEnum = (typeof QueueName)[keyof typeof QueueName]; + +export const AlchemyJob = { + ADD_SUBSCRIBER: "add-subscriber", +} as const; + +export type AlchemyJobEnum = (typeof AlchemyJob)[keyof typeof AlchemyJob]; diff --git a/server/script/openapi.ts b/server/script/openapi.ts index 124c51240..a0879c622 100644 --- a/server/script/openapi.ts +++ b/server/script/openapi.ts @@ -25,7 +25,8 @@ process.env.PERSONA_API_KEY = "persona"; process.env.PERSONA_URL = "https://persona.test"; process.env.PERSONA_WEBHOOK_SECRET = "persona"; process.env.POSTGRES_URL = "postgres"; -process.env.REDIS_URL = "redis"; +process.env.REDIS_URL = "redis://127.0.0.1:6379"; +process.env.DISABLE_WORKERS = "true"; process.env.SARDINE_API_KEY = "sardine"; process.env.SARDINE_API_URL = "https://api.sardine.ai"; process.env.SEGMENT_WRITE_KEY = "segment"; diff --git a/server/test/queues/alchemyQueue.test.ts b/server/test/queues/alchemyQueue.test.ts new file mode 100644 index 000000000..ba8246111 --- /dev/null +++ b/server/test/queues/alchemyQueue.test.ts @@ -0,0 +1,89 @@ +import { startSpan } from "@sentry/node"; +import type { Job } from "bullmq"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { type AlchemyJobData, processor } from "../../queues/alchemyQueue"; +import { AlchemyJob } from "../../queues/constants"; + +// Mock dependencies +vi.mock("../../utils/alchemy", () => ({ + headers: { "X-Alchemy-Token": "mock-token" }, +})); + +vi.mock("../../hooks/activity", () => ({ + webhookId: "hook-123", +})); + +vi.mock("@sentry/node", () => ({ + captureException: vi.fn<(...args: unknown[]) => unknown>(), + startSpan: vi + .fn<(context: unknown, callback: (span: unknown) => unknown) => unknown>() + .mockImplementation((_context: unknown, callback: (span: unknown) => unknown) => + callback({ setStatus: vi.fn<() => unknown>() }), + ), + addBreadcrumb: vi.fn<(...args: unknown[]) => unknown>(), +})); + +vi.spyOn(globalThis, "fetch").mockResolvedValue({ + ok: true, + json: () => Promise.resolve({}), + text: () => Promise.resolve(""), +} as Response); + +describe("alchemyQueue worker processor", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("should call Alchemy API to update webhook addresses", async () => { + const job = { + name: AlchemyJob.ADD_SUBSCRIBER, + data: { + account: "0x123", + webhookId: "hook-123", + }, + } as unknown as Job; + + await processor(job); + + expect(fetch).toHaveBeenCalledWith( + "https://dashboard.alchemy.com/api/update-webhook-addresses", + expect.objectContaining({ + method: "PATCH", + headers: expect.objectContaining({ "X-Alchemy-Token": "mock-token" }) as Record, + body: JSON.stringify({ + webhook_id: "hook-123", + addresses_to_add: ["0x123"], + addresses_to_remove: [], + }), + }), + ); + expect(startSpan).toHaveBeenCalledWith( + expect.objectContaining({ name: "alchemy.processor", op: "queue.process" }), + expect.any(Function), + ); + }); + + it("should throw an error for unknown job names", async () => { + const job = { name: "unknown", data: {} } as unknown as Job; + await expect(processor(job)).rejects.toThrow("Unknown job name: unknown"); + }); + + it("should throw an error if Alchemy API call fails", async () => { + vi.spyOn(globalThis, "fetch").mockResolvedValueOnce({ + ok: false, + status: 500, + text: () => Promise.resolve("Internal Server Error"), + } as Response); + + const job = { + name: AlchemyJob.ADD_SUBSCRIBER, + data: { + account: "0x123", + webhookId: "hook-123", + }, + } as unknown as Job; + + await expect(processor(job)).rejects.toThrow("500 Internal Server Error"); + }); +}); diff --git a/server/test/utils/createCredential.test.ts b/server/test/utils/createCredential.test.ts new file mode 100644 index 000000000..838c99b7b --- /dev/null +++ b/server/test/utils/createCredential.test.ts @@ -0,0 +1,114 @@ +import { captureException } from "@sentry/core"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { AlchemyJob } from "../../queues/constants"; +import createCredential from "../../utils/createCredential"; + +import type { Context } from "hono"; + +const mocks = vi.hoisted(() => ({ + webhookId: { value: "webhook-id" as string | undefined }, + addJob: vi.fn().mockResolvedValue({}), +})); + +// Mock dependencies +vi.mock("../../queues/alchemyQueue", () => ({ + alchemyQueue: { + add: mocks.addJob, + }, +})); + +vi.mock("../../database", () => ({ + default: { + insert: vi.fn().mockReturnValue({ values: vi.fn() }), + }, + credentials: {}, +})); + +vi.mock("hono/cookie", () => ({ + setSignedCookie: vi.fn(), +})); + +vi.mock("@sentry/core", () => ({ + setUser: vi.fn(), + captureException: vi.fn(), +})); + +vi.mock("@sentry/node", { spy: true }); + +vi.mock("../../utils/segment", () => ({ + identify: vi.fn(), +})); + +vi.mock("../../utils/authSecret", () => ({ + default: "secret", +})); + +vi.mock("../../hooks/activity", () => ({ + get webhookId() { + return mocks.webhookId.value; + }, +})); + +vi.mock("../../utils/sardine", () => ({ + customer: vi.fn().mockResolvedValue({}), +})); + +// Mock global fetch to avoid actual network calls +vi.spyOn(globalThis, "fetch").mockResolvedValue({ + ok: true, + json: () => Promise.resolve({}), + text: () => Promise.resolve(""), +} as Response); + +describe("createCredential - job queue", () => { + const mockContext = { req: {}, json: vi.fn<(data: unknown) => Response>() } as unknown as Context; + + beforeEach(() => { + vi.clearAllMocks(); + mocks.webhookId.value = "webhook-id"; + }); + + it("should add a job to alchemyQueue when credential is created", async () => { + const credentialId = "0x1234567890123456789012345678901234567890"; + + await createCredential(mockContext, credentialId); + + expect(mocks.addJob).toHaveBeenCalledWith( + AlchemyJob.ADD_SUBSCRIBER, + expect.objectContaining({ + account: expect.stringMatching(/^0x/) as string, + webhookId: "webhook-id", + }), + ); + }); + + it("should capture exception when alchemyQueue.add fails", async () => { + const credentialId = "0x1234567890123456789012345678901234567890"; + const error = new Error("queue error"); + mocks.addJob.mockRejectedValueOnce(error); + + await createCredential(mockContext, credentialId); + + expect(vi.mocked(captureException)).toHaveBeenCalledWith( + error, + expect.objectContaining({ + level: "error", + extra: expect.objectContaining({ + job: AlchemyJob.ADD_SUBSCRIBER, + webhookId: "webhook-id", + }) as Record, + }), + ); + }); + + it("should not add job to alchemyQueue if webhookId is undefined", async () => { + const credentialId = "0x1234567890123456789012345678901234567890"; + mocks.webhookId.value = undefined; + + await createCredential(mockContext, credentialId); + + expect(mocks.addJob).not.toHaveBeenCalled(); + expect(vi.mocked(captureException)).not.toHaveBeenCalled(); + }); +}); diff --git a/server/utils/createCredential.ts b/server/utils/createCredential.ts index 655bc55a3..b69119a13 100644 --- a/server/utils/createCredential.ts +++ b/server/utils/createCredential.ts @@ -8,18 +8,18 @@ import deriveAddress from "@exactly/common/deriveAddress"; import domain from "@exactly/common/domain"; import { exaAccountFactoryAddress } from "@exactly/common/generated/chain"; import { Address } from "@exactly/common/validation"; +import type { WebAuthnCredential } from "@simplewebauthn/server"; +import type { Context } from "hono"; -import { updateWebhookAddresses } from "./alchemy"; +import database from "../database"; +import { credentials } from "../database/schema"; +import { webhookId } from "../hooks/activity"; +import { alchemyQueue } from "../queues/alchemyQueue"; +import { AlchemyJob } from "../queues/constants"; import authSecret from "./authSecret"; import decodePublicKey from "./decodePublicKey"; import { customer } from "./sardine"; import { identify } from "./segment"; -import database from "../database"; -import { credentials } from "../database/schema"; -import { webhookId } from "../hooks/activity"; - -import type { WebAuthnCredential } from "@simplewebauthn/server"; -import type { Context } from "hono"; export default async function createCredential( c: Context, @@ -52,7 +52,13 @@ export default async function createCredential( ? { sameSite: "lax", secure: false } : { domain, sameSite: "none", secure: true, partitioned: true }), }), - updateWebhookAddresses(webhookId, [account]).catch((error: unknown) => captureException(error)), + webhookId + ? alchemyQueue + .add(AlchemyJob.ADD_SUBSCRIBER, { account, webhookId }) + .catch((error: unknown) => + captureException(error, { level: "error", extra: { job: AlchemyJob.ADD_SUBSCRIBER, account, webhookId } }), + ) + : undefined, customer({ flow: { name: "signup", type: "signup" }, customer: { id: credentialId } }).catch((error: unknown) => captureException(error, { level: "error" }), ),