package main import ( "context" "encoding/json" "fmt" "log" "log/slog" "runtime" "time" _ "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" "github.com/bluesky-social/jetstream/pkg/models" "github.com/nats-io/nats.go" ) func main() { // Connect to NATS nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatalf("Error connecting to NATS: %v", err) } defer nc.Close() log.Println("Connected to NATS") // Create Jetstream client with proper configuration config := &client.ClientConfig{ WebsocketURL: "wss://jetstream2.us-east.bsky.network/subscribe", } logger := slog.Default() scheduler := parallel.NewScheduler(runtime.NumCPU(), "nats-proxy", logger, func(ctx context.Context, e *models.Event) error { if e != nil && e.Commit != nil && e.Commit.Collection != "" { // Get the collection from the commit collection := e.Commit.Collection // Determine the subject based on the collection subject := fmt.Sprintf("bluesky.%s", collection) // Convert the event to JSON jsonData, err := json.Marshal(e) if err != nil { log.Printf("Error marshaling event: %v", err) return err } // Publish the event to NATS err = nc.Publish(subject, jsonData) if err != nil { log.Printf("Error publishing to NATS: %v", err) return err } } return nil }) log.Println("Created config, logger, scheduler") jc, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("Error creating Jetstream client: %v", err) } cursor := time.Now().UnixMicro() if err := jc.ConnectAndRead(context.Background(), &cursor); err != nil { log.Fatalf("failed to connect: %v", err) } // // Process events // err = jc.ConnectAndRead(context.Background(), func(evt *parallel.StreamEvent) error { // // Process specific event types // switch collection { // case "app.bsky.feed.post": // var post bsky.FeedPost // if err := json.Unmarshal(evt.Commit.Record, &post); err != nil { // log.Printf("Error unmarshaling post: %v", err) // return nil // } // log.Printf("Received post: %s", post.Text) // case "app.bsky.feed.like": // var like bsky.FeedLike // if err := json.Unmarshal(evt.Commit.Record, &like); err != nil { // log.Printf("Error unmarshaling like: %v", err) // return nil // } // log.Printf("Received like for: %s", like.Subject.Uri) // } // return nil // }) // if err != nil { // log.Fatalf("Error reading from Jetstream: %v", err) // } // Keep the program running select {} }