diff --git a/.eslintrc.cjs b/.eslintrc.cjs index b0a4d2d..9009f8a 100644 --- a/.eslintrc.cjs +++ b/.eslintrc.cjs @@ -47,5 +47,6 @@ module.exports = { 'no-plusplus': 'off', 'no-await-in-loop': 'off', 'no-restricted-syntax': 'off', + 'no-return-await': 'off' }, }; diff --git a/.vscode/settings.json b/.vscode/settings.json index ac583ab..d35d858 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "prisma.fileWatcher": true + "prisma.fileWatcher": true, + "prisma.pinToPrisma6": true } \ No newline at end of file diff --git a/prisma/migrations/20260112085631_remove_analysis_entry_id_uniqueness_to_allow_for_transcription_reruns/migration.sql b/prisma/migrations/20260112085631_remove_analysis_entry_id_uniqueness_to_allow_for_transcription_reruns/migration.sql new file mode 100644 index 0000000..134e669 --- /dev/null +++ b/prisma/migrations/20260112085631_remove_analysis_entry_id_uniqueness_to_allow_for_transcription_reruns/migration.sql @@ -0,0 +1,5 @@ +-- DropIndex +DROP INDEX "TranscriptionJob_analysis_entry_id_key"; + +-- CreateIndex +CREATE INDEX "TranscriptionJob_status_idx" ON "TranscriptionJob"("status"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index c75a00b..309e711 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -125,7 +125,7 @@ model AnalysisEntry { updated_at DateTime @default(now()) @updatedAt transcription_segments Json? full_transcript String? - transcriptionJob TranscriptionJob? 
+ transcriptionJob TranscriptionJob[] @@index([analysis_id, status]) } @@ -133,11 +133,13 @@ model AnalysisEntry { model TranscriptionJob { id Int @id @unique @default(autoincrement()) AnalysisEntry AnalysisEntry @relation(fields: [analysis_entry_id], references: [id]) - analysis_entry_id String @unique + analysis_entry_id String status TranscriptionJobStatus @default(PENDING) created_at DateTime @default(now()) updated_at DateTime @default(now()) @updatedAt language_code String + + @@index([status]) } enum TranscriptionJobStatus { diff --git a/server/config/logger.js b/server/config/logger.js index 3f94ffe..24be7d4 100644 --- a/server/config/logger.js +++ b/server/config/logger.js @@ -1,12 +1,11 @@ import pino from 'pino'; const redactOptions = { - paths: ['context.userData.email', 'context.userData.password'], + paths: ['context.email', 'context.password'], censor: '[REDACTED]', }; const transport = pino.transport({ - redact: redactOptions, targets: [ { target: 'pino-pretty', @@ -18,4 +17,10 @@ const transport = pino.transport({ ], }); -export const logger = pino(transport); +// Apply redaction at the logger level so sensitive fields are removed +export const logger = pino( + { + redact: redactOptions, + }, + transport, +); diff --git a/server/config/loggerFunctions.js b/server/config/loggerFunctions.js index c12bbe3..4d0fd9e 100644 --- a/server/config/loggerFunctions.js +++ b/server/config/loggerFunctions.js @@ -16,7 +16,7 @@ export const logError = (errorMessage, error, additionalInfo = 'N/A') => { sendErrorLogsToTelegram(errorMessage, error); } catch (err) { - return; + return; // Fail silently } }; diff --git a/server/controllers/transcriptionController.js b/server/controllers/transcriptionController.js index 669d482..3441599 100644 --- a/server/controllers/transcriptionController.js +++ b/server/controllers/transcriptionController.js @@ -1,45 +1,88 @@ import { logError, logInfo } from '../config/loggerFunctions.js'; import { transcribeRecording } from 
'../integrations/whisper-asr-webservice/transcribe.js'; import { - getPendingTranscriptionJobsFromDb, + getFirstTranscriptionJobFromDbByStatus, storeNormalizedTranscriptionInDb, updateStatusSingleTranscriptionJobInDb, } from '../models/transcriptionModel.js'; import { cleanUpTranscriptSegments } from '../utils/transcription/transcriptionNormalizer.js'; export const processPendingTranscriptionJobs = async () => { - const pendingTranscriptionJobs = await getPendingTranscriptionJobsFromDb(); + // check if there is an ongoing transcription job - if (pendingTranscriptionJobs.length === 0) { - console.log('No pending transcription jobs found'); - return; + let inProgressTranscriptionJob; + + try { + inProgressTranscriptionJob = await getFirstTranscriptionJobFromDbByStatus('IN_PROGRESS'); + } catch (error) { + return logError('error fetching first transcription job from DB', error); + } + if (inProgressTranscriptionJob) { + return console.log(`Transcription job id: ${inProgressTranscriptionJob.id} is in progress. Skipping new jobs.`); // * Set as console log for registering but not cluttering logs + } + + let pendingTranscriptionJob; + + try { + pendingTranscriptionJob = await getFirstTranscriptionJobFromDbByStatus('PENDING'); + } catch (error) { + return logError('Error fetching first pending transcription job from DB', error); + } + + if (!pendingTranscriptionJob) { + return console.log('No pending transcription jobs found.'); // * Set as console log for registering but not cluttering logs } - for (const transcriptionJob of pendingTranscriptionJobs) { - logInfo(`Processing transcription job for analysis entry ID: ${transcriptionJob.analysis_entry_id}`); + const { + id: transcriptionJobId, + analysis_entry_id: analysisEntryId, + } = pendingTranscriptionJob; - const { analysis_entry_id: analysisEntryId } = transcriptionJob; + logInfo(`Processing transcription job ${transcriptionJobId} for analysis entry ID: ${analysisEntryId}`); - try { - // Mark job as IN_PROGRESS before 
making async call to prevent re-queuing - await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'IN_PROGRESS'); - logInfo(`Marked transcription job ${analysisEntryId} as IN_PROGRESS`); + try { // Mark job as IN_PROGRESS before making async call to prevent re-queuing + await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'IN_PROGRESS'); + logInfo(`Marked transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} as IN_PROGRESS`); + } catch (error) { + return logError(`error updating status for transcription job ${transcriptionJobId} to IN_PROGRESS`, error); + } - const transcriptionJobResult = await transcribeRecording(transcriptionJob); + let transcriptionJobResult; - const { segments, text: fullText } = transcriptionJobResult; + try { + transcriptionJobResult = await transcribeRecording(pendingTranscriptionJob); + } catch (error) { + logError(`error transcribing recording for transcription job ${transcriptionJobId}`, error); + return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING'); + } + const { + segments, + text: fullText, + } = transcriptionJobResult; - const cleanedUpSegments = await cleanUpTranscriptSegments(segments); + let cleanedUpSegments; - await storeNormalizedTranscriptionInDb(analysisEntryId, fullText, cleanedUpSegments); + try { + cleanedUpSegments = await cleanUpTranscriptSegments(segments); + } catch (error) { + logError(`error cleaning up transcript segments for transcription job ${transcriptionJobId}`, error); + return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING'); + } - await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'COMPLETED'); + try { + await storeNormalizedTranscriptionInDb(analysisEntryId, fullText, cleanedUpSegments); + } catch (error) { + logError(`error inserting normalized transcription from transcription job ${transcriptionJobId} in analysis entry ${analysisEntryId}`, error); + return await 
updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING'); + } - logInfo(`Transcription job ${analysisEntryId} completed successfully`); - } catch (error) { - // Mark job back as PENDING to allow retry - await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'PENDING'); - logError(`Error processing transcription job, ${analysisEntryId}`, error); - } + try { + await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'COMPLETED'); + logInfo(`Marked transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} as COMPLETED`); + } catch (error) { + logError(`error updating status for transcription job ${transcriptionJobId} to COMPLETED`, error); + return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING'); } + + return logInfo(`Transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} completed successfully`); }; diff --git a/server/cron/getPendingTranscriptionJobScheduler.js b/server/cron/getPendingTranscriptionJobScheduler.js index a35aaa0..19aa2ab 100644 --- a/server/cron/getPendingTranscriptionJobScheduler.js +++ b/server/cron/getPendingTranscriptionJobScheduler.js @@ -1,11 +1,6 @@ import { CronJob } from 'cron'; import { processPendingTranscriptionJobs } from '../controllers/transcriptionController.js'; -import { logError } from '../config/loggerFunctions.js'; -export const getPendingTranscriptionJobScheduler = new CronJob('*/15 * * * *', async () => { - try { - await processPendingTranscriptionJobs(); - } catch (error) { - logError('Error processing transcription request', error); - } +export const getPendingTranscriptionJobScheduler = new CronJob('*/5 * * * *', () => { + processPendingTranscriptionJobs(); }); diff --git a/server/integrations/s3-client/s3.js b/server/integrations/s3-client/s3.js index 4d3c89a..0af61ee 100644 --- a/server/integrations/s3-client/s3.js +++ b/server/integrations/s3-client/s3.js @@ -12,7 +12,7 @@ export const externalS3Client = new S3Client({ 
}); export const internalS3Client = new S3Client({ - region: process.env.S3_REGION, // irrelevant since miniIO doesnt takei into account + region: process.env.S3_REGION, // irrelevant since MinIO doesn't take it into account endpoint: process.env.S3_INTERNAL_ENDPOINT, // Container network endpoint forcePathStyle: true, // Required for MinIO path-style URLs credentials: { diff --git a/server/models/transcriptionModel.js b/server/models/transcriptionModel.js index ba52834..0d0e433 100644 --- a/server/models/transcriptionModel.js +++ b/server/models/transcriptionModel.js @@ -12,9 +12,9 @@ export const insertTranscriptionJobInDb = async (transcriptionRequest) => { }); }; -export const updateStatusSingleTranscriptionJobInDb = async (analysisEntryId, status) => { +export const updateStatusSingleTranscriptionJobInDb = async (transcriptionJobId, status) => { const whereClause = { - analysis_entry_id: analysisEntryId, + id: transcriptionJobId, }; await prisma.transcriptionJob.update({ @@ -35,23 +35,19 @@ export const storeNormalizedTranscriptionInDb = async (analysisEntryId, fullText data: { full_transcript: fullText, transcription_segments: normalizedSegments, - transcriptionJob: { - update: { - status: 'COMPLETED', - }, - }, }, }); }; -export const getPendingTranscriptionJobsFromDb = async () => { +export const getFirstTranscriptionJobFromDbByStatus = async (status) => { const whereClause = { - status: 'PENDING', + status: status, }; - const pendingTranscriptionJobs = await prisma.transcriptionJob.findMany({ + const pendingTranscriptionJobs = await prisma.transcriptionJob.findFirst({ where: whereClause, select: { + id: true, analysis_entry_id: true, language_code: true, AnalysisEntry: { @@ -60,7 +56,6 @@ export const getPendingTranscriptionJobsFromDb = async () => { }, }, }, - take: 10, }); return pendingTranscriptionJobs;