1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"log"
"log/slog"
"os"
"os/signal"
"runtime"
"time"
_ "github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/jetstream/pkg/client"
"github.com/bluesky-social/jetstream/pkg/client/schedulers/parallel"
"github.com/bluesky-social/jetstream/pkg/models"
"github.com/nats-io/nats.go"
)
// printHelp writes the usage banner and the list of supported NATS_*
// environment variables to standard output, one entry per line.
func printHelp() {
	helpLines := []string{
		"Usage: HTTP-to-NATS proxy server",
		// The leading newline leaves a blank line between the banner and the list.
		"\nThe following environment variables are supported:",
		" NATS_URL - NATS connection URL (default: nats://127.0.0.1:4222)",
		" NATS_USER - NATS username for authentication (optional)",
		" NATS_PASSWORD - NATS password for authentication (optional)",
		" NATS_TOKEN - NATS token for authentication (optional)",
		" NATS_NKEY - NATS NKEY for authentication (optional)",
		" NATS_NKEY_SEED - NATS NKEY seed for authentication (optional)",
		" NATS_CREDS_FILE - Path to NATS credentials file (optional)",
	}
	for _, line := range helpLines {
		fmt.Println(line)
	}
}
func main() {
helpFlag := flag.Bool("help", false, "Display help information about available environment variables")
flag.Parse()
if *helpFlag {
printHelp()
os.Exit(0)
}
// Read NATS connection info from environment variables
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL // defaults to "nats://127.0.0.1:4222"
}
natsUser := os.Getenv("NATS_USER")
natsPassword := os.Getenv("NATS_PASSWORD")
natsToken := os.Getenv("NATS_TOKEN")
natsNkey := os.Getenv("NATS_NKEY")
natsNkeySeed := os.Getenv("NATS_NKEY_SEED")
natsCredsFile := os.Getenv("NATS_CREDS_FILE")
// Set up NATS connection options
opts := []nats.Option{nats.Name("Web proxy")}
if natsUser != "" && natsPassword != "" {
opts = append(opts, nats.UserInfo(natsUser, natsPassword))
} else if natsToken != "" {
opts = append(opts, nats.Token(natsToken))
} else if natsNkey != "" && natsNkeySeed != "" {
log.Fatalln("NKEY connection not supported")
} else if natsCredsFile != "" {
opts = append(opts, nats.UserCredentials(natsCredsFile))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
log.Fatal("Error connecting to NATS:", err)
}
defer nc.Close()
log.Println("Connected to NATS")
// Create Jetstream client with proper configuration
config := &client.ClientConfig{
WebsocketURL: "wss://jetstream1.us-east.bsky.network/subscribe",
}
logger := slog.Default()
scheduler := parallel.NewScheduler(runtime.NumCPU(), "nats-proxy", logger, func(ctx context.Context, e *models.Event) error {
if e != nil && e.Commit != nil && e.Commit.Collection != "" {
// Get the collection from the commit
collection := e.Commit.Collection
// Determine the subject based on the collection
subject := fmt.Sprintf("bluesky.%s", collection)
// Convert the event to JSON
jsonData, err := json.Marshal(e)
if err != nil {
log.Printf("Error marshaling event: %v", err)
return err
}
// Publish the event to NATS
err = nc.Publish(subject, jsonData)
if err != nil {
log.Printf("Error publishing to NATS: %v", err)
return err
}
}
return nil
})
log.Println("Created config, logger, scheduler")
go func() {
cursor := time.Now().UnixMicro()
totalServers := 2 // See: https://github.com/bluesky-social/jetstream?tab=readme-ov-file#public-instances
for i := 0; true; i++ {
jc, err := client.NewClient(config, logger, scheduler)
if err != nil {
log.Fatalf("Error creating Jetstream client: %v", err)
}
if err := jc.ConnectAndRead(context.Background(), &cursor); err != nil {
log.Printf("failed to connect: %v\n", err)
}
time.Sleep(1 * time.Second)
// alternate between available jetstream servers
config.WebsocketURL = fmt.Sprintf("wss://jetstream%d.us-east.bsky.network/subscribe", i%totalServers+1)
log.Printf("connecting to %s instead\n", config.WebsocketURL)
}
}()
// Wait for interrupt signal to gracefully shut down
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
<-interrupt
log.Println("Shutting down...")
}
|