medhir.com

a more complete bluesky feed generator

Bluesky, the recently trending Twitter-like platform, offers a powerful promise for the future of social media. Especially in modern times, when our attention is a battleground for large corporations and nation states alike, we really need better tools to make our information diets more transparent.

a couple yellow birds perched on sunflowers in front of a bright blue sky

Bluesky offers quite a few of those tools, including:

  • An event firehose that allows anyone with an Internet connection to inspect real time interactions on the network
  • The ability for developers to create new custom feeds, giving users choice on what algorithms they use to filter content on the network.
  • "Stackable" moderation through labelers users can subscribe to customize how content is displayed.

I used these tools to create Alt Text Hotties, a feed encouraging accessibility best practices by displaying selfies that include alt text from across the entire network.

While the team has offered up some boilerplate templates to help developers get started with creating their own custom feeds, I didn't find anything that could be considered "feature complete", and wanted to share the learnings from developing my own custom feed.

I started with this minimal Go-based template (thank you Jaz!) and extended the functionality to create dynamic feeds. This fork is published on GitHub, and the rest of this post serves as a deep dive for anyone who wishes to use this template to create their own feeds.

Before diving into the code, we'll begin with an overview of how the protocol underlying Bluesky works.

the AT protocol

Bluesky is built on top of the Authenticated Transfer protocol, or atproto for short. In theory, atproto can be used for a range of social media apps (not just a Twitter clone) and potentially pave the way towards decentralized social media becoming mainstream.

In practice, the protocol is a work in progress and there are a few limitations today that need to be addressed.

Despite the caveats, Bluesky is one of the few places with decent enough network effects to make it interesting, and atproto provides a radically open view into the network’s machinery in a way that other social media platforms do not.

Each user on atproto is represented by a unique Decentralized Identifier (DID) and has their own Personal Data Server (PDS) that stores all their interactions on the network.

diagram of a user storing account activity in their PDS. this can include things such as likes, follows, post creation, deletion, etc. users interact with the network by committing data records to their PDS

Relays are responsible for monitoring every PDS and providing a synchronized view of activity across the network. These relays provide the “firehose” of Bluesky data.

diagram depicting the flow of data between multiple repositories, a single relay, and multiple clients subscribing to the data firehose provided by the relay

Subscribing to one of these relays is where we start the process of generating a feed.

working with the event firehose

Rather than listening to the firehose directly, the project makes use of jetstream, which provides a simplified JSON view of events streaming through Bluesky’s relays. Using jetstream drastically decreases the overall bandwidth flowing through the service and is sufficient for the purposes of creating a custom feed.

In service/pkg/stream/subscriber.go, we connect to jetstream through a web socket connection at the following URI:

const jetstreamUri = "wss://jetstream.atproto.tools/subscribe"
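
If you only care about a few record types, Jetstream can filter on the server side through repeated wantedCollections query parameters on the subscribe URL. Here's a minimal sketch of building such a URL with the standard library. The subscribeURL helper is a name made up for this example and isn't part of the template, which sets the URL through the client config instead.

```go
package main

import (
	"fmt"
	"net/url"
)

// subscribeURL is a hypothetical helper (not in the template). It appends one
// wantedCollections query parameter per collection to a Jetstream endpoint.
func subscribeURL(base string, collections []string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for _, c := range collections {
		q.Add("wantedCollections", c)
	}
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	uri, err := subscribeURL("wss://jetstream.atproto.tools/subscribe", []string{
		"app.bsky.feed.post",
		"app.bsky.feed.like",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(uri)
}
```

Filtering at the source keeps unrelated events (follows, blocks, profile updates) from ever reaching the service.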

We use the client interface to connect to the web socket and start reading events from the stream.

func (s *subscriber) connect() error {
	config := client.DefaultClientConfig()
	config.WebsocketURL = jetstreamUri
	config.Compress = true
	s.sched = sequential.NewScheduler("jetstream", s.log, func(ctx context.Context, event *models.Event) error {
		return s.handleCommit(ctx, event, s.db)
	})
	c, err := client.NewClient(config, s.log, s.sched)
	if err != nil {
		s.log.Warn(fmt.Sprintf("failed to create client: %s", err.Error()))
		return err
	}
	// Every 5 seconds print the events read and bytes read and average event size
	go func() { ... }()
	return c.ConnectAndRead(s.ctx, nil)
}

Notice the call to s.handleCommit(). This is within a callback function that gets called for every event that is received through the websocket connection.

Take a look at the handleCommit implementation. Here, we inspect the commit and make decisions based on what kind of event it is. This is apparent with the switch statements on event.Commit.Operation and event.Commit.Collection.

There are several constants defined for the collection types we're interested in processing:

const (
	CollectionKindFeedPost    = "app.bsky.feed.post"
	CollectionKindFeedRepost  = "app.bsky.feed.repost"
	CollectionKindFeedLike    = "app.bsky.feed.like"
)

These constants describe the different types of data records that Bluesky uses as defined by atproto’s Lexicon schema language. Seem familiar? These are the same records that are committed to a user’s PDS.
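
To make the dispatch concrete, here's a rough sketch of the two-level switch on collection and operation. It uses stand-in structs that mirror only the Jetstream JSON fields it reads, rather than the real models.Event type, and it returns the name of a handler instead of calling one. handleCreatePost and handleDeletePost come from the template; the like handler names are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// event and commit are stand-ins covering only the fields this sketch needs.
type event struct {
	Did    string  `json:"did"`
	Kind   string  `json:"kind"`
	Commit *commit `json:"commit"`
}

type commit struct {
	Operation  string `json:"operation"`
	Collection string `json:"collection"`
	RKey       string `json:"rkey"`
}

// route returns the name of the handler a commit would be sent to,
// or "" when the event is one we ignore.
func route(raw []byte) (string, error) {
	var ev event
	if err := json.Unmarshal(raw, &ev); err != nil {
		return "", err
	}
	if ev.Kind != "commit" || ev.Commit == nil {
		return "", nil // identity and account events carry no commit
	}
	switch ev.Commit.Collection {
	case "app.bsky.feed.post":
		switch ev.Commit.Operation {
		case "create":
			return "handleCreatePost", nil
		case "delete":
			return "handleDeletePost", nil
		}
	case "app.bsky.feed.like":
		switch ev.Commit.Operation {
		case "create":
			return "handleCreateLike", nil // assumed name
		case "delete":
			return "handleDeleteLike", nil // assumed name
		}
	}
	return "", nil
}

func main() {
	raw := []byte(`{"did":"did:plc:abc123","kind":"commit",
		"commit":{"operation":"delete","collection":"app.bsky.feed.post","rkey":"jxi27dkl5"}}`)
	name, err := route(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```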

commit handlers

Now take a look at pkg/stream/handlers.go.

Here, we have a variety of methods that handle events depending on the commit operation and the collection type the commit targets. For instance, here’s the handler for post deletion:

func (s *subscriber) handleDeletePost(event *models.Event) error {
	rkey := event.Commit.RKey
	if err := s.db.DeletePost(rkey); err != nil {
		s.log.Warn(fmt.Sprintf("failed to delete post from DB: %s", err.Error()))
		return err
	}
	return nil
}

These can be modified to your liking depending on how you want to structure your feed.

post classification and persistence

The handleCreatePost method contains most of the interesting logic. In this method, we:

  • inspect a post's contents
  • check if the post contains images
  • if yes, run those images through a classifier
  • save matching posts in a database

Since most machine learning tools (like PyTorch, transformers, etc) exist in the Python ecosystem, we wrap calls to the image classification model in a separate container as a REST interface.

The Go service makes POST calls to the classifier at http://classifier:12000/classify, passing in an image_url parameter. When called, the Python service loads the image and runs it through a transformer-based classifier.

In this example, we use the CLIP transformer model in classifier/app.py to label images bird or not_bird:

def classify_bird(image):
    """Classify if image contains a bird using CLIP."""
    inputs = processor(
        images=image,
        text=["a photo containing a bird", "a photo not containing a bird"],
        return_tensors="pt",
        padding=True
    ).to(device)
 
    with torch.no_grad():
        outputs = model(**inputs)
        logits_per_image = outputs.logits_per_image
        probs = torch.nn.functional.softmax(logits_per_image, dim=1)
 
        is_bird = probs[0][0].item() > probs[0][1].item()
        label = 'bird' if is_bird else 'not_bird'
        confidence = float(max(probs[0]).item())
 
        return label, confidence

There is another method in app.py called classify_image that serves as our POST request handler, and returns JSON in the response that looks like this:

{
    "label": "bird",
    "confidence": 0.89
}

handleCreatePost will check this response for the proper label, and save posts if the confidence exceeds 85%:

for _, img := range post.Embed.EmbedImages.Images {
	response, err := s.classify(event.Did, img)
	if err != nil {
		s.log.Warn(fmt.Sprintf("failed to classify image: %s", err.Error()))
		continue
	}
	// if post contains picture with high confidence, add to DB
	if response.Label == "bird" && response.Confidence > 0.85 {
		did := event.Did
		rkey := event.Commit.RKey
		postURL := fmt.Sprintf("https://bsky.app/profile/%s/post/%s", did, rkey)
		s.log.Info("Bird Identified")
		s.log.Info(fmt.Sprintf("Post URL: %s", postURL))
		s.log.Info(fmt.Sprintf("Confidence: %f", response.Confidence))
		err := s.db.AddPost(did, rkey, postURL)
		if err != nil {
			s.log.Warn(fmt.Sprintf("failed to add post to DB: %s", err.Error()))
			continue
		}
		s.log.Info(fmt.Sprintf("Added post to DB: %s", rkey))
		// only add one record per post, skip other images
		break
	}
}

serving feeds

The feedrouter package defines the interface we expect every feed to satisfy:

type Feed interface {
	GetPage(ctx context.Context, feed string, userDID string, limit int64, cursor string) (feedPosts []*appbsky.FeedDefs_SkeletonFeedPost, newCursor *string, err error)
	Describe(ctx context.Context) ([]appbsky.FeedDescribeFeedGenerator_Feed, error)
}

To create a dynamic feed, we'll retrieve the posts previously persisted from the firehose, matching our "is it a bird" criteria.

We've already seen examples of leveraging the db package to add / remove posts from the firehose. Let's take a look at the interface:

type DB interface {
	AddPost(did, rkey, uri string) error
	DeletePost(rkey string) error
	AddLike(did, rkey, postRkey string) error
	DeleteLike(rkey string) error
	AddRepost(did, rkey, postRkey string) error
	DeleteRepost(rkey string) error
 
	MostRecentWithCursor(limit int64, cursor int64) ([]string, error)
	MostPopularWithCursor(limit int64, cursor int64) ([]string, error)
}

The last two methods are what we'll use to retrieve posts. Notice how these methods expect limit and cursor integers. These are the same values passed into GetPage to serve a feed with pagination.
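
To illustrate how limit and cursor work together, here's a small in-memory sketch. It treats the cursor as a plain offset into an ordered list and passes it around as a string, the way GetPage receives it. That offset interpretation is an assumption made for this example, and the template's database queries may define the cursor differently.

```go
package main

import (
	"fmt"
	"strconv"
)

// page returns up to limit URIs starting at the offset encoded in cursor,
// plus the cursor for the next page ("" once the list is exhausted).
// Treating the cursor as an offset is an assumption of this sketch.
func page(uris []string, limit int64, cursor string) ([]string, string, error) {
	if limit <= 0 {
		return nil, "", fmt.Errorf("limit must be positive, got %d", limit)
	}
	var offset int64
	if cursor != "" {
		parsed, err := strconv.ParseInt(cursor, 10, 64)
		if err != nil || parsed < 0 {
			return nil, "", fmt.Errorf("invalid cursor %q", cursor)
		}
		offset = parsed
	}
	total := int64(len(uris))
	if offset >= total {
		return nil, "", nil
	}
	end := offset + limit
	if end > total {
		end = total
	}
	next := ""
	if end < total {
		next = strconv.FormatInt(end, 10)
	}
	return uris[offset:end], next, nil
}

func main() {
	uris := []string{
		"at://did:plc:abc123/app.bsky.feed.post/a",
		"at://did:plc:abc123/app.bsky.feed.post/b",
		"at://did:plc:abc123/app.bsky.feed.post/c",
	}
	cursor := ""
	for {
		posts, next, err := page(uris, 2, cursor)
		if err != nil {
			panic(err)
		}
		fmt.Println(posts, "next cursor:", next)
		if next == "" {
			break
		}
		cursor = next
	}
}
```

A client keeps sending back the cursor it last received until the cursor comes back empty.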

Now let's register two dynamic feeds leveraging these db functions in our main program. We'll use the NewDynamicFeed method in main.go, like so:

dbInstance, err := db.NewDB(ctx)
if err != nil {
	log.Fatalf("Failed to create DB: %v", err)
}
logger := slog.Default()
 
// register dynamic feeds
justBirdsFeed, justBirdsFeedAliases := dynamic.NewDynamicFeed(ctx, feedActorDID, "JustBirds", dbInstance.MostRecentWithCursor, logger)
feedRouter.AddFeed(justBirdsFeedAliases, justBirdsFeed)
mostPopularBirds, mostPopularBirdsAliases := dynamic.NewDynamicFeed(ctx, feedActorDID, "MostPopularBirds", dbInstance.MostPopularWithCursor, logger)
feedRouter.AddFeed(mostPopularBirdsAliases, mostPopularBirds)

Taking a look at the first feed we register, the returned justBirdsFeed contains both GetPage and Describe methods to satisfy the feed interface, which can then be leveraged by feedRouter to serve up our newly implemented feeds.

testing

To run the feed generator, the main dependency you'll need is Docker. With Docker installed, create a .env file based on .env.example. For now, the main variable to change is FEED_ACTOR_DID, which will be the DID associated with the feed you're publishing.

You can find the DID for your Bluesky handle with the following command:

curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=your_handle.bsky.social"

With FEED_ACTOR_DID populated, you can start the feed generator's services by running:

make up

Then you can see which posts get returned by one of the dynamic feeds like so:

curl "http://localhost:9032/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://did:plc:replace-me-with-your-did/app.bsky.feed.generator/JustBirds"

This will return a list of AT URIs and a cursor. The AT URI looks something like this:

at://did:plc:abc123/app.bsky.feed.post/jxi27dkl5

To see the post on Bluesky, use the DID and the post record to format the URL in your browser like so:

https://bsky.app/profile/did:plc:abc123/post/jxi27dkl5
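
If you'd rather not rewrite URIs by hand, a few lines of Go can do the conversion for a whole response. This sketch decodes a getFeedSkeleton-shaped JSON document (a cursor plus a feed array of objects with a post field) and prints a browser URL for each entry. The postURL helper and the sample payload are made up for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// skeleton covers the parts of a getFeedSkeleton response this sketch reads.
type skeleton struct {
	Cursor string `json:"cursor"`
	Feed   []struct {
		Post string `json:"post"`
	} `json:"feed"`
}

// postURL is a hypothetical helper that turns
// at://<did>/app.bsky.feed.post/<rkey> into a bsky.app link.
func postURL(atURI string) (string, error) {
	if !strings.HasPrefix(atURI, "at://") {
		return "", fmt.Errorf("not an AT URI: %q", atURI)
	}
	parts := strings.Split(strings.TrimPrefix(atURI, "at://"), "/")
	if len(parts) != 3 || parts[0] == "" || parts[2] == "" || parts[1] != "app.bsky.feed.post" {
		return "", fmt.Errorf("not a post URI: %q", atURI)
	}
	return fmt.Sprintf("https://bsky.app/profile/%s/post/%s", parts[0], parts[2]), nil
}

func main() {
	// Sample payload with made-up values.
	raw := []byte(`{"cursor": "1", "feed": [{"post": "at://did:plc:abc123/app.bsky.feed.post/jxi27dkl5"}]}`)
	var sk skeleton
	if err := json.Unmarshal(raw, &sk); err != nil {
		panic(err)
	}
	for _, item := range sk.Feed {
		link, err := postURL(item.Post)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		fmt.Println(link)
	}
}
```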

deployment

While creating a full cloud deployment is outside the scope of this article, examining AT URIs isn't as fun as seeing the feed in Bluesky's app.

If you'd like to see the feed on Bluesky without a full deployment, you can use a proxy to provide a public endpoint to the local feedgen service. Here, we'll use ngrok.

Once you have installed ngrok and created an account, register an auth token (from the account page) like so:

ngrok config add-authtoken your-auth-token

Then, you can create a tunnel to your local feedgen service by running:

ngrok http http://localhost:9032

This will create a secure endpoint that forwards traffic to your local feedgen service:

screenshot of ngrok running, showing the https endpoint that forwards traffic to the local feed generator

This https URL will serve as the SERVICE_ENDPOINT found in .env. Once you've set this variable, you can use these instructions in the official Bluesky feed generator template to publish your feed.

And that's about it. Hopefully you found this guide useful and a reasonable starting point to develop your own custom feeds. If you did find it useful, be sure to give a star to the repo on GitHub. Happy developing!
