Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions packages/block-brokers/.aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import cors from 'cors'
import polka from 'polka'

/** @type {import('aegir').PartialOptions} */
const options = {
test: {
async before (options) {
const server = polka({
port: 0,
host: '127.0.0.1'
})
server.use(cors())
server.all('/ipfs/bafkreiefnkxuhnq3536qo2i2w3tazvifek4mbbzb6zlq3ouhprjce5c3aq', (req, res) => {
res.writeHead(200, {
'content-type': 'application/octet-stream'
})
res.end(Uint8Array.from([0, 1, 2, 0]))
})

await server.listen()
const { port } = server.server.address()

return {
server,
env: {
TRUSTLESS_GATEWAY: `http://127.0.0.1:${port}`
}
}
},
async after (options, before) {
await before.server.server.close()
}
}
}

export default options
9 changes: 9 additions & 0 deletions packages/block-brokers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,24 @@
"dependencies": {
"@helia/interface": "^4.1.0",
"@libp2p/interface": "^1.1.4",
"@libp2p/utils": "^5.2.6",
"@multiformats/multiaddr-matcher": "^1.2.0",
"@multiformats/multiaddr-to-uri": "^10.0.1",
"interface-blockstore": "^5.2.10",
"ipfs-bitswap": "^20.0.2",
"multiformats": "^13.1.0",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0"
},
"devDependencies": {
"@libp2p/logger": "^4.0.7",
"@libp2p/peer-id-factory": "^4.0.7",
"@multiformats/multiaddr": "^12.1.14",
"@multiformats/uri-to-multiaddr": "^8.0.0",
"@types/sinon": "^17.0.3",
"aegir": "^42.2.5",
"cors": "^2.8.5",
"polka": "^0.5.2",
"sinon": "^17.0.1",
"sinon-ts": "^2.0.0"
}
Expand Down
150 changes: 145 additions & 5 deletions packages/block-brokers/src/trustless-gateway/broker.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,66 @@
import { DEFAULT_SESSION_MIN_PROVIDERS, DEFAULT_SESSION_MAX_PROVIDERS, DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY, DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT } from '@helia/interface'
import { PeerQueue } from '@libp2p/utils/peer-queue'
import { isPrivateIp } from '@libp2p/utils/private-ip'
import { DNS, HTTP, HTTPS } from '@multiformats/multiaddr-matcher'
import { multiaddrToUri } from '@multiformats/multiaddr-to-uri'
import pDefer from 'p-defer'
import { TrustlessGateway } from './trustless-gateway.js'
import { DEFAULT_TRUSTLESS_GATEWAYS } from './index.js'
import type { TrustlessGatewayBlockBrokerInit, TrustlessGatewayComponents, TrustlessGatewayGetBlockProgressEvents } from './index.js'
import type { BlockRetrievalOptions, BlockBroker } from '@helia/interface/blocks'
import type { Routing, BlockRetrievalOptions, BlockBroker, CreateSessionOptions } from '@helia/interface'
import type { Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

export interface CreateTrustlessGatewaySessionOptions extends CreateSessionOptions<TrustlessGatewayGetBlockProgressEvents> {
/**
* Specify the cache control header to send to the remote. 'only-if-cached'
* will prevent the gateway from fetching the content if they don't have it.
*
* @default only-if-cached
*/
cacheControl?: string
Comment on lines +19 to +21
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried about defaulting to asking gateways to only return content they have. will read more in the PR

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to stop them doing what gateways do, e.g. fetch content on your behalf. Otherwise it defeats the purpose of having a session.


/**
* By default we will only connect to peers with HTTPS addresses, pass true
* to also connect to HTTP addresses.
*
* @default false
*/
allowInsecure?: boolean

/**
* By default we will only connect to peers with public or DNS addresses, pass
* true to also connect to private addresses.
*
* @default false
*/
allowLocal?: boolean
}

/**
* A class that accepts a list of trustless gateways that are queried
* for blocks.
*/
export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGatewayGetBlockProgressEvents> {
private readonly components: TrustlessGatewayComponents
private readonly gateways: TrustlessGateway[]
private readonly routing: Routing
private readonly log: Logger

constructor (components: TrustlessGatewayComponents, init: TrustlessGatewayBlockBrokerInit = {}) {
this.components = components
this.log = components.logger.forComponent('helia:trustless-gateway-block-broker')
this.routing = components.routing
this.gateways = (init.gateways ?? DEFAULT_TRUSTLESS_GATEWAYS)
.map((gatewayOrUrl) => {
return new TrustlessGateway(gatewayOrUrl)
return new TrustlessGateway(gatewayOrUrl, components.logger)
})
}

addGateway (gatewayOrUrl: string): void {
this.gateways.push(new TrustlessGateway(gatewayOrUrl, this.components.logger))
}

async retrieve (cid: CID, options: BlockRetrievalOptions<TrustlessGatewayGetBlockProgressEvents> = {}): Promise<Uint8Array> {
// Loop through the gateways until we get a block or run out of gateways
// TODO: switch to toSorted when support is better
Expand All @@ -38,7 +78,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
this.log.error('failed to validate block for %c from %s', cid, gateway.url, err)
gateway.incrementInvalidBlocks()

throw new Error(`unable to validate block for CID ${cid} from gateway ${gateway.url}`)
throw new Error(`Block for CID ${cid} from gateway ${gateway.url} failed validation`)
}

return block
Expand All @@ -47,7 +87,7 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
if (err instanceof Error) {
aggregateErrors.push(err)
} else {
aggregateErrors.push(new Error(`unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
aggregateErrors.push(new Error(`Unable to fetch raw block for CID ${cid} from gateway ${gateway.url}`))
}
// if signal was aborted, exit the loop
if (options.signal?.aborted === true) {
Expand All @@ -57,6 +97,106 @@ export class TrustlessGatewayBlockBroker implements BlockBroker<TrustlessGateway
}
}

throw new AggregateError(aggregateErrors, `unable to fetch raw block for CID ${cid} from any gateway`)
if (aggregateErrors.length > 0) {
throw new AggregateError(aggregateErrors, `Unable to fetch raw block for CID ${cid} from any gateway`)
} else {
throw new Error(`Unable to fetch raw block for CID ${cid} from any gateway`)
}
}

async createSession (root: CID, options: CreateTrustlessGatewaySessionOptions = {}): Promise<BlockBroker<TrustlessGatewayGetBlockProgressEvents>> {
const gateways: string[] = []
const minProviders = options.minProviders ?? DEFAULT_SESSION_MIN_PROVIDERS
const maxProviders = options.minProviders ?? DEFAULT_SESSION_MAX_PROVIDERS
const deferred = pDefer<BlockBroker<TrustlessGatewayGetBlockProgressEvents>>()
const broker = new TrustlessGatewayBlockBroker(this.components, {
gateways
})

this.log('finding transport-ipfs-gateway-http providers for cid %c', root)

const queue = new PeerQueue({
concurrency: options.providerQueryConcurrency ?? DEFAULT_SESSION_PROVIDER_QUERY_CONCURRENCY
})

Promise.resolve().then(async () => {
for await (const provider of this.routing.findProviders(root, options)) {
const httpAddresses = provider.multiaddrs.filter(ma => {
if (HTTPS.matches(ma) || (options.allowInsecure === true && HTTP.matches(ma))) {
if (options.allowLocal === true) {
return true
}

if (DNS.matches(ma)) {
return true
}

return isPrivateIp(ma.toOptions().host) === false
}

return false
})

if (httpAddresses.length === 0) {
continue
}

this.log('found transport-ipfs-gateway-http provider %p for cid %c', provider.id, root)

void queue.add(async () => {
for (const ma of httpAddresses) {
let uri: string | undefined

try {
// /ip4/x.x.x.x/tcp/31337/http
// /ip4/x.x.x.x/tcp/31337/https
// etc
uri = multiaddrToUri(ma)

const resource = `${uri}/ipfs/${root.toString()}?format=raw`

// make sure the peer is available - HEAD support doesn't seem to
// be very widely implemented so as long as the remote responds
// we are happy they are valid
// https://specs.ipfs.tech/http-gateways/trustless-gateway/#head-ipfs-cid-path-params

// in the future we should be able to request `${uri}/.well-known/libp2p-http
// and discover an IPFS gateway from $.protocols['/ipfs/gateway'].path
// in the response
// https://github.com/libp2p/specs/pull/508/files
const response = await fetch(resource, {
method: 'HEAD',
headers: {
Accept: 'application/vnd.ipld.raw',
'Cache-Control': options.cacheControl ?? 'only-if-cached'
},
signal: AbortSignal.timeout(options.providerQueryTimeout ?? DEFAULT_SESSION_PROVIDER_QUERY_TIMEOUT)
})

this.log('HEAD %s %d', resource, response.status)
gateways.push(uri)
broker.addGateway(uri)

this.log('found %d transport-ipfs-gateway-http providers for cid %c', gateways.length, root)

if (gateways.length === minProviders) {
deferred.resolve(broker)
}

if (gateways.length === maxProviders) {
queue.clear()
}
} catch (err: any) {
this.log.error('could not fetch %c from %a', root, uri ?? ma, err)
}
}
})
}
})
.catch(err => {
this.log.error('error creating session for %c', root, err)
})

return deferred.promise
}
}
3 changes: 2 additions & 1 deletion packages/block-brokers/src/trustless-gateway/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TrustlessGatewayBlockBroker } from './broker.js'
import type { BlockBroker } from '@helia/interface/src/blocks.js'
import type { Routing, BlockBroker } from '@helia/interface'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressEvent } from 'progress-events'

Expand All @@ -22,6 +22,7 @@ export interface TrustlessGatewayBlockBrokerInit {
}

export interface TrustlessGatewayComponents {
routing: Routing
logger: ComponentLogger
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { ComponentLogger, Logger } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

/**
Expand Down Expand Up @@ -36,8 +37,11 @@ export class TrustlessGateway {
*/
#successes = 0

constructor (url: URL | string) {
private readonly log: Logger

constructor (url: URL | string, logger: ComponentLogger) {
this.url = url instanceof URL ? url : new URL(url)
this.log = logger.forComponent(`helia:trustless-gateway-block-broker:${this.url.hostname}`)
}

/**
Expand Down Expand Up @@ -67,6 +71,9 @@ export class TrustlessGateway {
},
cache: 'force-cache'
})

this.log('GET %s %d', gwUrl, res.status)

if (!res.ok) {
this.#errors++
throw new Error(`unable to fetch raw block for CID ${cid} from gateway ${this.url}`)
Expand Down
55 changes: 48 additions & 7 deletions packages/block-brokers/test/trustless-gateway.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
/* eslint-env mocha */

import { defaultLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { multiaddr } from '@multiformats/multiaddr'
import { uriToMultiaddr } from '@multiformats/uri-to-multiaddr'
import { expect } from 'aegir/chai'
import * as raw from 'multiformats/codecs/raw'
import Sinon from 'sinon'
import { type StubbedInstance, stubConstructor } from 'sinon-ts'
import { type StubbedInstance, stubConstructor, stubInterface } from 'sinon-ts'
import { TrustlessGatewayBlockBroker } from '../src/trustless-gateway/broker.js'
import { TrustlessGateway } from '../src/trustless-gateway/trustless-gateway.js'
import { createBlock } from './fixtures/create-block.js'
import type { BlockBroker } from '@helia/interface/blocks'
import type { Routing } from '@helia/interface'
import type { CID } from 'multiformats/cid'

describe('trustless-gateway-block-broker', () => {
let blocks: Array<{ cid: CID, block: Uint8Array }>
let gatewayBlockBroker: BlockBroker
let gatewayBlockBroker: TrustlessGatewayBlockBroker
let gateways: Array<StubbedInstance<TrustlessGateway>>
let routing: StubbedInstance<Routing>

// take a Record<gatewayIndex, (gateway: StubbedInstance<TrustlessGateway>) => void> and stub the gateways
// Record.default is the default handler
Expand All @@ -29,19 +33,21 @@ describe('trustless-gateway-block-broker', () => {
}

beforeEach(async () => {
routing = stubInterface<Routing>()
blocks = []

for (let i = 0; i < 10; i++) {
blocks.push(await createBlock(raw.code, Uint8Array.from([0, 1, 2, i])))
}

gateways = [
stubConstructor(TrustlessGateway, 'http://localhost:8080'),
stubConstructor(TrustlessGateway, 'http://localhost:8081'),
stubConstructor(TrustlessGateway, 'http://localhost:8082'),
stubConstructor(TrustlessGateway, 'http://localhost:8083')
stubConstructor(TrustlessGateway, 'http://localhost:8080', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8081', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8082', defaultLogger()),
stubConstructor(TrustlessGateway, 'http://localhost:8083', defaultLogger())
]
gatewayBlockBroker = new TrustlessGatewayBlockBroker({
routing,
logger: defaultLogger()
})
// must copy the array because the broker calls .sort which mutates in-place
Expand Down Expand Up @@ -150,4 +156,39 @@ describe('trustless-gateway-block-broker', () => {
expect(gateways[1].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
expect(gateways[2].getRawBlock.calledWith(cid1, Sinon.match.any)).to.be.false()
})

it('creates a session', async () => {
routing.findProviders.returns(async function * () {
// non-http provider
yield {
id: await createEd25519PeerId(),
multiaddrs: [
multiaddr('/ip4/132.32.25.6/tcp/1234')
]
}
// expired peer info
yield {
id: await createEd25519PeerId(),
multiaddrs: []
}
// http gateway
yield {
id: await createEd25519PeerId(),
multiaddrs: [
uriToMultiaddr(process.env.TRUSTLESS_GATEWAY ?? '')
]
}
}())

const sessionBlockstore = await gatewayBlockBroker.createSession?.(blocks[0].cid, {
minProviders: 1,
providerQueryConcurrency: 1,
allowInsecure: true,
allowLocal: true
})

expect(sessionBlockstore).to.be.ok()

await expect(sessionBlockstore?.retrieve?.(blocks[0].cid)).to.eventually.deep.equal(blocks[0].block)
})
})