From adf94a73f810b1b4ad81c98b9a99166fa089028b Mon Sep 17 00:00:00 2001 From: kegren Date: Sun, 4 Jan 2026 20:16:04 +0100 Subject: [PATCH] feat(auth+queue+jobs): add email verification job, split queue roles, fix query keys and a lot of small fixes Enable email verification and enqueue sending via job producer - Require email verification for email/password signups and enable send-on-sign-in. Hook better-auth's sendVerificationEmail to the new enqueueSendVerificationEmail producer so verification emails are processed asynchronously. Introduce typed job payload and robust worker handler - Change SendVerificationEmailPayload to include a nested user object (id, email, name) to carry user details instead of separate fields. - Wrap sendEmail call in try/catch and rethrow errors to surface job failures to the worker/pg-boss retry mechanism. Split pg-boss client roles and lifecycle management - Replace single boss with separate producerClient and workerClient. - Add getQueueClient() for lightweight enqueueing in the main app process and initQueueWorker() to start a worker and register jobs. - Add shutdownQueue() to gracefully stop clients on process exit. - Add runtime checks for DATABASE_URL and improved error logging. Misc - Add import of enqueueSendVerificationEmail in auth integration. - Adjust logging messages to clarify producer vs worker startup. --- src/features/auth/components/sign-up-form.tsx | 6 +- .../auth/components/social-auth-buttons.tsx | 15 ++-- src/features/jobs/definitions/email.job.tsx | 42 +++++----- src/features/jobs/producers/email.producer.ts | 15 ++++ .../todos/components/todo-skeleton.tsx | 12 +++ src/lib/email/send-email.ts | 4 +- src/lib/email/use-send.ts | 2 +- src/lib/server/auth.ts | 12 ++- src/lib/server/db.ts | 7 +- src/lib/server/queue.ts | 78 +++++++++++++++---- src/routes/_authenticated.tsx | 2 +- src/routes/_authenticated/todos.tsx | 4 + src/worker/index.ts | 25 ++++-- 13 files changed, 165 insertions(+), 59 deletions(-) create mode 100644 src/features/jobs/producers/email.producer.ts create mode 100644 src/features/todos/components/todo-skeleton.tsx diff --git a/src/features/auth/components/sign-up-form.tsx b/src/features/auth/components/sign-up-form.tsx index 84212ff..1c1ebcb 100644 --- a/src/features/auth/components/sign-up-form.tsx +++ b/src/features/auth/components/sign-up-form.tsx @@ -38,14 +38,16 @@ export default function SignUpForm() { await authClient.signUp.email( { ...data, + callbackURL: "/auth/email-verified", }, { headers: { "x-turnstile-token": turnstileToken }, onSuccess: () => { - queryClient.invalidateQueries({ queryKey: authKeys.session() }); + // manually remove query data to clear the cache and refetch + queryClient.removeQueries({ queryKey: authKeys.session() }); resetTurnstile(); - navigate({ to: "/dashboard" }); + navigate({ to: "/auth/verify-email" }); toast.success("Sign up successful"); }, diff --git a/src/features/auth/components/social-auth-buttons.tsx b/src/features/auth/components/social-auth-buttons.tsx index f86801c..7feb0dd 100644 --- a/src/features/auth/components/social-auth-buttons.tsx +++ b/src/features/auth/components/social-auth-buttons.tsx @@ -38,24 +38,23 @@ export default function SocialAuthButtons() {
+
diff --git a/src/features/jobs/definitions/email.job.tsx b/src/features/jobs/definitions/email.job.tsx index a9fa7f6..c7eecbd 100644 --- a/src/features/jobs/definitions/email.job.tsx +++ b/src/features/jobs/definitions/email.job.tsx @@ -6,9 +6,11 @@ import { sendEmail } from "@/lib/email/send-email"; export const SEND_VERIFICATION_EMAIL = "email.send-verification"; export const SendVerificationEmailPayload = z.object({ - userId: z.string(), - email: z.email(), - name: z.string(), + user: z.object({ + id: z.string(), + email: z.email(), + name: z.string(), + }), verifyUrl: z.url(), }); @@ -29,24 +31,22 @@ export async function registerEmailJobs(boss: PgBoss) { async ([job]) => { const payload = SendVerificationEmailPayload.parse(job.data); - await sendEmail({ - to: payload.email, - subject: "Verify your email", - template: ( - - ), - }); - - return { sent: true, timestamp: new Date().toISOString() }; + try { + await sendEmail({ + to: payload.user.email, + subject: "Verify your email", + template: ( + + ), + }); + + return { sent: true, timestamp: new Date().toISOString() }; + } catch (error) { + throw new Error((error as Error).message); + } } ); } - -export async function enqueueSendVerificationEmail( - boss: PgBoss, - payload: SendVerificationEmailPayload -) { - await boss.send(SEND_VERIFICATION_EMAIL, payload, { - priority: 1, // Higher priority for verification emails - }); -} diff --git a/src/features/jobs/producers/email.producer.ts b/src/features/jobs/producers/email.producer.ts new file mode 100644 index 0000000..b93709d --- /dev/null +++ b/src/features/jobs/producers/email.producer.ts @@ -0,0 +1,15 @@ +import { + SEND_VERIFICATION_EMAIL, + type SendVerificationEmailPayload, +} from "@/features/jobs/definitions/email.job"; +import { getQueueClient } from "@/lib/server/queue"; + +export async function enqueueSendVerificationEmail( + payload: SendVerificationEmailPayload +) { + const client = await getQueueClient(); + + await client.send(SEND_VERIFICATION_EMAIL, payload, { + priority: 1, // Higher priority for verification emails + }); +} diff --git a/src/features/todos/components/todo-skeleton.tsx b/src/features/todos/components/todo-skeleton.tsx new file mode 100644 index 0000000..5d7a34a --- /dev/null +++ b/src/features/todos/components/todo-skeleton.tsx @@ -0,0 +1,12 @@ +export default function TodoSkeleton() { + return ( +
+
+
+ {[1, 2, 3].map((i) => ( +
+ ))} +
+
+ ); +} diff --git a/src/lib/email/send-email.ts b/src/lib/email/send-email.ts index 55de40e..b54d51c 100644 --- a/src/lib/email/send-email.ts +++ b/src/lib/email/send-email.ts @@ -14,12 +14,12 @@ export async function sendEmail({ to, subject, template, - from, + from = env.USESEND_FROM_EMAIL, }: SendEmailOptions) { const html = await render(template); return useSend.emails.send({ - from: from ?? env.EMAIL_FROM ?? "noreply@yourdomain.com", + from, to: Array.isArray(to) ? to : [to], subject, html, diff --git a/src/lib/email/use-send.ts b/src/lib/email/use-send.ts index e67b302..679aa51 100644 --- a/src/lib/email/use-send.ts +++ b/src/lib/email/use-send.ts @@ -1,4 +1,4 @@ import { UseSend } from "usesend-js"; import { env } from "@/lib/server/env"; -export const useSend = new UseSend(env.USESEND_API_KEY); +export const useSend = new UseSend(env.USESEND_API_KEY, env.USESEND_BASE_URL); diff --git a/src/lib/server/auth.ts b/src/lib/server/auth.ts index 1bfa4f6..334e726 100644 --- a/src/lib/server/auth.ts +++ b/src/lib/server/auth.ts @@ -2,6 +2,7 @@ import { drizzleAdapter } from "better-auth/adapters/drizzle"; import { betterAuth } from "better-auth/minimal"; import { admin } from "better-auth/plugins"; import { tanstackStartCookies } from "better-auth/tanstack-start"; +import { enqueueSendVerificationEmail } from "@/features/jobs/producers/email.producer"; import { db } from "@/lib/server/db"; import { env } from "@/lib/server/env"; @@ -29,7 +30,16 @@ export const auth = betterAuth({ }, emailAndPassword: { enabled: true, - requireEmailVerification: false, + requireEmailVerification: true, + }, + emailVerification: { + sendOnSignIn: true, + sendVerificationEmail: ({ user, url }) => { + return enqueueSendVerificationEmail({ + user, + verifyUrl: url, + }); + }, }, rateLimit: { enabled: true, diff --git a/src/lib/server/db.ts b/src/lib/server/db.ts index 0275f3b..89bc815 100644 --- a/src/lib/server/db.ts +++ b/src/lib/server/db.ts @@ -10,7 +10,12 @@ import { import { todo } from "@/db/schema/todos"; import { env } from "@/lib/server/env"; -const client = postgres(env.DATABASE_URL); +const client = postgres(env.DATABASE_URL, { + max: 10, // Maximum connections + idle_timeout: 20, // Seconds before idle connection is closed + connect_timeout: 10, // Connection timeout in seconds + prepare: false, // Disable prepared statements for serverless +}); export const db = drizzle(client, { schema: { user, session, account, verification, rateLimit, todo }, diff --git a/src/lib/server/queue.ts b/src/lib/server/queue.ts index 383d19c..bdbc0e0 100644 --- a/src/lib/server/queue.ts +++ b/src/lib/server/queue.ts @@ -1,32 +1,78 @@ import { PgBoss } from "pg-boss"; import { registerJobs } from "@/features/jobs/register"; -let boss: PgBoss | null = null; +let producerClient: PgBoss | null = null; +let workerClient: PgBoss | null = null; -async function createQueue(databaseUrl: string) { - boss = new PgBoss(databaseUrl); +function getDatabaseUrl(): string { + if (!process.env.DATABASE_URL) { + throw new Error("DATABASE_URL is not set"); + } + return process.env.DATABASE_URL; +} - boss.on("error", (error) => { - // Wire up to your error tracking system here - console.error(error); +/** + * Get a queue client for enqueueing jobs (producer mode). + * This is lightweight - it only connects to pg-boss without registering handlers. + * Safe to use in the main app process. + */ +export async function getQueueClient(): Promise { + if (producerClient) { + return producerClient; + } + + producerClient = new PgBoss(getDatabaseUrl()); + + producerClient.on("error", (error) => { + console.error("[pg-boss] Producer client error:", error); }); - await boss.start(); + await producerClient.start(); - await registerJobs(boss); + console.log("[pg-boss] Producer client connected"); + return producerClient; +} - console.log("[pg-boss] Queue started"); - return boss; +/** + * Initialize the queue worker (worker mode). + * This connects to pg-boss AND registers all job handlers. + * Should only be called from the dedicated worker process. + */ +export async function initQueueWorker(): Promise { + if (workerClient) { + return workerClient; + } + + workerClient = new PgBoss(getDatabaseUrl()); + + workerClient.on("error", (error) => { + console.error("[pg-boss] Worker error:", error); + }); + + await workerClient.start(); + await registerJobs(workerClient); + + console.log("[pg-boss] Worker initialized with job handlers"); + return workerClient; } -export async function initQueue() { - if (!process.env.DATABASE_URL) { - throw new Error("DATABASE_URL is not set"); +/** + * Gracefully shutdown queue connections. + * Call this on process termination. + */ +export async function shutdownQueue(): Promise { + const shutdownPromises: Promise[] = []; + + if (producerClient) { + shutdownPromises.push(producerClient.stop()); + producerClient = null; } - if (!boss) { - boss = await createQueue(process.env.DATABASE_URL as string); + if (workerClient) { + shutdownPromises.push(workerClient.stop()); + workerClient = null; } - return boss; + await Promise.all(shutdownPromises); + console.log("[pg-boss] Queue connections closed"); } diff --git a/src/routes/_authenticated.tsx b/src/routes/_authenticated.tsx index 6bc5b89..82f068e 100644 --- a/src/routes/_authenticated.tsx +++ b/src/routes/_authenticated.tsx @@ -49,7 +49,7 @@ function RouteComponent() { await authClient.signOut({ fetchOptions: { onSuccess: () => { - queryClient.invalidateQueries({ queryKey: authKeys.session() }); + queryClient.removeQueries({ queryKey: authKeys.session() }); navigate({ to: "/" }); }, }, diff --git a/src/routes/_authenticated/todos.tsx b/src/routes/_authenticated/todos.tsx index 63c56f5..79f798a 100644 --- a/src/routes/_authenticated/todos.tsx +++ b/src/routes/_authenticated/todos.tsx @@ -14,15 +14,19 @@ import { prefetchTodos } from "@/features/todos/api/todo-queries"; import TodoForm from "@/features/todos/components/todo-form"; import TodoItem from "@/features/todos/components/todo-item"; import Stats from "@/features/todos/components/todo-stats"; +import TodoSkeleton from "@/features/todos/components/todo-skeleton"; export const Route = createFileRoute("/_authenticated/todos")({ component: TodosPage, loader: async ({ context }) => { await prefetchTodos(context.queryClient, context.user.id); }, + pendingComponent: TodoSkeleton, ssr: false, }); + + function TodosPage() { const { user } = Route.useRouteContext(); const queryClient = useQueryClient(); diff --git a/src/worker/index.ts b/src/worker/index.ts index 3250e77..f78aceb 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,11 +1,24 @@ -import { initQueue } from "@/lib/server/queue"; +import { initQueueWorker, shutdownQueue } from "@/lib/server/queue"; -export async function startQueueWorker() { - await initQueue(); - console.log("[pg-boss] Queue worker started"); +async function startWorker() { + await initQueueWorker(); + console.log("[pg-boss] Queue worker started and listening for jobs"); } -startQueueWorker().catch((error) => { - console.error("[pg-boss] Error starting queue worker", error); +// Graceful shutdown handlers +process.on("SIGTERM", async () => { + console.log("[pg-boss] Received SIGTERM, shutting down..."); + await shutdownQueue(); + process.exit(0); +}); + +process.on("SIGINT", async () => { + console.log("[pg-boss] Received SIGINT, shutting down..."); + await shutdownQueue(); + process.exit(0); +}); + +startWorker().catch((error) => { + console.error("[pg-boss] Error starting queue worker:", error); process.exit(1); });