Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions apis/api-media/src/workers/processVideoUploads/service/service.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { Job } from 'bullmq'
import { Logger } from 'pino'

import { prismaMock } from '../../../../test/prismaMock'
import { getVideo } from '../../../schema/mux/video/service'
import { jobName as processVideoDownloadsNowJobName } from '../../processVideoDownloads/config'
import { queue as processVideoDownloadsQueue } from '../../processVideoDownloads/queue'

import { ProcessVideoUploadJobData, service } from './service'

jest.mock('../../../schema/mux/video/service', () => ({
getVideo: jest.fn()
}))

jest.mock('../../processVideoDownloads/queue', () => ({
queue: {
add: jest.fn()
}
}))

const mockLogger = {
info: jest.fn(),
warn: jest.fn(),
error: jest.fn()
} as unknown as Logger

const mockJob = {
data: {
videoId: 'video-id',
edition: 'standard',
languageId: '529',
version: 1,
muxVideoId: 'mux-video-id',
metadata: {
durationMs: 120000,
duration: 120,
width: 1920,
height: 1080
},
originalFilename: 'test.mp4'
}
} as Job<ProcessVideoUploadJobData>

describe('processVideoUploads service', () => {
beforeEach(() => {
jest.clearAllMocks()
})

it('creates or updates variant when mux video is ready', async () => {
prismaMock.muxVideo.findUnique.mockResolvedValue({
id: 'mux-video-id',
assetId: 'asset-id'
} as any)
;(getVideo as jest.Mock).mockResolvedValue({
status: 'ready',
duration: 120,
playback_ids: [{ id: 'playback-id', policy: 'public' }]
})

prismaMock.muxVideo.update.mockResolvedValue({} as any)
prismaMock.video.findUnique.mockResolvedValue({ slug: 'video-slug' } as any)
prismaMock.videoVariant.findFirst.mockResolvedValue({
id: 'variant-id',
slug: 'variant-slug'
} as any)
prismaMock.videoVariant.update.mockResolvedValue({} as any)

await service(mockJob, mockLogger)

expect(getVideo).toHaveBeenCalledWith('asset-id', false)
expect(prismaMock.muxVideo.update).toHaveBeenCalledWith({
where: { id: 'mux-video-id' },
data: {
playbackId: 'playback-id',
readyToStream: true,
duration: 120,
downloadable: true
}
})
expect(processVideoDownloadsQueue.add).toHaveBeenCalledWith(
processVideoDownloadsNowJobName,
{
videoId: 'mux-video-id',
assetId: 'asset-id',
isUserGenerated: false
},
{
jobId: 'download:mux-video-id',
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: true,
removeOnFail: { age: 432000, count: 50 }
}
)
expect(prismaMock.videoVariant.update).toHaveBeenCalledWith({
where: { id: 'variant-id' },
data: expect.objectContaining({
muxVideoId: 'mux-video-id',
downloadable: true,
published: true,
version: 1
})
})
})

it('logs and stops when mux video processing errored', async () => {
prismaMock.muxVideo.findUnique.mockResolvedValue({
id: 'mux-video-id',
assetId: 'asset-id'
} as any)
;(getVideo as jest.Mock).mockResolvedValue({
status: 'errored',
playback_ids: []
})

await service(mockJob, mockLogger)

expect(mockLogger.error).toHaveBeenCalledWith(
{ muxVideoId: 'mux-video-id', assetId: 'asset-id', status: 'errored' },
'Mux video processing errored'
)
expect(mockLogger.error).toHaveBeenCalledWith(
{
videoId: 'video-id',
muxVideoId: 'mux-video-id',
finalStatus: 'errored'
},
'Video upload processing failed due to Mux error'
)
expect(prismaMock.muxVideo.update).not.toHaveBeenCalled()
expect(processVideoDownloadsQueue.add).not.toHaveBeenCalled()
expect(prismaMock.videoVariant.update).not.toHaveBeenCalled()
expect(prismaMock.videoVariant.create).not.toHaveBeenCalled()
})
})
43 changes: 37 additions & 6 deletions apis/api-media/src/workers/processVideoUploads/service/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ async function getLanguageSlug(
return `${videoSlug}/${data.language.slug}`
} catch (error) {
logger?.error(
`Failed to get language slug for language ${languageId}: ${error as Error}`
{ error, languageId, videoSlug },
'Failed to get language slug for variant'
)
throw new Error(`Failed to create slug for variant: ${error as Error}`)
} finally {
Expand All @@ -82,7 +83,7 @@ async function getLanguageSlug(
}
}

interface ProcessVideoUploadJobData {
export interface ProcessVideoUploadJobData {
videoId: string
edition: string
languageId: string
Expand Down Expand Up @@ -141,15 +142,24 @@ export async function service(
logger
})
}
if (finalStatus !== 'ready') {
if (finalStatus === 'errored') {
logger?.error(
{ videoId, muxVideoId, finalStatus },
'Video upload processing failed due to Mux error'
)
return
}

if (finalStatus === 'timeout') {
logger?.warn(
{ videoId, muxVideoId, finalStatus },
'Video upload processing failed due to timeout'
)
}
} catch (error) {
logger?.error(
`Video upload processing failed for video ${videoId} and mux video ${muxVideoId}: ${error as Error}`
{ error, videoId, muxVideoId },
'Video upload processing failed'
)
throw error
}
Expand All @@ -160,6 +170,7 @@ async function waitForMuxVideoCompletion(
logger?: Logger
): Promise<
| { finalStatus: 'ready'; playbackId: string }
| { finalStatus: 'errored'; playbackId: null }
| { finalStatus: 'timeout'; playbackId: null }
> {
const maxAttempts = 480 // 120 minutes (480 * 15 seconds)
Expand All @@ -179,6 +190,18 @@ async function waitForMuxVideoCompletion(
try {
const muxVideoAsset = await getVideo(muxVideo.assetId, false)

if (muxVideoAsset.status === 'errored') {
logger?.error(
{
muxVideoId: muxVideo.id,
assetId: muxVideo.assetId,
status: muxVideoAsset.status
},
'Mux video processing errored'
)
return { finalStatus: 'errored', playbackId: null }
}

const playbackId = muxVideoAsset?.playback_ids?.[0].id
if (playbackId != null && muxVideoAsset.status === 'ready') {
await prisma.muxVideo.update({
Expand Down Expand Up @@ -235,6 +258,7 @@ async function waitForMuxVideoCompletion(
logger?.info(
{
muxVideoId: muxVideo.id,
assetId: muxVideo.assetId,
status: muxVideoAsset.status,
attempts: attempts + 1,
elapsedMinutes
Expand All @@ -247,7 +271,13 @@ async function waitForMuxVideoCompletion(
await new Promise((resolve) => setTimeout(resolve, intervalMs))
} catch (error) {
logger?.error(
`Error checking Mux video status for mux video ${muxVideo.id} and asset ${muxVideo.assetId} on attempt ${attempts + 1}: ${error as Error}`
{
error,
muxVideoId: muxVideo.id,
assetId: muxVideo.assetId,
attempt: attempts + 1
},
'Error checking Mux video status'
)
attempts++
await new Promise((resolve) => setTimeout(resolve, intervalMs))
Expand Down Expand Up @@ -362,7 +392,8 @@ async function createVideoVariant({
}
} catch (error) {
logger?.error(
`Failed to create video variant for video ${videoId} and mux video ${muxVideoId}: ${error as Error}`
{ error, videoId, muxVideoId, variantId, languageId, edition },
'Failed to create video variant'
)
throw error
}
Expand Down