raitako-1/atingester

atingester: Bluesky Syncing Tools

This library syncs data from the Bluesky network.

Forked from @atproto/sync

This tool can subscribe to the following streams:

  • Firehose is an authenticated stream of events used to efficiently sync user updates (posts, likes, follows, handle changes, etc).
  • Jetstream is a streaming service that consumes Firehose and converts it into lightweight, friendly JSON.
  • Turbostream is a real-time, hydrated repeater service built on top of Jetstream.

Usage

It's basically the same as here.

Make sure to run await initIngester() exactly once before constructing Ingester, Firehose, Jetstream, or Turbostream!
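
One way to enforce the "exactly once" requirement is to cache the initialization promise behind a small wrapper. The following is a minimal sketch, not part of atingester: `ensureInitialized` is a hypothetical helper, and `initIngester` is replaced by a local stand-in so the snippet runs on its own. In real code the stand-in would be the function imported from 'atingester'.

```typescript
// Hypothetical stand-in for atingester's initIngester so this sketch runs on its own.
let initCalls = 0
const initIngester = async (): Promise<void> => {
  initCalls += 1
}

// Cache the first promise so repeated or concurrent callers share one initialization.
let initPromise: Promise<void> | undefined
const ensureInitialized = (): Promise<void> => {
  if (initPromise === undefined) {
    initPromise = initIngester()
  }
  return initPromise
}
```

Every module that constructs a stream class could then `await ensureInitialized()` first without worrying about whether another module has already done so. Note that a failed initialization stays cached in this sketch.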

Ingester ( Firehose / Jetstream / Turbostream )

import { Ingester, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const ingester = new Ingester('Firehose', {
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    /*
    getCursor?: () => Awaited<number | undefined>
    runner?: EventRunner // should only set getCursor *or* runner
    */
    service: 'wss://bsky.network',
    subscriptionReconnectDelay: 3000,
    unauthenticatedCommits: false,
    unauthenticatedHandles: false,
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeCommit: false,
    excludeSync: true,
  })

  ingester.start()

  // on service shutdown
  await ingester.destroy()
}

run()
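
The commented-out getCursor option above returns a number or undefined. A rough sketch of a function with that shape might look like the following. It assumes events carry a numeric sequence value (called `seq` here, which this README does not confirm), and `CursorStore` is a hypothetical helper, not part of atingester. The store is in-memory only, so a real service would likely persist the value to disk or a database.

```typescript
// Hypothetical helper: remembers the highest sequence number seen so far.
class CursorStore {
  private cursor: number | undefined = undefined

  // Record a sequence number, ignoring values older than the stored one.
  save(seq: number): void {
    if (this.cursor === undefined || seq > this.cursor) {
      this.cursor = seq
    }
  }

  // Same shape as the getCursor option: returns a number or undefined.
  getCursor = (): number | undefined => this.cursor
}

const store = new CursorStore()
store.save(120)
store.save(118) // an older value does not move the cursor backwards
```

Under these assumptions, `store.getCursor` could be passed as the getCursor option, with `store.save(...)` called from handleEvent.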

Firehose

import { Firehose, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const firehose = new Firehose({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://bsky.network',
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
    excludeSync: true,
  })

  firehose.start()

  // on service shutdown
  await firehose.destroy()
}

run()
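
In a long-running service, destroy() would normally be called at shutdown rather than right after start(). The sketch below shows one possible way to make that shutdown call safe to trigger more than once. `Destroyable` and `makeShutdownHandler` are hypothetical names introduced for illustration; the only thing assumed about the stream object is the async destroy() method shown in the examples.

```typescript
// Minimal shape assumed for any stream object: an async destroy() method.
interface Destroyable {
  destroy(): Promise<void>
}

// Returns a handler that calls destroy() at most once, even if invoked repeatedly.
const makeShutdownHandler = (target: Destroyable): (() => Promise<void>) => {
  let pending: Promise<void> | undefined
  return () => {
    if (pending === undefined) {
      pending = target.destroy()
    }
    return pending
  }
}
```

In a Node.js process, the returned handler could be registered with `process.once('SIGINT', ...)` and `process.once('SIGTERM', ...)`.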

Jetstream

import { Jetstream, initIngester } from 'atingester'
import { IdResolver } from '@atproto/identity'

const run = async () => {
  await initIngester()

  const jetstream = new Jetstream({
    idResolver: new IdResolver(),
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://jetstream1.us-east.bsky.network',
    compress: true,
    filterCollections: ['app.bsky.feed.post'],
    filterDids: ['did:plc:abcde....'],
    excludeIdentity: true,
    excludeAccount: true,
  })

  jetstream.start()

  // on service shutdown
  await jetstream.destroy()
}

run()

Turbostream

import { initIngester, Turbostream } from 'atingester'

const run = async () => {
  await initIngester()

  const turbostream = new Turbostream({
    handleEvent: (evt) => {
      if (evt.event === 'create') {
        console.log(evt.record.text)
      }
    },
    onInfo: (info) => {
      console.info(info)
    },
    onError: (err: Error) => {
      console.error(err)
    },
    service: 'wss://api.graze.social',
    filterDids: ['did:plc:abcde....'],
  })

  turbostream.start()

  // on service shutdown
  await turbostream.destroy()
}

run()
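
The three stream examples above each use a different service URL. As a convenience, those URLs could be collected in one typed map. This is only a sketch: `StreamKind`, `exampleServices`, and `streamConfig` are hypothetical names, and only 'Firehose' is shown in this README as a value passed to Ingester, so treating the other two names as valid arguments is an assumption.

```typescript
// Stream names as they appear in this README.
type StreamKind = 'Firehose' | 'Jetstream' | 'Turbostream'

// Service URLs copied from the examples above.
const exampleServices: Record<StreamKind, string> = {
  Firehose: 'wss://bsky.network',
  Jetstream: 'wss://jetstream1.us-east.bsky.network',
  Turbostream: 'wss://api.graze.social',
}

// Hypothetical helper: pair a chosen stream name with its example service URL.
const streamConfig = (kind: StreamKind): { kind: StreamKind; service: string } => ({
  kind,
  service: exampleServices[kind],
})
```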
