summaryrefslogtreecommitdiff
path: root/main.go
blob: d0479e6088c12e7a2b9c169c12334249d76dbb39 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package main

import (
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/nats-io/nats.go"
)

// printHelp writes the usage banner followed by the list of supported
// environment variables to standard output, one entry per line.
func printHelp() {
	helpLines := []string{
		"Usage: HTTP-to-NATS proxy server",
		"\nThe following environment variables are supported:",
		"    NATS_URL - NATS connection URL (default: nats://127.0.0.1:4222)",
		"    NATS_USER - NATS username for authentication (optional)",
		"    NATS_PASSWORD - NATS password for authentication (optional)",
		"    NATS_TOKEN - NATS token for authentication (optional)",
		"    NATS_NKEY - NATS NKEY for authentication (optional)",
		"    NATS_NKEY_SEED - NATS NKEY seed for authentication (optional)",
		"    NATS_CREDS_FILE - Path to NATS credentials file (optional)",
		"    NATS_INBOX_PREFIX - Subject prefix for NATS messages (default: _INBOX)",
		"    HTTP_PORT - HTTP port to listen on (default: 8080)",
	}
	for _, line := range helpLines {
		fmt.Println(line)
	}
}

func main() {
	helpFlag := flag.Bool("help", false, "Display help information about available environment variables")
	flag.Parse()
	if *helpFlag {
		printHelp()
		os.Exit(0)
	}

	// Read NATS connection info from environment variables
	natsURL := os.Getenv("NATS_URL")
	if natsURL == "" {
		natsURL = nats.DefaultURL // defaults to "nats://127.0.0.1:4222"
	}
	natsUser := os.Getenv("NATS_USER")
	natsPassword := os.Getenv("NATS_PASSWORD")
	natsToken := os.Getenv("NATS_TOKEN")
	natsNkey := os.Getenv("NATS_NKEY")
	natsNkeySeed := os.Getenv("NATS_NKEY_SEED")
	natsCredsFile := os.Getenv("NATS_CREDS_FILE")
	natsInboxPrefix := os.Getenv("NATS_INBOX_PREFIX")

	// Read HTTP port from environment variables
	httpPort := os.Getenv("HTTP_PORT")
	if httpPort == "" {
		httpPort = "8080"
	}

	// Set up NATS connection options
	opts := []nats.Option{nats.Name("Web proxy")}

	if natsUser != "" && natsPassword != "" {
		opts = append(opts, nats.UserInfo(natsUser, natsPassword))
	} else if natsToken != "" {
		opts = append(opts, nats.Token(natsToken))
	} else if natsNkey != "" && natsNkeySeed != "" {
		log.Fatalln("NKEY connection not supported")
	} else if natsCredsFile != "" {
		opts = append(opts, nats.UserCredentials(natsCredsFile))
	}

	if natsInboxPrefix != "" {
		opts = append(opts, nats.CustomInboxPrefix(natsInboxPrefix))
	}

	nc, err := nats.Connect(natsURL, opts...)
	if err != nil {
		log.Fatal("Error connecting to NATS:", err)
	}
	defer nc.Close()

	// HTTP handler to proxy all requests to NATS
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Read the entire request body (if any)
		body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "Error reading request body", http.StatusInternalServerError)
			return
		}
		defer r.Body.Close()

		// Create the NATS subject.
		// Remove the leading slash from the path and replace remaining slashes with dots.
		// The subject is prefixed with "http.<method>.", where <method> is lower-case.
		path := strings.TrimPrefix(r.URL.Path, "/")
		subjectPath := strings.ReplaceAll(path, "/", ".")
		subject := fmt.Sprintf("http.%s.%s", strings.ToLower(r.Method), subjectPath)

		log.Println("Forwarding HTTP", r.Method, "request on", r.URL.Path, "to NATS subject:", subject)

		// Create a new NATS message with the HTTP request body
		msg := nats.Msg{
			Subject: subject,
			Data:    body,
			Header:  nats.Header{},
		}

		// Copy over all the HTTP request headers to the NATS message headers
		for key, values := range r.Header {
			for _, value := range values {
				msg.Header.Add(key, value)
			}
		}

		// Add additional HTTP meta-data as headers
		msg.Header.Set("X-HTTP-Method", r.Method)
		msg.Header.Set("X-HTTP-Path", r.URL.Path)
		msg.Header.Set("X-HTTP-Query", r.URL.RawQuery)
		msg.Header.Set("X-Remote-Addr", r.RemoteAddr)

		// Send the NATS request and wait synchronously for a reply (timeout: 30 seconds)
		reply, err := nc.RequestMsg(&msg, 30*time.Second)
		if err != nil {
			http.Error(w, "Error processing request", http.StatusInternalServerError)
			log.Println("NATS request error:", err)
			return
		}

		// Set any response headers from the NATS reply on the HTTP response
		for key, values := range reply.Header {
			for _, value := range values {
				w.Header().Add(key, value)
			}
		}

		// Write the reply body (from NATS) back to the HTTP client
		w.Write(reply.Data)
	})

	// Start the HTTP server
	fmt.Println("Server is running on http://localhost:8080")
	log.Fatal(http.ListenAndServe(":8080", nil))
}