diff --git a/src/http/routes/tus/custom-s3-store.ts b/src/http/routes/tus/custom-s3-store.ts
new file mode 100644
index 00000000..96543c24
--- /dev/null
+++ b/src/http/routes/tus/custom-s3-store.ts
@@ -0,0 +1,50 @@
+import { S3Store } from '@tus/s3-store'
+import { Upload } from '@tus/server'
+import http from 'http'
+import { HeadObjectCommand } from '@aws-sdk/client-s3'
+
+/**
+ * S3Store variant that tolerates spurious multipart-completion failures.
+ *
+ * Some S3-compatible backends (RustFS among them) can answer
+ * CompleteMultipartUpload with a 400/500 even though the object was in
+ * fact persisted. When that happens a HeadObject probe is issued: if the
+ * object exists, the upload is reported as successful instead of failing.
+ */
+export class CustomS3Store extends S3Store {
+  // NOTE(review): `finish` is not part of S3Store's public typings in every
+  // release -- confirm this override is invoked for the installed version.
+  async finish(req: http.IncomingMessage, id: string, offset: number): Promise<Upload> {
+    try {
+      return await super.finish(req, id, offset)
+    } catch (error) {
+      const err = error as { statusCode?: number; message?: string }
+
+      // Only attempt recovery for the failure signatures seen on backends
+      // that lose track of parts after the object has been persisted.
+      const isRecoverable =
+        (err.statusCode === 500 || err.statusCode === 400) &&
+        (err.message?.includes('One or more of the specified parts could not be found') ||
+          err.message?.includes('Internal Server Error'))
+
+      if (!isRecoverable) {
+        throw error
+      }
+
+      try {
+        // `key` and `client` are not part of S3Store's public typings, so
+        // we reach through the type system to probe the bucket directly.
+        const key: string = (this as any).key(id)
+        const data = await (this as any).client.send(
+          new HeadObjectCommand({ Bucket: this.bucket, Key: key })
+        )
+
+        // The object exists: completion actually succeeded server-side.
+        // NOTE: the original upload metadata is not recoverable here.
+        return new Upload({ id, offset, size: data.ContentLength, metadata: {} })
+      } catch {
+        // HeadObject failed too -- surface the original completion error,
+        // not the probe error.
+        throw error
+      }
+    }
+  }
+}
diff --git a/src/http/routes/tus/index.ts b/src/http/routes/tus/index.ts
index 008e6948..b8ee7c47 100644
--- a/src/http/routes/tus/index.ts
+++ b/src/http/routes/tus/index.ts
@@ -18,7 +18,7 @@ import {
   SIGNED_URL_SUFFIX,
 } from './lifecycle'
 import { TenantConnection, PubSub } from '@internal/database'
-import { S3Store } from '@tus/s3-store'
+import { CustomS3Store } from './custom-s3-store'
 import { NodeHttpHandler } from '@smithy/node-http-handler'
 import { ROUTE_OPERATIONS } from '../operations'
 import * as https from 'node:https'
@@ -60,7 +60,7 @@ type MultiPartRequest = http.IncomingMessage & {
 
 function createTusStore(agent: { httpsAgent: https.Agent; httpAgent: http.Agent }) {
   if (storageBackendType === 's3') {
-    return new S3Store({
+    return new CustomS3Store({
       partSize: tusPartSize * 1024 * 1024, // Each uploaded part will have ${tusPartSize}MB,
       expirationPeriodInMilliseconds: tusUrlExpiryMs,
       cache: new AlsMemoryKV(),