Improved adapter interface and explicit signal cancellation #854
| } | ||
| this.filePath = path.isAbsolute(storageFilePath) | ||
| ? storageFilePath | ||
| : path.resolve(__dirname, '..', '..', '..', storageFilePath) |
due to nesting, this location would change. Is it intentional?
| cacheControl: cacheControl || 'no-cache', | ||
| mimetype: contentType || 'application/octet-stream', | ||
| eTag, | ||
| lastModified: data.birthtime, |
The calculated value isn't used; the wrong value is passed here.
| 'getObject', | ||
| 'putObject', | ||
| 'deleteObject', | ||
| 'listObjects', | ||
| 'copyObject', | ||
| 'headObject', | ||
| 'createMultipartUpload', | ||
| 'uploadPart', | ||
| 'completeMultipartUpload', | ||
| 'abortMultipartUpload', | ||
| 'listMultipartUploads', | ||
| 'listParts', | ||
| 'getSignedUrl', | ||
| 'createBucket', | ||
| 'deleteBucket', | ||
| 'listBuckets', | ||
| 'getBucketLocation', | ||
| 'getBucketVersioning', | ||
| 'putBucketVersioning', | ||
| 'getBucketLifecycleConfiguration', | ||
| 'putBucketLifecycleConfiguration', | ||
| 'deleteBucketLifecycle', | ||
| 'uploadObject', | ||
| 'privateAssetUrl', |
since methods changed, we need to update this list
| const locationParts = key.split('/') | ||
| locationParts.shift() // tenant-id | ||
|
|
||
| bucket = keyParts.shift() || '' | ||
| resultBucket = keyParts.shift() || '' |
We should use locationParts here or remove it. Using smaller scope is better though
| : undefined, | ||
| CopySourceIfNoneMatch: req.Headers['x-amz-copy-source-if-none-match'], | ||
| CopySourceIfUnmodifiedSince: req.Headers['x-amz-copy-source-if-unmodified-since'] | ||
| ? new Date(req.Headers['x-amz-copy-source-if-unmodified-since']) |
We probably need to track validating and short-circuiting all of these headers.
| const listPartsInput = new ListPartsCommand({ | ||
| Bucket: bucketName, | ||
| Bucket: bucket, | ||
| Key: version ? key + '/' + version : key, |
Not relevant to this diff
Below we're using a helper withOptionalVersion but not here
| CopySource: `${storageS3Bucket}/${withOptionalVersion(sourceKey, sourceKeyVersion)}`, | ||
| UploadId: uploadId, | ||
| PartNumber: partNumber, | ||
| CopySource: `${bucket}/${withOptionalVersion(sourceKey, sourceKeyVersion)}`, |
url encoding is missing here
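A minimal sketch of what the encoding could look like, assuming the goal is to URL-encode each path segment while keeping the `/` separators. `encodeCopySource` is a hypothetical helper, not something from this PR; in the real code the key would be the result of `withOptionalVersion(sourceKey, sourceKeyVersion)`.

```typescript
// Hypothetical helper (not part of the PR): builds a URL-encoded CopySource value.
// Each path segment is encoded on its own so the '/' separators survive.
function encodeCopySource(bucket: string, key: string): string {
  const encodedKey = key
    .split('/')
    .map((segment) => encodeURIComponent(segment))
    .join('/')
  return `${encodeURIComponent(bucket)}/${encodedKey}`
}

// Example call with an illustrative key containing a space and a plus sign.
const copySource = encodeCopySource('my-bucket', 'tenant/folder/my file+1.png')
```

Encoding per segment rather than the whole string avoids turning the path separators into `%2F`.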
| bucketId: string | ||
| options?: { | ||
| prefix?: string | ||
| deltimeter?: string |
| createAnalyticsBucket(data: Pick<Bucket, 'name'>): Promise<IcebergCatalog> | ||
| createAnalyticsBucket(input: CreateAnalyticsBucketInput): Promise<IcebergCatalog> | ||
|
|
||
| findBucketById<Filters extends FindBucketFilters = FindObjectFilters>( |
isn't it supposed to be FindBucketFilters for default?
| headers?: BrowserCacheHeaders, | ||
| signal?: AbortSignal | ||
| ): Promise<ObjectResponse> { | ||
| async read(input: ReadObjectInput): Promise<ObjectResponse> { |
error messages don't match method name anymore
| const tenantBuckets = await storage.listAnalyticsBuckets({ | ||
| columns: 'id,name', | ||
| options: { | ||
| limit: 1000, | ||
| }, | ||
| }) |
This needs pagination, right? For now, it might be fine but it will break at some point
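One way the pagination could be sketched, assuming the listing call can accept an offset alongside `limit` (that is an assumption; the snippet above only shows `limit`). `listAll` and `fetchPage` are hypothetical names used for illustration.

```typescript
// Hypothetical generic pager: keeps requesting pages until a short page is returned.
// Assumes the underlying listing supports limit/offset, which the PR does not confirm.
async function listAll<T>(
  fetchPage: (limit: number, offset: number) => Promise<T[]>,
  limit = 1000
): Promise<T[]> {
  const all: T[] = []
  for (let offset = 0; ; offset += limit) {
    const page = await fetchPage(limit, offset)
    all.push(...page)
    // A page shorter than the limit means there is nothing left to fetch.
    if (page.length < limit) {
      return all
    }
  }
}
```

The call site would then wrap the listing call in `fetchPage` instead of passing a fixed `limit: 1000`.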
| key: storage.location.getKeyLocation({ | ||
| tenantId, | ||
| bucketId: job.data.bucketId, | ||
| objectName: job.data.name, | ||
| }), |
this could be s3Key directly
| sortBy: { | ||
| column: sortBy?.column, | ||
| order: sortBy?.order, | ||
| }, |
Suggested change:
| sortBy: { | |
| column: sortBy?.column, | |
| order: sortBy?.order, | |
| }, | |
| sortBy, |
| retry?: number | ||
| readOnly?: boolean | ||
| timeout?: number | ||
| signal?: AbortSignal |
| tnx?: TNX | ||
| parentTnx?: TNX | ||
| parentConnection?: TenantConnection | ||
| signal?: AbortSignal |
This one should extend Cancellable as well.
| async listParts( | ||
| bucket: string, | ||
| key: string, | ||
| uploadId?: string, | ||
| maxParts?: number, | ||
| marker?: string | ||
| ) { |
similar pattern for this method?
| expect(S3Backend.prototype.copyObject).toHaveBeenCalled() | ||
| expect(S3Backend.prototype.deleteObjects).toHaveBeenCalled() | ||
| expect(S3Backend.prototype.copy).toHaveBeenCalled() | ||
| expect(S3Backend.prototype.removeMany).toHaveBeenCalled() |
The success case checks removeMany, but the failure cases below mistakenly check remove.
What kind of change does this PR introduce?
Refactor
What is the current behavior?
- Backend adapters (S3Backend, FileBackend) use positional parameters (e.g., read(bucket, key, version, headers, signal)), making call sites hard to read and fragile when parameters change.
- Database adapter methods use inline Pick<> types with no named input interfaces.
- The abort signal is bound per connection via TenantConnection.setAbortSignal(), meaning all queries on a connection are cancellable, including cleanup queries in catch blocks, upload completion transactions, and internal maintenance queries. This has caused bugs where critical operations (like finalizing an upload) get cancelled when a client disconnects mid-request.
- Storage and ObjectStorage methods use positional parameters.
What is the new behavior?
Single object parameter pattern with named input interfaces
All layers now use a single object parameter with named input interfaces:
- read(bucket, key, version, headers) → read({ bucket, key, version, headers })
- findBucketById(id, columns, filters) → findBucketById({ bucketId, columns, filters })
- findBucket(id, columns) → findBucket({ bucketId, columns })
- deleteObject(name) → deleteObject({ objectName })
Input interfaces are defined in:
- src/storage/backend/adapter.ts: ReadObjectInput, WriteObjectInput, CopyObjectInput, etc.
- src/storage/database/adapter.ts: FindBucketByIdInput, DeleteObjectInput, CreateMultipartUploadInput, etc.
Per-query abort signal instead of per-connection
Abort signal now flows explicitly through method inputs instead of being bound to the connection:
Route → Storage/ObjectStorage → DB method → .abortOnSignal(signal) per query
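The flow above can be sketched roughly as follows. This is an illustration under assumptions, not the PR's code: `FakeQuery` is a stand-in for the real query builder, and the exact fields of `FindBucketByIdInput` are guessed from the names in this description.

```typescript
// Base interface named in the PR description: any input may carry a signal.
interface Cancellable {
  signal?: AbortSignal
}

// Assumed shape; the real interface lives in src/storage/database/adapter.ts.
interface FindBucketByIdInput extends Cancellable {
  bucketId: string
  columns?: string
}

// Stand-in for a query builder that supports .abortOnSignal(signal).
class FakeQuery {
  private signal?: AbortSignal
  constructor(readonly description: string) {}

  abortOnSignal(signal?: AbortSignal): this {
    this.signal = signal
    return this
  }

  run(): string {
    // Only a query that was handed a signal can be cancelled by it.
    if (this.signal?.aborted) {
      throw new Error(`aborted: ${this.description}`)
    }
    return `ran: ${this.description}`
  }
}

// DB method: the signal arrives in the input object and is applied per query.
function findBucketById(input: FindBucketByIdInput): string {
  const { bucketId, columns = 'id', signal } = input
  return new FakeQuery(`find bucket ${bucketId} (${columns})`).abortOnSignal(signal).run()
}
```

A route would pass its own signal in the input, for example `findBucketById({ bucketId, columns: 'id,name', signal })`, while a caller that omits `signal` gets a query that cannot be cancelled by a client disconnect.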
What changed:
- Removed setAbortSignal()/getAbortSignal() from TenantConnection
- db.ts plugin
- Cancellable base interface ({ signal?: AbortSignal })
- withTransaction propagates signal into child transactions so inner queries auto-inherit
- Removed AbortController workaround in the uploader that existed to prevent upload finalization from being cancelled
Signal routing by route type:
- request.signals.disconnect.signal
- ctx.signals.response
- ctx.signals.body
- req.upload.signal (set from req.signals.disconnect.signal)
Intentionally unsignalled operations: catch blocks, upload completion (completeMultiPartUpload, onUploadFinish), and queue event handlers do not receive a signal, ensuring they always run to completion regardless of client disconnect.
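To illustrate why leaving the signal out matters, here is a small sketch. `runQuery`, `handleUpload`, and the query names are hypothetical; only `onUploadFinish` and the `Cancellable` shape come from this description.

```typescript
interface Cancellable {
  signal?: AbortSignal
}

// Records which queries actually ran, for demonstration only.
const executed: string[] = []

// Hypothetical query runner: refuses to run when it was given an aborted signal.
function runQuery(name: string, input: Cancellable = {}): void {
  if (input.signal?.aborted) {
    throw new Error(`${name} aborted`)
  }
  executed.push(name)
}

// Request-scoped work receives the signal; cleanup in the catch block does not.
function handleUpload(signal: AbortSignal): void {
  try {
    runQuery('insert object row', { signal })
  } catch (err) {
    // No signal here, so cleanup still runs after the client disconnects.
    runQuery('cleanup pending upload')
    throw err
  }
}

// Upload completion is also unsignalled and always runs to completion.
function onUploadFinish(): void {
  runQuery('finalize upload')
}
```

With an already aborted signal, `handleUpload` fails on the first query but the cleanup query still executes, which matches the behavior described above.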
File renames
- src/storage/backend/file.ts → src/storage/backend/file/file-adapter.ts
- src/storage/backend/s3/adapter.ts → src/storage/backend/s3/s3-adapter.ts
- src/storage/backend/s3/backup.ts → src/storage/backend/s3/s3-backup.ts
Additional context