1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
|
package main
import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/micro"
)
// printHelp outputs the usage information.
func printHelp() {
fmt.Println("Usage: HTTP-to-NATS proxy server")
fmt.Println("\nThe following environment variables are supported:")
fmt.Println(" NATS_URL - NATS connection URL (default: nats://127.0.0.1:4222)")
fmt.Println(" NATS_USER - NATS username for authentication (optional)")
fmt.Println(" NATS_PASSWORD - NATS password for authentication (optional)")
fmt.Println(" NATS_TOKEN - NATS token for authentication (optional)")
fmt.Println(" NATS_NKEY - NATS NKEY for authentication (optional)")
fmt.Println(" NATS_NKEY_SEED - NATS NKEY seed for authentication (optional)")
fmt.Println(" NATS_CREDS_FILE - Path to NATS credentials file (optional)")
fmt.Println(" NATS_INBOX_PREFIX - Subject prefix for NATS messages (default: _INBOX)")
fmt.Println(" HTTP_PORT - HTTP port to listen on (default: 8080)")
}
// URL to NATS subject conversion
func URLToNATS(urlPath string) (string, error) {
segments := strings.Split(strings.Trim(urlPath, "/"), "/")
for i, seg := range segments {
// Decode existing encoding first to prevent double-encoding
unescaped, err := url.PathUnescape(seg)
if err != nil {
return "", fmt.Errorf("failed to unescape segment: %w", err)
}
// Encode special NATS-sensitive characters
encoded := url.PathEscape(unescaped)
encoded = strings.ReplaceAll(encoded, ".", "%2E") // Critical for token separation
encoded = strings.ReplaceAll(encoded, "*", "%2A") // Wildcard protection
encoded = strings.ReplaceAll(encoded, ">", "%3E") // Wildcard protection
segments[i] = encoded
}
return strings.Join(segments, "."), nil
}
// NATS subject to URL conversion
func NATSToURL(natsSubject string) (string, error) {
tokens := strings.Split(natsSubject, ".")
for i, token := range tokens {
// Reverse the special character encoding
decoded := strings.ReplaceAll(token, "%2E", ".")
decoded = strings.ReplaceAll(decoded, "%2A", "*")
decoded = strings.ReplaceAll(decoded, "%3E", ">")
// Unescape remaining URL encoding
unescaped, err := url.PathUnescape(decoded)
if err != nil {
return "", fmt.Errorf("failed to unescape token: %w", err)
}
tokens[i] = unescaped
}
return "/" + strings.Join(tokens, "/"), nil
}
func main() {
helpFlag := flag.Bool("help", false, "Display help information about available environment variables")
flag.Parse()
if *helpFlag {
printHelp()
os.Exit(0)
}
// Read NATS connection info from environment variables
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL // defaults to "nats://127.0.0.1:4222"
}
natsUser := os.Getenv("NATS_USER")
natsPassword := os.Getenv("NATS_PASSWORD")
natsToken := os.Getenv("NATS_TOKEN")
natsNkey := os.Getenv("NATS_NKEY")
natsNkeySeed := os.Getenv("NATS_NKEY_SEED")
natsCredsFile := os.Getenv("NATS_CREDS_FILE")
natsInboxPrefix := os.Getenv("NATS_INBOX_PREFIX")
// Read HTTP port from environment variables
httpPort := os.Getenv("HTTP_PORT")
if httpPort == "" {
httpPort = "8080"
}
// Set up NATS connection options
opts := []nats.Option{nats.Name("Web proxy")}
if natsUser != "" && natsPassword != "" {
opts = append(opts, nats.UserInfo(natsUser, natsPassword))
} else if natsToken != "" {
opts = append(opts, nats.Token(natsToken))
} else if natsNkey != "" && natsNkeySeed != "" {
log.Fatalln("NKEY connection not supported")
} else if natsCredsFile != "" {
opts = append(opts, nats.UserCredentials(natsCredsFile))
}
if natsInboxPrefix != "" {
opts = append(opts, nats.CustomInboxPrefix(natsInboxPrefix))
}
nc, err := nats.Connect(natsURL, opts...)
if err != nil {
log.Fatal("Error connecting to NATS:", err)
}
defer nc.Close()
// HTTP handler to proxy all requests to NATS
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// Read the entire request body (if any)
body, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "Error reading request body", http.StatusInternalServerError)
return
}
defer r.Body.Close()
// Create the NATS subject.
// Remove the leading slash from the path and replace remaining slashes with dots.
// The subject is prefixed with "http.<host>.<method>.", where <method> is lower-case.
// Extract host and reverse domain components
host := r.Host
// Remove port if present
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
// Use "_" instead of "." in the domain to make it a single token.
domainParts := strings.ReplaceAll(host, ".", "_")
// Process path component
path := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/"), "/")
subjectPath, err := URLToNATS(path)
if err != nil {
http.Error(w, "Error converting endpoint to NATS subject", http.StatusInternalServerError)
log.Println("Could not convert endpoint to NATS subject", err)
return
}
// Build final subject
subjectBase := "http"
subjectParts := []string{subjectBase, domainParts, strings.ToLower(r.Method)}
if subjectPath != "" {
subjectParts = append(subjectParts, subjectPath)
}
subject := strings.Join(subjectParts, ".")
// Create a new NATS message with the HTTP request body
msg := nats.Msg{
Subject: subject,
Data: body,
Header: nats.Header{},
}
// Copy over all the HTTP request headers to the NATS message headers
for key, values := range r.Header {
for _, value := range values {
msg.Header.Add(key, value)
}
}
// Add additional HTTP meta-data as headers
msg.Header.Set("X-HTTP-Method", r.Method)
msg.Header.Set("X-HTTP-Path", r.URL.Path)
msg.Header.Set("X-HTTP-Query", r.URL.RawQuery)
msg.Header.Set("X-HTTP-Host", r.Host)
msg.Header.Set("X-Remote-Addr", r.RemoteAddr)
// Send the NATS request and wait synchronously for a reply (timeout: 30 seconds)
reply, err := nc.RequestMsg(&msg, 30*time.Second)
if err != nil {
log.Println("NATS request error:", err)
// Handle specific NATS error cases
if err == nats.ErrNoResponders || strings.Contains(err.Error(), "no responders") {
http.Error(w, "No service available to handle request", http.StatusNotFound)
} else {
http.Error(w, "Error processing request", http.StatusInternalServerError)
}
return
}
// Set any response headers from the NATS reply on the HTTP response
for key, values := range reply.Header {
for _, value := range values {
w.Header().Add(key, value)
}
}
// Write the reply body (from NATS) back to the HTTP client
w.Write(reply.Data)
})
_, err = micro.AddService(nc, micro.Config{
Name: "http-nats-proxy",
Version: "0.1.0",
Endpoint: µ.EndpointConfig{
Subject: "http.*.*.proxy.>",
Handler: micro.HandlerFunc(func(natsReq micro.Request) {
// http.host.method.proxy.host.endpoint.>
httpPath := natsReq.Headers()["X-HTTP-Path"][0]
httpReqURL := fmt.Sprintf("https:/%s", strings.TrimPrefix(httpPath, "/proxy"))
httpBody := bytes.NewReader(natsReq.Data())
httpMethod := natsReq.Headers()["X-HTTP-Method"][0]
// Create a new request
httpReq, err := http.NewRequest(httpMethod, httpReqURL, httpBody)
if err != nil {
log.Fatal(err)
}
client := &http.Client{}
resp, err := client.Do(httpReq)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
resBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Fatal(err)
}
natsReq.Respond(resBody)
}),
},
})
if err != nil {
log.Fatal("Could not make NATS microservice:", err)
}
// Start the HTTP server
fmt.Println("Server is running on http://localhost:8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
|