package main import ( "context" "encoding/json" "fmt" "log" "log/slog" "os" "os/signal" "runtime" "time" _ "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" "github.com/bluesky-social/jetstream/pkg/models" "github.com/nats-io/nats.go" ) func main() { // Connect to NATS nc, err := nats.Connect(nats.DefaultURL) if err != nil { log.Fatalf("Error connecting to NATS: %v", err) } defer nc.Close() log.Println("Connected to NATS") // Create Jetstream client with proper configuration config := &client.ClientConfig{ WebsocketURL: "wss://jetstream2.us-east.bsky.network/subscribe", } logger := slog.Default() scheduler := parallel.NewScheduler(runtime.NumCPU(), "nats-proxy", logger, func(ctx context.Context, e *models.Event) error { if e != nil && e.Commit != nil && e.Commit.Collection != "" { // Get the collection from the commit collection := e.Commit.Collection // Determine the subject based on the collection subject := fmt.Sprintf("bluesky.%s", collection) // Convert the event to JSON jsonData, err := json.Marshal(e) if err != nil { log.Printf("Error marshaling event: %v", err) return err } // Publish the event to NATS err = nc.Publish(subject, jsonData) if err != nil { log.Printf("Error publishing to NATS: %v", err) return err } } return nil }) log.Println("Created config, logger, scheduler") jc, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("Error creating Jetstream client: %v", err) } cursor := time.Now().UnixMicro() if err := jc.ConnectAndRead(context.Background(), &cursor); err != nil { log.Fatalf("failed to connect: %v", err) } if err != nil { log.Fatalf("Error reading from Jetstream: %v", err) } // Wait for interrupt signal to gracefully shut down interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) <-interrupt log.Println("Shutting down...") }