diff --git a/.cursor/rules/trigger.advanced-tasks.mdc b/.cursor/rules/trigger.advanced-tasks.mdc new file mode 100644 index 000000000..78551c1ba --- /dev/null +++ b/.cursor/rules/trigger.advanced-tasks.mdc @@ -0,0 +1,456 @@ +--- +description: Comprehensive rules to help you write advanced Trigger.dev tasks +globs: **/trigger/**/*.ts +alwaysApply: false +--- +# Trigger.dev Advanced Tasks (v4) + +**Advanced patterns and features for writing tasks** + +## Tags & Organization + +```ts +import { task, tags } from "@trigger.dev/sdk"; + +export const processUser = task({ + id: "process-user", + run: async (payload: { userId: string; orgId: string }, { ctx }) => { + // Add tags during execution + await tags.add(`user_${payload.userId}`); + await tags.add(`org_${payload.orgId}`); + + return { processed: true }; + }, +}); + +// Trigger with tags +await processUser.trigger( + { userId: "123", orgId: "abc" }, + { tags: ["priority", "user_123", "org_abc"] } // Max 10 tags per run +); + +// Subscribe to tagged runs +for await (const run of runs.subscribeToRunsWithTag("user_123")) { + console.log(`User task ${run.id}: ${run.status}`); +} +``` + +**Tag Best Practices:** + +- Use prefixes: `user_123`, `org_abc`, `video:456` +- Max 10 tags per run, 1-64 characters each +- Tags don't propagate to child tasks automatically + +## Concurrency & Queues + +```ts +import { task, queue } from "@trigger.dev/sdk"; + +// Shared queue for related tasks +const emailQueue = queue({ + name: "email-processing", + concurrencyLimit: 5, // Max 5 emails processing simultaneously +}); + +// Task-level concurrency +export const oneAtATime = task({ + id: "sequential-task", + queue: { concurrencyLimit: 1 }, // Process one at a time + run: async (payload) => { + // Critical section - only one instance runs + }, +}); + +// Per-user concurrency +export const processUserData = task({ + id: "process-user-data", + run: async (payload: { userId: string }) => { + // Override queue with user-specific concurrency + await childTask.trigger(payload, { + queue: { + name: `user-${payload.userId}`, + concurrencyLimit: 2, + }, + }); + }, +}); + +export const emailTask = task({ + id: "send-email", + queue: emailQueue, // Use shared queue + run: async (payload: { to: string }) => { + // Send email logic + }, +}); +``` + +## Error Handling & Retries + +```ts +import { task, retry, AbortTaskRunError } from "@trigger.dev/sdk"; + +export const resilientTask = task({ + id: "resilient-task", + retry: { + maxAttempts: 10, + factor: 1.8, // Exponential backoff multiplier + minTimeoutInMs: 500, + maxTimeoutInMs: 30_000, + randomize: false, + }, + catchError: async ({ error, ctx }) => { + // Custom error handling + if (error.code === "FATAL_ERROR") { + throw new AbortTaskRunError("Cannot retry this error"); + } + + // Log error details + console.error(`Task ${ctx.task.id} failed:`, error); + + // Allow retry by returning nothing + return { retryAt: new Date(Date.now() + 60000) }; // Retry in 1 minute + }, + run: async (payload) => { + // Retry specific operations + const result = await retry.onThrow( + async () => { + return await unstableApiCall(payload); + }, + { maxAttempts: 3 } + ); + + // Conditional HTTP retries + const response = await retry.fetch("https://api.example.com", { + retry: { + maxAttempts: 5, + condition: (response, error) => { + return response?.status === 429 || response?.status >= 500; + }, + }, + }); + + return result; + }, +}); +``` + +## Machines & Performance + +```ts +export const heavyTask = task({ + id: "heavy-computation", + 
machine: { preset: "large-2x" }, // 8 vCPU, 16 GB RAM + maxDuration: 1800, // 30 minutes timeout + run: async (payload, { ctx }) => { + // Resource-intensive computation + if (ctx.machine.preset === "large-2x") { + // Use all available cores + return await parallelProcessing(payload); + } + + return await standardProcessing(payload); + }, +}); + +// Override machine when triggering +await heavyTask.trigger(payload, { + machine: { preset: "medium-1x" }, // Override for this run +}); +``` + +**Machine Presets:** + +- `micro`: 0.25 vCPU, 0.25 GB RAM +- `small-1x`: 0.5 vCPU, 0.5 GB RAM (default) +- `small-2x`: 1 vCPU, 1 GB RAM +- `medium-1x`: 1 vCPU, 2 GB RAM +- `medium-2x`: 2 vCPU, 4 GB RAM +- `large-1x`: 4 vCPU, 8 GB RAM +- `large-2x`: 8 vCPU, 16 GB RAM + +## Idempotency + +```ts +import { task, idempotencyKeys } from "@trigger.dev/sdk"; + +export const paymentTask = task({ + id: "process-payment", + retry: { + maxAttempts: 3, + }, + run: async (payload: { orderId: string; amount: number }) => { + // Automatically scoped to this task run, so if the task is retried, the idempotency key will be the same + const idempotencyKey = await idempotencyKeys.create(`payment-${payload.orderId}`); + + // Ensure payment is processed only once + await chargeCustomer.trigger(payload, { + idempotencyKey, + idempotencyKeyTTL: "24h", // Key expires in 24 hours + }); + }, +}); + +// Payload-based idempotency +import { createHash } from "node:crypto"; + +function createPayloadHash(payload: any): string { + const hash = createHash("sha256"); + hash.update(JSON.stringify(payload)); + return hash.digest("hex"); +} + +export const deduplicatedTask = task({ + id: "deduplicated-task", + run: async (payload) => { + const payloadHash = createPayloadHash(payload); + const idempotencyKey = await idempotencyKeys.create(payloadHash); + + await processData.trigger(payload, { idempotencyKey }); + }, +}); +``` + +## Metadata & Progress Tracking + +```ts +import { task, metadata } from "@trigger.dev/sdk"; + +export const batchProcessor = task({ + id: "batch-processor", + run: async (payload: { items: any[] }, { ctx }) => { + const totalItems = payload.items.length; + + // Initialize progress metadata + metadata + .set("progress", 0) + .set("totalItems", totalItems) + .set("processedItems", 0) + .set("status", "starting"); + + const results = []; + + for (let i = 0; i < payload.items.length; i++) { + const item = payload.items[i]; + + // Process item + const result = await processItem(item); + results.push(result); + + // Update progress + const progress = ((i + 1) / totalItems) * 100; + metadata + .set("progress", progress) + .increment("processedItems", 1) + .append("logs", `Processed item ${i + 1}/${totalItems}`) + .set("currentItem", item.id); + } + + // Final status + metadata.set("status", "completed"); + + return { results, totalProcessed: results.length }; + }, +}); + +// Update parent metadata from child task +export const childTask = task({ + id: "child-task", + run: async (payload, { ctx }) => { + // Update parent task metadata + metadata.parent.set("childStatus", "processing"); + metadata.root.increment("childrenCompleted", 1); + + return { processed: true }; + }, +}); +``` + +## Advanced Triggering + +### Frontend Triggering (React) + +```tsx +"use client"; +import { useTaskTrigger } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function TriggerButton({ accessToken }: { accessToken: string }) { + const { submit, handle, isLoading } = useTaskTrigger("my-task", { accessToken }); 
+ + return ( + + ); +} +``` + +### Large Payloads + +```ts +// For payloads > 512KB (max 10MB) +export const largeDataTask = task({ + id: "large-data-task", + run: async (payload: { dataUrl: string }) => { + // Trigger.dev automatically handles large payloads + // For > 10MB, use external storage + const response = await fetch(payload.dataUrl); + const largeData = await response.json(); + + return { processed: largeData.length }; + }, +}); + +// Best practice: Use presigned URLs for very large files +await largeDataTask.trigger({ + dataUrl: "https://s3.amazonaws.com/bucket/large-file.json?presigned=true", +}); +``` + +### Advanced Options + +```ts +await myTask.trigger(payload, { + delay: "2h30m", // Delay execution + ttl: "24h", // Expire if not started within 24 hours + priority: 100, // Higher priority (time offset in seconds) + tags: ["urgent", "user_123"], + metadata: { source: "api", version: "v2" }, + queue: { + name: "priority-queue", + concurrencyLimit: 10, + }, + idempotencyKey: "unique-operation-id", + idempotencyKeyTTL: "1h", + machine: { preset: "large-1x" }, + maxAttempts: 5, +}); +``` + +## Hidden Tasks + +```ts +// Hidden task - not exported, only used internally +const internalProcessor = task({ + id: "internal-processor", + run: async (payload: { data: string }) => { + return { processed: payload.data.toUpperCase() }; + }, +}); + +// Public task that uses hidden task +export const publicWorkflow = task({ + id: "public-workflow", + run: async (payload: { input: string }) => { + // Use hidden task internally + const result = await internalProcessor.triggerAndWait({ + data: payload.input, + }); + + if (result.ok) { + return { output: result.output.processed }; + } + + throw new Error("Internal processing failed"); + }, +}); +``` + +## Logging & Tracing + +```ts +import { task, logger } from "@trigger.dev/sdk"; + +export const tracedTask = task({ + id: "traced-task", + run: async (payload, { ctx }) => { + logger.info("Task started", { userId: payload.userId }); + + // Custom trace with attributes + const user = await logger.trace( + "fetch-user", + async (span) => { + span.setAttribute("user.id", payload.userId); + span.setAttribute("operation", "database-fetch"); + + const userData = await database.findUser(payload.userId); + span.setAttribute("user.found", !!userData); + + return userData; + }, + { userId: payload.userId } + ); + + logger.debug("User fetched", { user: user.id }); + + try { + const result = await processUser(user); + logger.info("Processing completed", { result }); + return result; + } catch (error) { + logger.error("Processing failed", { + error: error.message, + userId: payload.userId, + }); + throw error; + } + }, +}); +``` + +## Usage Monitoring + +```ts +import { task, usage } from "@trigger.dev/sdk"; + +export const monitoredTask = task({ + id: "monitored-task", + run: async (payload) => { + // Get current run cost + const currentUsage = await usage.getCurrent(); + logger.info("Current cost", { + costInCents: currentUsage.costInCents, + durationMs: currentUsage.durationMs, + }); + + // Measure specific operation + const { result, compute } = await usage.measure(async () => { + return await expensiveOperation(payload); + }); + + logger.info("Operation cost", { + costInCents: compute.costInCents, + durationMs: compute.durationMs, + }); + + return result; + }, +}); +``` + +## Run Management + +```ts +// Cancel runs +await runs.cancel("run_123"); + +// Replay runs with same payload +await runs.replay("run_123"); + +// Retrieve run with cost details +const 
run = await runs.retrieve("run_123"); +console.log(`Cost: ${run.costInCents} cents, Duration: ${run.durationMs}ms`); +``` + +## Best Practices + +- **Concurrency**: Use queues to prevent overwhelming external services +- **Retries**: Configure exponential backoff for transient failures +- **Idempotency**: Always use for payment/critical operations +- **Metadata**: Track progress for long-running tasks +- **Machines**: Match machine size to computational requirements +- **Tags**: Use consistent naming patterns for filtering +- **Large Payloads**: Use external storage for files > 10MB +- **Error Handling**: Distinguish between retryable and fatal errors + +Design tasks to be stateless, idempotent, and resilient to failures. Use metadata for state tracking and queues for resource management. diff --git a/.cursor/rules/trigger.basic.mdc b/.cursor/rules/trigger.basic.mdc new file mode 100644 index 000000000..3d96e6657 --- /dev/null +++ b/.cursor/rules/trigger.basic.mdc @@ -0,0 +1,190 @@ +--- +description: Only the most important rules for writing basic Trigger.dev tasks +globs: **/trigger/**/*.ts +alwaysApply: false +--- +# Trigger.dev Basic Tasks (v4) + +**MUST use `@trigger.dev/sdk` (v4), NEVER `client.defineJob`** + +## Basic Task + +```ts +import { task } from "@trigger.dev/sdk"; + +export const processData = task({ + id: "process-data", + retry: { + maxAttempts: 10, + factor: 1.8, + minTimeoutInMs: 500, + maxTimeoutInMs: 30_000, + randomize: false, + }, + run: async (payload: { userId: string; data: any[] }) => { + // Task logic - runs for long time, no timeouts + console.log(`Processing ${payload.data.length} items for user ${payload.userId}`); + return { processed: payload.data.length }; + }, +}); +``` + +## Schema Task (with validation) + +```ts +import { schemaTask } from "@trigger.dev/sdk"; +import { z } from "zod"; + +export const validatedTask = schemaTask({ + id: "validated-task", + schema: z.object({ + name: z.string(), + age: z.number(), + email: z.string().email(), + }), + run: async (payload) => { + // Payload is automatically validated and typed + return { message: `Hello ${payload.name}, age ${payload.age}` }; + }, +}); +``` + +## Scheduled Task + +```ts +import { schedules } from "@trigger.dev/sdk"; + +const dailyReport = schedules.task({ + id: "daily-report", + cron: "0 9 * * *", // Daily at 9:00 AM UTC + // or with timezone: cron: { pattern: "0 9 * * *", timezone: "America/New_York" }, + run: async (payload) => { + console.log("Scheduled run at:", payload.timestamp); + console.log("Last run was:", payload.lastTimestamp); + console.log("Next 5 runs:", payload.upcoming); + + // Generate daily report logic + return { reportGenerated: true, date: payload.timestamp }; + }, +}); +``` + +## Triggering Tasks + +### From Backend Code + +```ts +import { tasks } from "@trigger.dev/sdk"; +import type { processData } from "./trigger/tasks"; + +// Single trigger +const handle = await tasks.trigger("process-data", { + userId: "123", + data: [{ id: 1 }, { id: 2 }], +}); + +// Batch trigger +const batchHandle = await tasks.batchTrigger("process-data", [ + { payload: { userId: "123", data: [{ id: 1 }] } }, + { payload: { userId: "456", data: [{ id: 2 }] } }, +]); +``` + +### From Inside Tasks (with Result handling) + +```ts +export const parentTask = task({ + id: "parent-task", + run: async (payload) => { + // Trigger and continue + const handle = await childTask.trigger({ data: "value" }); + + // Trigger and wait - returns Result object, NOT task output + const result = await 
childTask.triggerAndWait({ data: "value" }); + if (result.ok) { + console.log("Task output:", result.output); // Actual task return value + } else { + console.error("Task failed:", result.error); + } + + // Quick unwrap (throws on error) + const output = await childTask.triggerAndWait({ data: "value" }).unwrap(); + + // Batch trigger and wait + const results = await childTask.batchTriggerAndWait([ + { payload: { data: "item1" } }, + { payload: { data: "item2" } }, + ]); + + for (const run of results) { + if (run.ok) { + console.log("Success:", run.output); + } else { + console.log("Failed:", run.error); + } + } + }, +}); + +export const childTask = task({ + id: "child-task", + run: async (payload: { data: string }) => { + return { processed: payload.data }; + }, +}); +``` + +> Never wrap triggerAndWait or batchTriggerAndWait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks. + +## Waits + +```ts +import { task, wait } from "@trigger.dev/sdk"; + +export const taskWithWaits = task({ + id: "task-with-waits", + run: async (payload) => { + console.log("Starting task"); + + // Wait for specific duration + await wait.for({ seconds: 30 }); + await wait.for({ minutes: 5 }); + await wait.for({ hours: 1 }); + await wait.for({ days: 1 }); + + // Wait until specific date + await wait.until({ date: new Date("2024-12-25") }); + + // Wait for token (from external system) + await wait.forToken({ + token: "user-approval-token", + timeoutInSeconds: 3600, // 1 hour timeout + }); + + console.log("All waits completed"); + return { status: "completed" }; + }, +}); +``` + +> Never wrap wait calls in a Promise.all or Promise.allSettled as this is not supported in Trigger.dev tasks. + +## Key Points + +- **Result vs Output**: `triggerAndWait()` returns a `Result` object with `ok`, `output`, `error` properties - NOT the direct task output +- **Type safety**: Use `import type` for task references when triggering from backend +- **Waits > 5 seconds**: Automatically checkpointed, don't count toward compute usage + +## NEVER Use (v2 deprecated) + +```ts +// BREAKS APPLICATION +client.defineJob({ + id: "job-id", + run: async (payload, io) => { + /* ... 
*/ + }, +}); +``` + +Use v4 SDK (`@trigger.dev/sdk`), check `result.ok` before accessing `result.output` diff --git a/.cursor/rules/trigger.config.mdc b/.cursor/rules/trigger.config.mdc new file mode 100644 index 000000000..54e400d73 --- /dev/null +++ b/.cursor/rules/trigger.config.mdc @@ -0,0 +1,351 @@ +--- +description: Configure your Trigger.dev project with a trigger.config.ts file +globs: **/trigger.config.ts +alwaysApply: false +--- +# Trigger.dev Configuration (v4) + +**Complete guide to configuring `trigger.config.ts` with build extensions** + +## Basic Configuration + +```ts +import { defineConfig } from "@trigger.dev/sdk"; + +export default defineConfig({ + project: "", // Required: Your project reference + dirs: ["./trigger"], // Task directories + runtime: "node", // "node", "node-22", or "bun" + logLevel: "info", // "debug", "info", "warn", "error" + + // Default retry settings + retries: { + enabledInDev: false, + default: { + maxAttempts: 3, + minTimeoutInMs: 1000, + maxTimeoutInMs: 10000, + factor: 2, + randomize: true, + }, + }, + + // Build configuration + build: { + autoDetectExternal: true, + keepNames: true, + minify: false, + extensions: [], // Build extensions go here + }, + + // Global lifecycle hooks + onStart: async ({ payload, ctx }) => { + console.log("Global task start"); + }, + onSuccess: async ({ payload, output, ctx }) => { + console.log("Global task success"); + }, + onFailure: async ({ payload, error, ctx }) => { + console.log("Global task failure"); + }, +}); +``` + +## Build Extensions + +### Database & ORM + +#### Prisma + +```ts +import { prismaExtension } from "@trigger.dev/build/extensions/prisma"; + +extensions: [ + prismaExtension({ + schema: "prisma/schema.prisma", + version: "5.19.0", // Optional: specify version + migrate: true, // Run migrations during build + directUrlEnvVarName: "DIRECT_DATABASE_URL", + typedSql: true, // Enable TypedSQL support + }), +]; +``` + +#### TypeScript Decorators (for TypeORM) + +```ts +import { emitDecoratorMetadata } from "@trigger.dev/build/extensions/typescript"; + +extensions: [ + emitDecoratorMetadata(), // Enables decorator metadata +]; +``` + +### Scripting Languages + +#### Python + +```ts +import { pythonExtension } from "@trigger.dev/build/extensions/python"; + +extensions: [ + pythonExtension({ + scripts: ["./python/**/*.py"], // Copy Python files + requirementsFile: "./requirements.txt", // Install packages + devPythonBinaryPath: ".venv/bin/python", // Dev mode binary + }), +]; + +// Usage in tasks +const result = await python.runInline(`print("Hello, world!")`); +const output = await python.runScript("./python/script.py", ["arg1"]); +``` + +### Browser Automation + +#### Playwright + +```ts +import { playwright } from "@trigger.dev/build/extensions/playwright"; + +extensions: [ + playwright({ + browsers: ["chromium", "firefox", "webkit"], // Default: ["chromium"] + headless: true, // Default: true + }), +]; +``` + +#### Puppeteer + +```ts +import { puppeteer } from "@trigger.dev/build/extensions/puppeteer"; + +extensions: [puppeteer()]; + +// Environment variable needed: +// PUPPETEER_EXECUTABLE_PATH: "/usr/bin/google-chrome-stable" +``` + +#### Lightpanda + +```ts +import { lightpanda } from "@trigger.dev/build/extensions/lightpanda"; + +extensions: [ + lightpanda({ + version: "latest", // or "nightly" + disableTelemetry: false, + }), +]; +``` + +### Media Processing + +#### FFmpeg + +```ts +import { ffmpeg } from "@trigger.dev/build/extensions/core"; + +extensions: [ + ffmpeg({ version: "7" }), // 
Static build, or omit for Debian version +]; + +// Automatically sets FFMPEG_PATH and FFPROBE_PATH +// Add fluent-ffmpeg to external packages if using +``` + +#### Audio Waveform + +```ts +import { audioWaveform } from "@trigger.dev/build/extensions/audioWaveform"; + +extensions: [ + audioWaveform(), // Installs Audio Waveform 1.1.0 +]; +``` + +### System & Package Management + +#### System Packages (apt-get) + +```ts +import { aptGet } from "@trigger.dev/build/extensions/core"; + +extensions: [ + aptGet({ + packages: ["ffmpeg", "imagemagick", "curl=7.68.0-1"], // Can specify versions + }), +]; +``` + +#### Additional NPM Packages + +Only use this for installing CLI tools, NOT packages you import in your code. + +```ts +import { additionalPackages } from "@trigger.dev/build/extensions/core"; + +extensions: [ + additionalPackages({ + packages: ["wrangler"], // CLI tools and specific versions + }), +]; +``` + +#### Additional Files + +```ts +import { additionalFiles } from "@trigger.dev/build/extensions/core"; + +extensions: [ + additionalFiles({ + files: ["wrangler.toml", "./assets/**", "./fonts/**"], // Glob patterns supported + }), +]; +``` + +### Environment & Build Tools + +#### Environment Variable Sync + +```ts +import { syncEnvVars } from "@trigger.dev/build/extensions/core"; + +extensions: [ + syncEnvVars(async (ctx) => { + // ctx contains: environment, projectRef, env + return [ + { name: "SECRET_KEY", value: await getSecret(ctx.environment) }, + { name: "API_URL", value: ctx.environment === "prod" ? "api.prod.com" : "api.dev.com" }, + ]; + }), +]; +``` + +#### ESBuild Plugins + +```ts +import { esbuildPlugin } from "@trigger.dev/build/extensions"; +import { sentryEsbuildPlugin } from "@sentry/esbuild-plugin"; + +extensions: [ + esbuildPlugin( + sentryEsbuildPlugin({ + org: process.env.SENTRY_ORG, + project: process.env.SENTRY_PROJECT, + authToken: process.env.SENTRY_AUTH_TOKEN, + }), + { placement: "last", target: "deploy" } // Optional config + ), +]; +``` + +## Custom Build Extensions + +```ts +import { defineConfig } from "@trigger.dev/sdk"; + +const customExtension = { + name: "my-custom-extension", + + externalsForTarget: (target) => { + return ["some-native-module"]; // Add external dependencies + }, + + onBuildStart: async (context) => { + console.log(`Build starting for ${context.target}`); + // Register esbuild plugins, modify build context + }, + + onBuildComplete: async (context, manifest) => { + console.log("Build complete, adding layers"); + // Add build layers, modify deployment + context.addLayer({ + id: "my-layer", + files: [{ source: "./custom-file", destination: "/app/custom" }], + commands: ["chmod +x /app/custom"], + }); + }, +}; + +export default defineConfig({ + project: "my-project", + build: { + extensions: [customExtension], + }, +}); +``` + +## Advanced Configuration + +### Telemetry + +```ts +import { PrismaInstrumentation } from "@prisma/instrumentation"; +import { OpenAIInstrumentation } from "@langfuse/openai"; + +export default defineConfig({ + // ... other config + telemetry: { + instrumentations: [new PrismaInstrumentation(), new OpenAIInstrumentation()], + exporters: [customExporter], // Optional custom exporters + }, +}); +``` + +### Machine & Performance + +```ts +export default defineConfig({ + // ... 
other config + defaultMachine: "large-1x", // Default machine for all tasks + maxDuration: 300, // Default max duration (seconds) + enableConsoleLogging: true, // Console logging in development +}); +``` + +## Common Extension Combinations + +### Full-Stack Web App + +```ts +extensions: [ + prismaExtension({ schema: "prisma/schema.prisma", migrate: true }), + additionalFiles({ files: ["./public/**", "./assets/**"] }), + syncEnvVars(async (ctx) => [...envVars]), +]; +``` + +### AI/ML Processing + +```ts +extensions: [ + pythonExtension({ + scripts: ["./ai/**/*.py"], + requirementsFile: "./requirements.txt", + }), + ffmpeg({ version: "7" }), + additionalPackages({ packages: ["wrangler"] }), +]; +``` + +### Web Scraping + +```ts +extensions: [ + playwright({ browsers: ["chromium"] }), + puppeteer(), + additionalFiles({ files: ["./selectors.json", "./proxies.txt"] }), +]; +``` + +## Best Practices + +- **Use specific versions**: Pin extension versions for reproducible builds +- **External packages**: Add modules with native addons to the `build.external` array +- **Environment sync**: Use `syncEnvVars` for dynamic secrets +- **File paths**: Use glob patterns for flexible file inclusion +- **Debug builds**: Use `--log-level debug --dry-run` for troubleshooting + +Extensions only affect deployment, not local development. Use `external` array for packages that shouldn't be bundled. diff --git a/.cursor/rules/trigger.realtime.mdc b/.cursor/rules/trigger.realtime.mdc new file mode 100644 index 000000000..b1e94fd72 --- /dev/null +++ b/.cursor/rules/trigger.realtime.mdc @@ -0,0 +1,277 @@ +--- +description: How to use realtime in your Trigger.dev tasks and your frontend +globs: **/trigger/**/*.ts +alwaysApply: false +--- +# Trigger.dev Realtime (v4) + +**Real-time monitoring and updates for runs** + +## Core Concepts + +Realtime allows you to: + +- Subscribe to run status changes, metadata updates, and streams +- Build real-time dashboards and UI updates +- Monitor task progress from frontend and backend + +## Authentication + +### Public Access Tokens + +```ts +import { auth } from "@trigger.dev/sdk"; + +// Read-only token for specific runs +const publicToken = await auth.createPublicToken({ + scopes: { + read: { + runs: ["run_123", "run_456"], + tasks: ["my-task-1", "my-task-2"], + }, + }, + expirationTime: "1h", // Default: 15 minutes +}); +``` + +### Trigger Tokens (Frontend only) + +```ts +// Single-use token for triggering tasks +const triggerToken = await auth.createTriggerPublicToken("my-task", { + expirationTime: "30m", +}); +``` + +## Backend Usage + +### Subscribe to Runs + +```ts +import { runs, tasks } from "@trigger.dev/sdk"; + +// Trigger and subscribe +const handle = await tasks.trigger("my-task", { data: "value" }); + +// Subscribe to specific run +for await (const run of runs.subscribeToRun(handle.id)) { + console.log(`Status: ${run.status}, Progress: ${run.metadata?.progress}`); + if (run.status === "COMPLETED") break; +} + +// Subscribe to runs with tag +for await (const run of runs.subscribeToRunsWithTag("user-123")) { + console.log(`Tagged run ${run.id}: ${run.status}`); +} + +// Subscribe to batch +for await (const run of runs.subscribeToBatch(batchId)) { + console.log(`Batch run ${run.id}: ${run.status}`); +} +``` + +### Streams + +```ts +import { task, metadata } from "@trigger.dev/sdk"; + +// Task that streams data +export type STREAMS = { + openai: OpenAI.ChatCompletionChunk; +}; + +export const streamingTask = task({ + id: "streaming-task", + run: async (payload) => { + 
const completion = await openai.chat.completions.create({ + model: "gpt-4", + messages: [{ role: "user", content: payload.prompt }], + stream: true, + }); + + // Register stream + const stream = await metadata.stream("openai", completion); + + let text = ""; + for await (const chunk of stream) { + text += chunk.choices[0]?.delta?.content || ""; + } + + return { text }; + }, +}); + +// Subscribe to streams +for await (const part of runs.subscribeToRun(runId).withStreams()) { + switch (part.type) { + case "run": + console.log("Run update:", part.run.status); + break; + case "openai": + console.log("Stream chunk:", part.chunk); + break; + } +} +``` + +## React Frontend Usage + +### Installation + +```bash +npm add @trigger.dev/react-hooks +``` + +### Triggering Tasks + +```tsx +"use client"; +import { useTaskTrigger, useRealtimeTaskTrigger } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function TriggerComponent({ accessToken }: { accessToken: string }) { + // Basic trigger + const { submit, handle, isLoading } = useTaskTrigger("my-task", { + accessToken, + }); + + // Trigger with realtime updates + const { + submit: realtimeSubmit, + run, + isLoading: isRealtimeLoading, + } = useRealtimeTaskTrigger("my-task", { accessToken }); + + return ( +
    <div>
      {/* example buttons; payload shape follows the doc's { data: "value" } example */}
      <button onClick={() => submit({ data: "value" })} disabled={isLoading}>
        {isLoading ? "Triggering..." : "Trigger task"}
      </button>
      <button onClick={() => realtimeSubmit({ data: "value" })} disabled={isRealtimeLoading}>
        {isRealtimeLoading ? "Triggering..." : "Trigger with realtime updates"}
      </button>
      {run && <p>Status: {run.status}</p>}
    </div>
+ ); +} +``` + +### Subscribing to Runs + +```tsx +"use client"; +import { useRealtimeRun, useRealtimeRunsWithTag } from "@trigger.dev/react-hooks"; +import type { myTask } from "../trigger/tasks"; + +function SubscribeComponent({ runId, accessToken }: { runId: string; accessToken: string }) { + // Subscribe to specific run + const { run, error } = useRealtimeRun(runId, { + accessToken, + onComplete: (run) => { + console.log("Task completed:", run.output); + }, + }); + + // Subscribe to tagged runs + const { runs } = useRealtimeRunsWithTag("user-123", { accessToken }); + + if (error) return
<div>Error: {error.message}</div>;
  if (!run) return <div>Loading...</div>;

  return (
    <div>
      <h3>Status: {run.status}</h3>
      <p>Progress: {run.metadata?.progress || 0}%</p>
      {run.output && <pre>Result: {JSON.stringify(run.output)}</pre>}

      <h3>Tagged Runs:</h3>
      {runs.map((r) => (
        <div key={r.id}>
          {r.id}: {r.status}
        </div>
      ))}
    </div>
+ ); +} +``` + +### Streams with React + +```tsx +"use client"; +import { useRealtimeRunWithStreams } from "@trigger.dev/react-hooks"; +import type { streamingTask, STREAMS } from "../trigger/tasks"; + +function StreamComponent({ runId, accessToken }: { runId: string; accessToken: string }) { + const { run, streams } = useRealtimeRunWithStreams(runId, { + accessToken, + }); + + const text = streams.openai + .filter((chunk) => chunk.choices[0]?.delta?.content) + .map((chunk) => chunk.choices[0].delta.content) + .join(""); + + return ( +
    <div>
      <div>Status: {run?.status}</div>
      <div>Streamed Text: {text}</div>
    </div>
  );
}
```

### Wait Tokens

```tsx
"use client";
import { useWaitToken } from "@trigger.dev/react-hooks";

function WaitTokenComponent({ tokenId, accessToken }: { tokenId: string; accessToken: string }) {
  const { complete } = useWaitToken(tokenId, { accessToken });

  // Completing the token resumes the waiting run; the payload shape is up to you
  return <button onClick={() => complete({ approved: true })}>Approve</button>;
}
```

### SWR Hooks (Fetch Once)

```tsx
"use client";
import { useRun } from "@trigger.dev/react-hooks";
import type { myTask } from "../trigger/tasks";

function SWRComponent({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { run, error, isLoading } = useRun<typeof myTask>(runId, {
    accessToken,
    refreshInterval: 0, // Disable polling (recommended)
  });

  if (isLoading) return <div>Loading...</div>;
  if (error) return <div>Error: {error.message}</div>;

  return <div>Run: {run?.status}</div>
; +} +``` + +## Run Object Properties + +Key properties available in run subscriptions: + +- `id`: Unique run identifier +- `status`: `QUEUED`, `EXECUTING`, `COMPLETED`, `FAILED`, `CANCELED`, etc. +- `payload`: Task input data (typed) +- `output`: Task result (typed, when completed) +- `metadata`: Real-time updatable data +- `createdAt`, `updatedAt`: Timestamps +- `costInCents`: Execution cost + +## Best Practices + +- **Use Realtime over SWR**: Recommended for most use cases due to rate limits +- **Scope tokens properly**: Only grant necessary read/trigger permissions +- **Handle errors**: Always check for errors in hooks and subscriptions +- **Type safety**: Use task types for proper payload/output typing +- **Cleanup subscriptions**: Backend subscriptions auto-complete, frontend hooks auto-cleanup diff --git a/.cursor/rules/trigger.scheduled-tasks.mdc b/.cursor/rules/trigger.scheduled-tasks.mdc new file mode 100644 index 000000000..0bda6c998 --- /dev/null +++ b/.cursor/rules/trigger.scheduled-tasks.mdc @@ -0,0 +1,122 @@ +--- +description: How to write and use scheduled Trigger.dev tasks +globs: **/trigger/**/*.ts +alwaysApply: false +--- +# Scheduled tasks (cron) + +Recurring tasks using cron. For one-off future runs, use the **delay** option. + +## Define a scheduled task + +```ts +import { schedules } from "@trigger.dev/sdk"; + +export const task = schedules.task({ + id: "first-scheduled-task", + run: async (payload) => { + payload.timestamp; // Date (scheduled time, UTC) + payload.lastTimestamp; // Date | undefined + payload.timezone; // IANA, e.g. "America/New_York" (default "UTC") + payload.scheduleId; // string + payload.externalId; // string | undefined + payload.upcoming; // Date[] + + payload.timestamp.toLocaleString("en-US", { timeZone: payload.timezone }); + }, +}); +``` + +> Scheduled tasks need at least one schedule attached to run. + +## Attach schedules + +**Declarative (sync on dev/deploy):** + +```ts +schedules.task({ + id: "every-2h", + cron: "0 */2 * * *", // UTC + run: async () => {}, +}); + +schedules.task({ + id: "tokyo-5am", + cron: { pattern: "0 5 * * *", timezone: "Asia/Tokyo", environments: ["PRODUCTION", "STAGING"] }, + run: async () => {}, +}); +``` + +**Imperative (SDK or dashboard):** + +```ts +await schedules.create({ + task: task.id, + cron: "0 0 * * *", + timezone: "America/New_York", // DST-aware + externalId: "user_123", + deduplicationKey: "user_123-daily", // updates if reused +}); +``` + +### Dynamic / multi-tenant example + +```ts +// /trigger/reminder.ts +export const reminderTask = schedules.task({ + id: "todo-reminder", + run: async (p) => { + if (!p.externalId) throw new Error("externalId is required"); + const user = await db.getUser(p.externalId); + await sendReminderEmail(user); + }, +}); +``` + +```ts +// app/reminders/route.ts +export async function POST(req: Request) { + const data = await req.json(); + return Response.json( + await schedules.create({ + task: reminderTask.id, + cron: "0 8 * * *", + timezone: data.timezone, + externalId: data.userId, + deduplicationKey: `${data.userId}-reminder`, + }) + ); +} +``` + +## Cron syntax (no seconds) + +``` +* * * * * +| | | | └ day of week (0–7 or 1L–7L; 0/7=Sun; L=last) +| | | └── month (1–12) +| | └──── day of month (1–31 or L) +| └────── hour (0–23) +└──────── minute (0–59) +``` + +## When schedules won't trigger + +- **Dev:** only when the dev CLI is running. +- **Staging/Production:** only for tasks in the **latest deployment**. 
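
If an attached schedule still doesn't fire, confirm it is active and actually attached to the task. A minimal sketch using the management SDK below; the exact response fields (`data`, `active`, `nextRun`) are assumptions about the list response shape:

```ts
import { schedules } from "@trigger.dev/sdk";

// List schedules and inspect the ones attached to a given task
const page = await schedules.list();
const mySchedules = page.data.filter((s) => s.task === "first-scheduled-task");

for (const s of mySchedules) {
  console.log(s.id, s.active ? "active" : "deactivated", "next run:", s.nextRun);
}
```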
+ +## SDK management (quick refs) + +```ts +await schedules.retrieve(id); +await schedules.list(); +await schedules.update(id, { cron: "0 0 1 * *", externalId: "ext", deduplicationKey: "key" }); +await schedules.deactivate(id); +await schedules.activate(id); +await schedules.del(id); +await schedules.timezones(); // list of IANA timezones +``` + +## Dashboard + +Create/attach schedules visually (Task, Cron pattern, Timezone, Optional: External ID, Dedup key, Environments). Test scheduled tasks from the **Test** page. diff --git a/apps/app/src/jobs/tasks/email/weekly-task-digest-email.ts b/apps/app/src/jobs/tasks/email/weekly-task-digest-email.ts new file mode 100644 index 000000000..d9e15429a --- /dev/null +++ b/apps/app/src/jobs/tasks/email/weekly-task-digest-email.ts @@ -0,0 +1,52 @@ +import { logger, queue, task } from '@trigger.dev/sdk'; +import { sendWeeklyTaskDigestEmail } from '@trycompai/email/lib/weekly-task-digest'; + +// Queue with concurrency limit to prevent rate limiting +const weeklyTaskDigestQueue = queue({ + name: 'weekly-task-digest-queue', + concurrencyLimit: 2, // Max 2 emails at a time +}); + +interface WeeklyTaskDigestPayload { + email: string; + userName: string; + organizationName: string; + organizationId: string; + tasks: Array<{ + id: string; + title: string; + }>; +} + +export const sendWeeklyTaskDigestEmailTask = task({ + id: 'send-weekly-task-digest-email', + queue: weeklyTaskDigestQueue, + run: async (payload: WeeklyTaskDigestPayload) => { + logger.info('Sending weekly task digest email', { + email: payload.email, + organizationName: payload.organizationName, + taskCount: payload.tasks.length, + }); + + try { + await sendWeeklyTaskDigestEmail(payload); + + logger.info('Successfully sent weekly task digest email', { + email: payload.email, + organizationName: payload.organizationName, + }); + + return { + success: true, + email: payload.email, + }; + } catch (error) { + logger.error('Failed to send weekly task digest email', { + email: payload.email, + error: error instanceof Error ? 
error.message : String(error), + }); + + throw error; + } + }, +}); diff --git a/apps/app/src/jobs/tasks/task/weekly-task-reminder.ts b/apps/app/src/jobs/tasks/task/weekly-task-reminder.ts new file mode 100644 index 000000000..03a83b0d3 --- /dev/null +++ b/apps/app/src/jobs/tasks/task/weekly-task-reminder.ts @@ -0,0 +1,134 @@ +import { db } from '@db'; +import { logger, schedules } from '@trigger.dev/sdk'; +import { sendWeeklyTaskDigestEmailTask } from '../email/weekly-task-digest-email'; + +export const weeklyTaskReminder = schedules.task({ + id: 'weekly-task-reminder', + cron: '0 9 * * 1', // Every Monday at 9:00 AM UTC + maxDuration: 1000 * 60 * 10, // 10 minutes + run: async () => { + logger.info('Starting weekly task reminder job'); + + // Get all organizations + const organizations = await db.organization.findMany({ + select: { + id: true, + name: true, + members: { + where: { + OR: [{ role: { contains: 'owner' } }, { role: { contains: 'admin' } }], + }, + select: { + id: true, + user: { + select: { + id: true, + name: true, + email: true, + }, + }, + }, + }, + }, + }); + + logger.info(`Found ${organizations.length} organizations to process`); + + // Build email payloads for all admins/owners with TODO tasks + const emailPayloads = []; + + for (const org of organizations) { + logger.info(`Processing organization: ${org.name} (${org.id})`); + + // Get all TODO tasks for this organization + const todoTasks = await db.task.findMany({ + where: { + organizationId: org.id, + status: 'todo', + }, + select: { + id: true, + title: true, + }, + orderBy: { + createdAt: 'desc', + }, + }); + + // Skip if no TODO tasks + if (todoTasks.length === 0) { + logger.info(`No TODO tasks found for organization ${org.name}`); + continue; + } + + logger.info(`Found ${todoTasks.length} TODO tasks for organization ${org.name}`); + + // Build payload for each admin/owner + for (const member of org.members) { + if (!member.user.email || !member.user.name) { + logger.warn(`Skipping member ${member.id} - missing email or name`); + continue; + } + + // Skip internal trycomp.ai emails + if (member.user.email.includes('@trycomp.ai')) { + logger.info(`Skipping internal email: ${member.user.email}`); + continue; + } + + emailPayloads.push({ + payload: { + email: member.user.email, + userName: member.user.name, + organizationName: org.name, + organizationId: org.id, + tasks: todoTasks, + }, + }); + } + } + + // Batch trigger all emails with concurrency control + // Trigger.dev has a limit of 500 items per batchTrigger + if (emailPayloads.length > 0) { + const BATCH_SIZE = 500; + const batches = []; + + for (let i = 0; i < emailPayloads.length; i += BATCH_SIZE) { + batches.push(emailPayloads.slice(i, i + BATCH_SIZE)); + } + + logger.info(`Triggering ${emailPayloads.length} emails in ${batches.length} batch(es)`); + + try { + for (const batch of batches) { + await sendWeeklyTaskDigestEmailTask.batchTrigger(batch); + logger.info(`Triggered batch of ${batch.length} emails`); + } + + logger.info(`Successfully triggered all ${emailPayloads.length} weekly task digest emails`); + } catch (error) { + logger.error(`Failed to trigger batch email sends: ${error}`); + + return { + success: false, + timestamp: new Date().toISOString(), + organizationsProcessed: organizations.length, + totalAdminsProcessed: emailPayloads.length, + emailsTriggered: 0, + error: error instanceof Error ? 
error.message : String(error), + }; + } + } + + const summary = { + timestamp: new Date().toISOString(), + organizationsProcessed: organizations.length, + emailsTriggered: emailPayloads.length, + }; + + logger.info('Weekly task reminder job completed', summary); + + return summary; + }, +}); diff --git a/apps/portal/src/app/lib/auth.ts b/apps/portal/src/app/lib/auth.ts index 1fec11bc8..534b6705c 100644 --- a/apps/portal/src/app/lib/auth.ts +++ b/apps/portal/src/app/lib/auth.ts @@ -1,3 +1,4 @@ +import { env } from '@/env.mjs'; import { OTPVerificationEmail, sendEmail, sendInviteMemberEmail } from '@comp/email'; import { db } from '@db'; import { betterAuth } from 'better-auth'; @@ -5,7 +6,6 @@ import { prismaAdapter } from 'better-auth/adapters/prisma'; import { nextCookies } from 'better-auth/next-js'; import { emailOTP, multiSession, organization } from 'better-auth/plugins'; import { ac, admin, auditor, employee, owner } from './permissions'; -import { env } from '@/env.mjs'; export const auth = betterAuth({ database: prismaAdapter(db, { diff --git a/packages/email/emails/reminders/weekly-task-digest.tsx b/packages/email/emails/reminders/weekly-task-digest.tsx new file mode 100644 index 000000000..553542d8c --- /dev/null +++ b/packages/email/emails/reminders/weekly-task-digest.tsx @@ -0,0 +1,137 @@ +import { + Body, + Button, + Container, + Font, + Heading, + Html, + Link, + Preview, + Section, + Tailwind, + Text, +} from '@react-email/components'; +import { Footer } from '../../components/footer'; +import { Logo } from '../../components/logo'; + +interface Props { + email: string; + userName: string; + organizationName: string; + organizationId: string; + tasks: Array<{ + id: string; + title: string; + }>; +} + +const getTaskCountMessage = (count: number) => { + const plural = count !== 1 ? 's' : ''; + return `You have ${count} pending task${plural} that are not yet completed`; +}; + +export const WeeklyTaskDigestEmail = ({ + email, + userName, + organizationName, + organizationId, + tasks, +}: Props) => { + const baseUrl = process.env.NEXT_PUBLIC_APP_URL ?? 'https://app.trycomp.ai'; + const tasksUrl = `${baseUrl}/${organizationId}/tasks`; + const taskCountMessage = getTaskCountMessage(tasks.length); + + return ( + + + + + + + + + {taskCountMessage} + + + + + + Weekly Task Reminder + + + Hi {userName}, + + + {taskCountMessage} in {organizationName}: + + +
            <Section>
              {tasks.map((task) => (
                <Text key={task.id}>&nbsp;&nbsp;•&nbsp;&nbsp;{task.title}</Text>
              ))}
            </Section>

            <Section className="text-center">
              <Button href={tasksUrl}>View your tasks</Button>
            </Section>

            <Text>
              or copy and paste this URL into your browser{' '}
              <Link href={tasksUrl}>{tasksUrl}</Link>
            </Text>

            <Text>This notification was intended for {email}.</Text>

            <Footer />
          </Container>
        </Body>
      </Tailwind>
    </Html>
  );
};