Skip to content

Comments

Improved adapter interface and explicit signal cancellation#854

Open
fenos wants to merge 3 commits intomasterfrom
improvement/adapter-interface
Open

Improved adapter interface and explicit signal cancellation#854
fenos wants to merge 3 commits intomasterfrom
improvement/adapter-interface

Conversation

@fenos
Copy link
Contributor

@fenos fenos commented Feb 9, 2026

What kind of change does this PR introduce?

Refactor

What is the current behavior?

  • Backend adapter methods (S3Backend, FileBackend) use positional parameters (e.g., read(bucket, key, version, headers, signal)), making call sites hard to read and fragile when parameters change.
  • Database methods use positional parameters or inline Pick<> types with no named input interfaces.
  • Abort signal is set once at the connection level via TenantConnection.setAbortSignal(), meaning all queries on a connection are cancellable, including cleanup queries in catch blocks, upload completion transactions, and internal maintenance queries. This has caused bugs where critical operations (like finalizing an upload) get cancelled when a client disconnects mid-request.
  • Storage and ObjectStorage methods use positional parameters.

What is the new behavior?

Single object parameter pattern with named input interfaces

All layers now use a single object parameter with named input interfaces:

Layer Before After
Backend read(bucket, key, version, headers) read({ bucket, key, version, headers })
Database findBucketById(id, columns, filters) findBucketById({ bucketId, columns, filters })
Storage findBucket(id, columns) findBucket({ bucketId, columns })
ObjectStorage deleteObject(name) deleteObject({ objectName })

Input interfaces are defined in:

  • src/storage/backend/adapter.tsReadObjectInput, WriteObjectInput, CopyObjectInput, etc.
  • src/storage/database/adapter.tsFindBucketByIdInput, DeleteObjectInput, CreateMultipartUploadInput, etc.

Per-query abort signal instead of per-connection

Abort signal now flows explicitly through method inputs instead of being bound to the connection:

Route → Storage/ObjectStorage → DB method → .abortOnSignal(signal) per query

What changed:

  • Removed setAbortSignal() / getAbortSignal() from TenantConnection
  • Removed connection-level signal setup from the db.ts plugin
  • All DB input interfaces extend a Cancellable base interface ({ signal?: AbortSignal })
  • withTransaction propagates signal into child transactions so inner queries auto-inherit
  • Removed the AbortController workaround in the uploader that existed to prevent upload finalization from being cancelled

Signal routing by route type:

Route type Signal source
Standard REST routes request.signals.disconnect.signal
S3 reads/lists/deletes ctx.signals.response
S3 uploads ctx.signals.body
TUS routes req.upload.signal (set from req.signals.disconnect.signal)

Intentionally unsignalled operations — catch blocks, upload completion (completeMultiPartUpload, onUploadFinish), and queue event handlers do not
receive a signal, ensuring they always run to completion regardless of client disconnect.

File renames

  • src/storage/backend/file.tssrc/storage/backend/file/file-adapter.ts
  • src/storage/backend/s3/adapter.tssrc/storage/backend/s3/s3-adapter.ts
  • src/storage/backend/s3/backup.tssrc/storage/backend/s3/s3-backup.ts

Additional context

  • Tests updated to match new single-object-parameter mock assertions

@coveralls
Copy link

Pull Request Test Coverage Report for Build 21828045238

Details

  • 2289 of 2593 (88.28%) changed or added relevant lines in 72 files are covered.
  • 12 unchanged lines in 6 files lost coverage.
  • Overall coverage increased (+0.5%) to 76.378%

Changes Missing Coverage Covered Lines Changed/Added Lines %
src/http/routes/s3/commands/list-parts.ts 11 12 91.67%
src/http/routes/tus/index.ts 7 8 87.5%
src/storage/backend/index.ts 3 4 75.0%
src/storage/events/iceberg/delete-iceberg-resources.ts 0 1 0.0%
src/storage/protocols/tus/file-store.ts 2 3 66.67%
src/storage/events/lifecycle/bucket-deleted.ts 0 2 0.0%
src/storage/backend/adapter.ts 135 138 97.83%
src/storage/scanner/scanner.ts 19 23 82.61%
src/http/routes/object/getObject.ts 12 17 70.59%
src/http/routes/s3/commands/abort-multipart-upload.ts 8 13 61.54%
Files with Coverage Reduction New Missed Lines %
src/http/routes/s3/commands/get-object.ts 1 73.65%
src/http/routes/tus/index.ts 1 85.08%
src/internal/database/connection.ts 1 72.06%
src/storage/object.ts 1 91.56%
src/internal/database/tenant.ts 2 83.65%
src/storage/database/knex.ts 6 81.92%
Totals Coverage Status
Change from base Build 21826996998: 0.5%
Covered Lines: 26731
Relevant Lines: 34695

💛 - Coveralls

}
this.filePath = path.isAbsolute(storageFilePath)
? storageFilePath
: path.resolve(__dirname, '..', '..', '..', storageFilePath)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

due to nesting, this location would change. Is it intentional?

cacheControl: cacheControl || 'no-cache',
mimetype: contentType || 'application/octet-stream',
eTag,
lastModified: data.birthtime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculated value isn't used, wrong value here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added #858 to address with a test

Comment on lines 73 to 96
'getObject',
'putObject',
'deleteObject',
'listObjects',
'copyObject',
'headObject',
'createMultipartUpload',
'uploadPart',
'completeMultipartUpload',
'abortMultipartUpload',
'listMultipartUploads',
'listParts',
'getSignedUrl',
'createBucket',
'deleteBucket',
'listBuckets',
'getBucketLocation',
'getBucketVersioning',
'putBucketVersioning',
'getBucketLifecycleConfiguration',
'putBucketLifecycleConfiguration',
'deleteBucketLifecycle',
'uploadObject',
'privateAssetUrl',
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since methods changed, we need to update this list

Comment on lines 464 to +467
const locationParts = key.split('/')
locationParts.shift() // tenant-id

bucket = keyParts.shift() || ''
resultBucket = keyParts.shift() || ''
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use locationParts here or remove it. Using smaller scope is better though

: undefined,
CopySourceIfNoneMatch: req.Headers['x-amz-copy-source-if-none-match'],
CopySourceIfUnmodifiedSince: req.Headers['x-amz-copy-source-if-unmodified-since']
? new Date(req.Headers['x-amz-copy-source-if-unmodified-since'])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably we need to track validating and short circuiting all these headers

const listPartsInput = new ListPartsCommand({
Bucket: bucketName,
Bucket: bucket,
Key: version ? key + '/' + version : key,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not relevant to this diff

Below we're using a helper withOptionalVersion but not here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also line 448

CopySource: `${storageS3Bucket}/${withOptionalVersion(sourceKey, sourceKeyVersion)}`,
UploadId: uploadId,
PartNumber: partNumber,
CopySource: `${bucket}/${withOptionalVersion(sourceKey, sourceKeyVersion)}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

url encoding is missing here

bucketId: string
options?: {
prefix?: string
deltimeter?: string
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delimeter

createAnalyticsBucket(data: Pick<Bucket, 'name'>): Promise<IcebergCatalog>
createAnalyticsBucket(input: CreateAnalyticsBucketInput): Promise<IcebergCatalog>

findBucketById<Filters extends FindBucketFilters = FindObjectFilters>(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it supposed to be FindBucketFilters for default?

headers?: BrowserCacheHeaders,
signal?: AbortSignal
): Promise<ObjectResponse> {
async read(input: ReadObjectInput): Promise<ObjectResponse> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error messages don't match method name anymore

Comment on lines +53 to 58
const tenantBuckets = await storage.listAnalyticsBuckets({
columns: 'id,name',
options: {
limit: 1000,
},
})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs pagination, right? For now, it might be fine but it will break at some point

Comment on lines +93 to 97
key: storage.location.getKeyLocation({
tenantId,
bucketId: job.data.bucketId,
objectName: job.data.name,
}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be s3Key directly

Comment on lines +79 to +82
sortBy: {
column: sortBy?.column,
order: sortBy?.order,
},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
sortBy: {
column: sortBy?.column,
order: sortBy?.order,
},
sortBy,

retry?: number
readOnly?: boolean
timeout?: number
signal?: AbortSignal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extends Cancellable?

tnx?: TNX
parentTnx?: TNX
parentConnection?: TenantConnection
signal?: AbortSignal
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this one to extend cancellable as well

Comment on lines 334 to 340
async listParts(
bucket: string,
key: string,
uploadId?: string,
maxParts?: number,
marker?: string
) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar pattern for this method?

expect(S3Backend.prototype.copyObject).toHaveBeenCalled()
expect(S3Backend.prototype.deleteObjects).toHaveBeenCalled()
expect(S3Backend.prototype.copy).toHaveBeenCalled()
expect(S3Backend.prototype.removeMany).toHaveBeenCalled()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

success is checking removeMany but failures below are checking for remove mistakenly

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants