diff options
Diffstat (limited to 'main.go')
-rw-r--r-- | main.go | 136 |
1 files changed, 72 insertions, 64 deletions
@@ -1,93 +1,101 @@ package main import ( + "context" "encoding/json" + "fmt" "log" - "os" - "os/signal" - "strings" + "log/slog" + "runtime" "time" - "github.com/gorilla/websocket" + _ "github.com/bluesky-social/indigo/api/bsky" + "github.com/bluesky-social/jetstream/pkg/client" + "github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel" + "github.com/bluesky-social/jetstream/pkg/models" "github.com/nats-io/nats.go" ) -type BlueskyPost struct { - Op string `json:"op"` - Repo string `json:"repo"` - Record struct { - Text string `json:"text"` - CreatedAt time.Time `json:"createdAt"` - } `json:"record"` -} - -func createNATSSubject(userID, postType string, timestamp time.Time) string { - parts := []string{"bsky", "feed", "post"} - if userID != "" { - parts = append(parts, "user", userID) - } - if postType != "" { - parts = append(parts, "type", postType) - } - parts = append(parts, "time", timestamp.Format("20060102")) - return strings.Join(parts, ".") -} - func main() { // Connect to NATS nc, err := nats.Connect(nats.DefaultURL) if err != nil { - log.Fatal("Error connecting to NATS:", err) + log.Fatalf("Error connecting to NATS: %v", err) } defer nc.Close() + log.Println("Connected to NATS") - // Connect to WebSocket - c, _, err := websocket.DefaultDialer.Dial("wss://jetstream2.us-east.bsky.network/subscribe", nil) - if err != nil { - log.Fatal("Error connecting to WebSocket:", err) + // Create Jetstream client with proper configuration + config := &client.ClientConfig{ + WebsocketURL: "wss://jetstream2.us-east.bsky.network/subscribe", } - defer c.Close() + logger := slog.Default() + scheduler := parallel.NewScheduler(runtime.NumCPU(), "nats-proxy", logger, func(ctx context.Context, e *models.Event) error { + if e != nil && e.Commit != nil && e.Commit.Collection != "" { + // Get the collection from the commit + collection := e.Commit.Collection + + // Determine the subject based on the collection + subject := fmt.Sprintf("bluesky.%s", collection) - // Handle WebSocket messages - go func() { - for { - _, message, err := c.ReadMessage() + // Convert the event to JSON + jsonData, err := json.Marshal(e) if err != nil { - log.Println("Error reading WebSocket message:", err) - return + log.Printf("Error marshaling event: %v", err) + return err } - var post BlueskyPost - if err := json.Unmarshal(message, &post); err != nil { - log.Println("Error unmarshaling JSON:", err) - continue + // Publish the event to NATS + err = nc.Publish(subject, jsonData) + if err != nil { + log.Printf("Error publishing to NATS: %v", err) + return err } + } - // Extract user ID from repo field - userID := strings.Split(post.Repo, ".")[0] + return nil + }) + log.Println("Created config, logger, scheduler") - // Determine post type (simplified for this example) - postType := "text" - if strings.Contains(post.Record.Text, "http") { - postType = "link" - } + jc, err := client.NewClient(config, logger, scheduler) + if err != nil { + log.Fatalf("Error creating Jetstream client: %v", err) + } - // Create a more detailed NATS subject - subject := createNATSSubject(userID, postType, post.Record.CreatedAt) + cursor := time.Now().UnixMicro() - // Publish the JSON message to NATS with the detailed subject - if err := nc.Publish(subject, message); err != nil { - log.Println("Error publishing to NATS:", err) - } else { - log.Printf("Published message to subject: %s", subject) - } - } - }() + if err := jc.ConnectAndRead(context.Background(), &cursor); err != nil { + log.Fatalf("failed to connect: %v", err) + } + + // // Process events + // err = jc.ConnectAndRead(context.Background(), func(evt *parallel.StreamEvent) error { + + // // Process specific event types + // switch collection { + // case "app.bsky.feed.post": + // var post bsky.FeedPost + // if err := json.Unmarshal(evt.Commit.Record, &post); err != nil { + // log.Printf("Error unmarshaling post: %v", err) + // return nil + // } + // log.Printf("Received post: %s", post.Text) + // case "app.bsky.feed.like": + // var like bsky.FeedLike + // if err := json.Unmarshal(evt.Commit.Record, &like); err != nil { + // log.Printf("Error unmarshaling like: %v", err) + // return nil + // } + // log.Printf("Received like for: %s", like.Subject.Uri) + // } - // Wait for interrupt signal to gracefully shut down - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - <-interrupt + // return nil + // }) - log.Println("Shutting down...") -}
\ No newline at end of file + // if err != nil { + // log.Fatalf("Error reading from Jetstream: %v", err) + // } + + // Keep the program running + select {} +} |