Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ module.exports = {
'no-plusplus': 'off',
'no-await-in-loop': 'off',
'no-restricted-syntax': 'off',
'no-return-await': 'off'
},
};
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"prisma.fileWatcher": true
"prisma.fileWatcher": true,
"prisma.pinToPrisma6": true
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- DropIndex
DROP INDEX "TranscriptionJob_analysis_entry_id_key";

-- CreateIndex
CREATE INDEX "TranscriptionJob_status_idx" ON "TranscriptionJob"("status");
6 changes: 4 additions & 2 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -125,19 +125,21 @@ model AnalysisEntry {
updated_at DateTime @default(now()) @updatedAt
transcription_segments Json?
full_transcript String?
transcriptionJob TranscriptionJob?
transcriptionJob TranscriptionJob[]

@@index([analysis_id, status])
}

model TranscriptionJob {
id Int @id @unique @default(autoincrement())
AnalysisEntry AnalysisEntry @relation(fields: [analysis_entry_id], references: [id])
analysis_entry_id String @unique
analysis_entry_id String
status TranscriptionJobStatus @default(PENDING)
created_at DateTime @default(now())
updated_at DateTime @default(now()) @updatedAt
language_code String

@@index([status])
}

enum TranscriptionJobStatus {
Expand Down
11 changes: 8 additions & 3 deletions server/config/logger.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import pino from 'pino';

const redactOptions = {
paths: ['context.userData.email', 'context.userData.password'],
paths: ['context.email', 'context.password'],
censor: '[REDACTED]',
};

const transport = pino.transport({
redact: redactOptions,
targets: [
{
target: 'pino-pretty',
Expand All @@ -18,4 +17,10 @@ const transport = pino.transport({
],
});

export const logger = pino(transport);
// Apply redaction at the logger level so sensitive fields are removed
export const logger = pino(
{
redact: redactOptions,
},
transport,
);
2 changes: 1 addition & 1 deletion server/config/loggerFunctions.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export const logError = (errorMessage, error, additionalInfo = 'N/A') => {

sendErrorLogsToTelegram(errorMessage, error);
} catch (err) {
return;
return; // Fail silently
}
};

Expand Down
89 changes: 66 additions & 23 deletions server/controllers/transcriptionController.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,88 @@
import { logError, logInfo } from '../config/loggerFunctions.js';
import { transcribeRecording } from '../integrations/whisper-asr-webservice/transcribe.js';
import {
getPendingTranscriptionJobsFromDb,
getFirstTranscriptionJobFromDbByStatus,
storeNormalizedTranscriptionInDb,
updateStatusSingleTranscriptionJobInDb,
} from '../models/transcriptionModel.js';
import { cleanUpTranscriptSegments } from '../utils/transcription/transcriptionNormalizer.js';

export const processPendingTranscriptionJobs = async () => {
const pendingTranscriptionJobs = await getPendingTranscriptionJobsFromDb();
// check if there is an ongoing transcription job

if (pendingTranscriptionJobs.length === 0) {
console.log('No pending transcription jobs found');
return;
let inProgressTranscriptionJob;

try {
inProgressTranscriptionJob = await getFirstTranscriptionJobFromDbByStatus('IN_PROGRESS');
} catch (error) {
return logError('error fetching first transcription job from DB', error);
}
if (inProgressTranscriptionJob) {
return console.log(`Transcription job id: ${inProgressTranscriptionJob.id} is in progress Skipping new jobs.`); // * Set as console log for registering but not cluttering logs
}

let pendingTranscriptionJob;

try {
pendingTranscriptionJob = await getFirstTranscriptionJobFromDbByStatus('PENDING');
} catch (error) {
return logError('Error fetching first pending transcription job from DB', error);
}

if (!pendingTranscriptionJob) {
return console.log('No pending transcription jobs found.'); // * Set as console log for registering but not cluttering logs
}

for (const transcriptionJob of pendingTranscriptionJobs) {
logInfo(`Processing transcription job for analysis entry ID: ${transcriptionJob.analysis_entry_id}`);
const {
id: transcriptionJobId,
analysis_entry_id: analysisEntryId,
} = pendingTranscriptionJob;

const { analysis_entry_id: analysisEntryId } = transcriptionJob;
logInfo(`Processing transcription job ${transcriptionJobId} for analysis entry ID: ${analysisEntryId}`);

try {
// Mark job as IN_PROGRESS before making async call to prevent re-queuing
await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'IN_PROGRESS');
logInfo(`Marked transcription job ${analysisEntryId} as IN_PROGRESS`);
try { // Mark job as IN_PROGRESS before making async call to prevent re-queuing
await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'IN_PROGRESS');
logInfo(`Marked transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} as IN_PROGRESS`);
} catch (error) {
return logError(`error updating status for transcription job ${transcriptionJobId} to IN_PROGRESS`, error);
}

const transcriptionJobResult = await transcribeRecording(transcriptionJob);
let transcriptionJobResult;

const { segments, text: fullText } = transcriptionJobResult;
try {
transcriptionJobResult = await transcribeRecording(pendingTranscriptionJob);
} catch (error) {
logError(`error transcribing recording for transcription job ${transcriptionJobId}`, error);
return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING');
}
const {
segments,
text: fullText,
} = transcriptionJobResult;

const cleanedUpSegments = await cleanUpTranscriptSegments(segments);
let cleanedUpSegments;

await storeNormalizedTranscriptionInDb(analysisEntryId, fullText, cleanedUpSegments);
try {
cleanedUpSegments = await cleanUpTranscriptSegments(segments);
} catch (error) {
logError(`error cleaning up transcript segments for transcription job ${transcriptionJobId}`, error);
return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING');
}

await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'COMPLETED');
try {
await storeNormalizedTranscriptionInDb(analysisEntryId, fullText, cleanedUpSegments);
} catch (error) {
logError(`error inserting normalized transcription from transcription job ${transcriptionJobId} in analysis entry ${analysisEntryId}`, error);
return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING');
}

logInfo(`Transcription job ${analysisEntryId} completed successfully`);
} catch (error) {
// Mark job back as PENDING to allow retry
await updateStatusSingleTranscriptionJobInDb(analysisEntryId, 'PENDING');
logError(`Error processing transcription job, ${analysisEntryId}`, error);
}
try {
await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'COMPLETED');
logInfo(`Marked transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} as COMPLETED`);
} catch (error) {
logError(`error updating status for transcription job ${transcriptionJobId} to COMPLETED`, error);
return await updateStatusSingleTranscriptionJobInDb(transcriptionJobId, 'PENDING');
}

return logInfo(`Transcription job ${transcriptionJobId} for analysis entry ID ${analysisEntryId} completed successfully`);
};
9 changes: 2 additions & 7 deletions server/cron/getPendingTranscriptionJobScheduler.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import { CronJob } from 'cron';
import { processPendingTranscriptionJobs } from '../controllers/transcriptionController.js';
import { logError } from '../config/loggerFunctions.js';

export const getPendingTranscriptionJobScheduler = new CronJob('*/15 * * * *', async () => {
try {
await processPendingTranscriptionJobs();
} catch (error) {
logError('Error processing transcription request', error);
}
export const getPendingTranscriptionJobScheduler = new CronJob('*/5 * * * *', () => {
processPendingTranscriptionJobs();
});
2 changes: 1 addition & 1 deletion server/integrations/s3-client/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export const externalS3Client = new S3Client({
});

export const internalS3Client = new S3Client({
region: process.env.S3_REGION, // irrelevant since miniIO doesnt takei into account
region: process.env.S3_REGION, // irrelevant since miniIO doesnt take into account
endpoint: process.env.S3_INTERNAL_ENDPOINT, // Container network endpoint
forcePathStyle: true, // Required for MinIO path-style URLs
credentials: {
Expand Down
17 changes: 6 additions & 11 deletions server/models/transcriptionModel.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ export const insertTranscriptionJobInDb = async (transcriptionRequest) => {
});
};

export const updateStatusSingleTranscriptionJobInDb = async (analysisEntryId, status) => {
export const updateStatusSingleTranscriptionJobInDb = async (transcriptionJobId, status) => {
const whereClause = {
analysis_entry_id: analysisEntryId,
id: transcriptionJobId,
};

await prisma.transcriptionJob.update({
Expand All @@ -35,23 +35,19 @@ export const storeNormalizedTranscriptionInDb = async (analysisEntryId, fullText
data: {
full_transcript: fullText,
transcription_segments: normalizedSegments,
transcriptionJob: {
update: {
status: 'COMPLETED',
},
},
},
});
};

export const getPendingTranscriptionJobsFromDb = async () => {
export const getFirstTranscriptionJobFromDbByStatus = async (status) => {
const whereClause = {
status: 'PENDING',
status: status,
};

const pendingTranscriptionJobs = await prisma.transcriptionJob.findMany({
const pendingTranscriptionJobs = await prisma.transcriptionJob.findFirst({
where: whereClause,
select: {
id: true,
analysis_entry_id: true,
language_code: true,
AnalysisEntry: {
Expand All @@ -60,7 +56,6 @@ export const getPendingTranscriptionJobsFromDb = async () => {
},
},
},
take: 10,
});

return pendingTranscriptionJobs;
Expand Down