This is a library for syncing data from the Bluesky network.
Forked from @atproto/sync
- Firehose is an authenticated stream of events used to efficiently sync user updates (posts, likes, follows, handle changes, etc).
- Jetstream is a streaming service that consumes Firehose and converts it into lightweight, friendly JSON.
- Turbostream is a real-time, hydrated repeater service built on top of Jetstream.
- JetstreamProxy is a server that proxies Jetstream. Using this with atingester can further reduce network traffic.
- feed-generator is an example of using atingester with cursor.
- flow-measurement-bot is an example of using atingester.
Usage is basically the same as in @atproto/sync, the package this library was forked from.
Make sure to execute `await initIngester()` exactly once before constructing `Ingester`, `Jetstream`, or similar classes!
import { Ingester, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'
/**
 * Example: consume the Firehose through the generic `Ingester` class and
 * log the `text` of every record carried by a `create` event.
 */
const run = async (): Promise<void> => {
  // initIngester() has to complete before any ingester class is constructed.
  await initIngester()

  const firehoseIngester = new Ingester('Firehose', {
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      // Everything except record creations is ignored.
      if (evt.event !== 'create') return
      console.log(evt.record.text)
    },
    onInfo: (info) => console.info(info),
    onError: (err: Error) => console.error(err),
    /*
     * Further optional settings (set getCursor *or* runner, never both):
     *   getCursor?: () => Awaited<number | undefined>
     *   runner?: EventRunner
     */
    service: 'wss://bsky.network',
    subscriptionReconnectDelay: 3000,
    unauthenticatedCommits: false,
    unauthenticatedHandles: false,
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeCommit: false,
    excludeSync: true,
  })

  firehoseIngester.start()
  await firehoseIngester.destroy()
}
run()

import { Firehose, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'
/**
 * Example: use the `Firehose` class directly and log the `text` of every
 * record carried by a `create` event.
 */
const run = async (): Promise<void> => {
  // initIngester() has to complete before the Firehose is constructed.
  await initIngester()

  const firehoseClient = new Firehose({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      // Everything except record creations is ignored.
      if (evt.event !== 'create') return
      console.log(evt.record.text)
    },
    onInfo: (info) => console.info(info),
    onError: (err: Error) => console.error(err),
    service: 'wss://bsky.network',
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeSync: true,
  })

  firehoseClient.start()
  await firehoseClient.destroy()
}
run()

import { Jetstream, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'
/**
 * Example: use the `Jetstream` class and log the `text` of every record
 * carried by a `create` event.
 */
const run = async (): Promise<void> => {
  // initIngester() has to complete before the Jetstream client is constructed.
  await initIngester()

  const jetstreamClient = new Jetstream({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      // Everything except record creations is ignored.
      if (evt.event !== 'create') return
      console.log(evt.record.text)
    },
    onInfo: (info) => console.info(info),
    onError: (err: Error) => console.error(err),
    service: 'wss://jetstream1.us-east.bsky.network',
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
  })

  jetstreamClient.start()
  await jetstreamClient.destroy()
}
run()

import { initIngester, Turbostream } from 'atingester'
/**
 * Example: use the `Turbostream` class and log the `text` of every record
 * carried by a `create` event.
 */
const run = async (): Promise<void> => {
  // initIngester() has to complete before the Turbostream client is constructed.
  await initIngester()

  const turbostreamClient = new Turbostream({
    handleEvent: (evt) => {
      // Everything except record creations is ignored.
      if (evt.event !== 'create') return
      console.log(evt.record.text)
    },
    onInfo: (info) => console.info(info),
    onError: (err: Error) => console.error(err),
    service: 'wss://api.graze.social',
    filterDids: ['did:plc:abcde....'],
  })

  turbostreamClient.start()
  await turbostreamClient.destroy()
}
run()