summaryrefslogtreecommitdiff
path: root/src/server
diff options
context:
space:
mode:
Diffstat (limited to 'src/server')
-rw-r--r--src/server/client.zig57
-rw-r--r--src/server/main.zig126
-rw-r--r--src/server/message_parser.zig137
3 files changed, 92 insertions, 228 deletions
diff --git a/src/server/client.zig b/src/server/client.zig
index f74e6b3..5c8d0cd 100644
--- a/src/server/client.zig
+++ b/src/server/client.zig
@@ -32,17 +32,31 @@ pub fn start(self: *Client, io: std.Io, alloc: std.mem.Allocator, queue: *std.Io
for (msgs[0..len]) |msg| {
switch (msg) {
.@"+ok" => {
- try writeOk(self.to_client);
+ _ = try self.to_client.write("+OK\r\n");
},
.pong => {
- try writePong(self.to_client);
+ _ = try self.to_client.write("PONG\r\n");
},
.info => |info| {
- try writeInfo(self.to_client, info);
+ _ = try self.to_client.write("INFO ");
+ try std.json.Stringify.value(info, .{}, self.to_client);
+ _ = try self.to_client.write("\r\n");
},
.msg => |m| {
defer m.deinit(alloc);
- try writeMsg(self.to_client, m);
+ try self.to_client.print(
+ "MSG {s} {s} {s} {d}\r\n{s}\r\n",
+ .{
+ m.subject,
+ m.sid,
+ m.reply_to orelse "",
+ m.payload.len,
+ m.payload,
+ },
+ );
+ },
+ .@"-err" => |s| {
+ _ = try self.to_client.print("-ERR '{s}'\r\n", .{s});
},
else => |m| {
std.debug.panic("unimplemented write: {any}\n", .{m});
@@ -60,42 +74,7 @@ pub fn send(self: *Client, io: std.Io, msg: Message) !void {
}
pub fn next(self: *Client, allocator: std.mem.Allocator) !Message {
- // std.debug.print("in client awaiting next message\n", .{});
- // errdefer std.debug.print("actually it was canceled\n", .{});
- // defer std.debug.print("client returning next message!\n", .{});
return Message.next(allocator, self.from_client);
- // return self.send_queue.getOne(io);
-}
-
-fn writeOk(out: *std.Io.Writer) !void {
- _ = try out.write("+OK\r\n");
-}
-
-fn writeErr(out: *std.Io.Writer, msg: []const u8) !void {
- _ = try out.print("-ERR '{s}'\r\n", .{msg});
-}
-
-fn writePong(out: *std.Io.Writer) !void {
- _ = try out.write("PONG\r\n");
-}
-
-pub fn writeInfo(out: *std.Io.Writer, info: Message.ServerInfo) !void {
- _ = try out.write("INFO ");
- try std.json.Stringify.value(info, .{}, out);
- _ = try out.write("\r\n");
-}
-
-fn writeMsg(out: *std.Io.Writer, msg: Message.Msg) !void {
- try out.print(
- "MSG {s} {s} {s} {d}\r\n{s}\r\n",
- .{
- msg.subject,
- msg.sid,
- msg.reply_to orelse "",
- msg.payload.len,
- msg.payload,
- },
- );
}
test {
diff --git a/src/server/main.zig b/src/server/main.zig
index d8e8e61..dab0f0a 100644
--- a/src/server/main.zig
+++ b/src/server/main.zig
@@ -25,25 +25,45 @@ fn handleSigInt(sig: std.os.linux.SIG) callconv(.c) void {
keep_running.store(false, .monotonic);
}
-pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
+pub fn main(alloc: std.mem.Allocator, server_config: ServerInfo) !void {
// Configure the signal action
- // const act = std.posix.Sigaction{
- // .handler = .{ .handler = handleSigInt },
- // .mask = std.posix.sigemptyset(),
- // .flags = 0,
- // };
+ const act = std.posix.Sigaction{
+ .handler = .{ .handler = handleSigInt },
+ .mask = std.posix.sigemptyset(),
+ .flags = 0,
+ };
- // // Register the handler for SIGINT (Ctrl+C)
- // std.posix.sigaction(std.posix.SIG.INT, &act, null);
+ // Register the handler for SIGINT (Ctrl+C)
+ std.posix.sigaction(std.posix.SIG.INT, &act, null);
- var server: Server = .{
- .info = server_config,
- };
+ {
+ var dba: std.heap.DebugAllocator(.{}) = .init;
+ dba.backing_allocator = alloc;
+ defer _ = dba.deinit();
+ const gpa = if (builtin.mode == .Debug or builtin.mode == .ReleaseSafe) dba.allocator() else alloc;
+
+ var server: Server = .{
+ .info = server_config,
+ };
+
+ var threaded: std.Io.Threaded = .init(gpa, .{});
+ defer threaded.deinit();
+ const io = threaded.io();
- var threaded: std.Io.Threaded = .init(gpa, .{});
- defer threaded.deinit();
- const io = threaded.io();
+ var server_task = try io.concurrent(start, .{ &server, io, gpa });
+ defer server_task.cancel(io) catch {};
+
+ while (keep_running.load(.monotonic)) {
+ try io.sleep(.fromMilliseconds(1), .awake);
+ }
+
+ std.debug.print("\nShutting down...\n", .{});
+ server_task.cancel(io) catch {};
+ }
+ std.debug.print("Goodbye\n", .{});
+}
+pub fn start(server: *Server, io: std.Io, gpa: std.mem.Allocator) !void {
var tcp_server = try std.Io.net.IpAddress.listen(try std.Io.net.IpAddress.parse(
server.info.host,
server.info.port,
@@ -52,46 +72,20 @@ pub fn main(gpa: std.mem.Allocator, server_config: ServerInfo) !void {
var id: usize = 0;
// Run until SIGINT is handled, then exit gracefully
- while (keep_running.load(.monotonic)) : (id +%= 1) {
+ while (true) : (id +%= 1) {
std.debug.print("in server loop\n", .{});
if (server.clients.contains(id)) continue;
const stream = try tcp_server.accept(io);
std.debug.print("accepted connection\n", .{});
- _ = io.concurrent(handleConnection, .{ &server, gpa, io, id, stream }) catch {
+ _ = io.concurrent(handleConnection, .{ server, gpa, io, id, stream }) catch {
std.debug.print("could not start concurrent handler for {d}\n", .{id});
stream.close(io);
};
}
-
- std.debug.print("Exiting gracefully\n", .{});
-}
-
-fn processMsgs(server: *Server, io: std.Io, alloc: std.mem.Allocator) void {
- while (true) {
- const msg = server.msg_queue.getOne(io) catch break;
- defer msg.deinit(alloc);
-
- for (server.subscriptions.items) |subscription| {
- if (subjectMatches(subscription.subject, msg.subject)) {
- const client = server.clients.get(subscription.client_id) orelse {
- std.debug.print("trying to publish to a client that no longer exists: {d}", .{subscription.client_id});
- continue;
- };
- client.send(io, .{ .msg = .{
- .subject = msg.subject,
- .sid = subscription.sid,
- .reply_to = msg.reply_to,
- .payload = msg.payload,
- } }) catch continue;
- }
- }
- }
}
fn addClient(server: *Server, allocator: std.mem.Allocator, id: usize, client: *Client) !void {
- // server.clients.lockPointers();
try server.clients.put(allocator, id, client);
- // server.clients.unlockPointers();
}
fn removeClient(server: *Server, io: std.Io, allocator: std.mem.Allocator, id: usize) void {
@@ -185,13 +179,6 @@ fn handleConnection(
}
}
-// // Result is owned by the caller
-// fn subscribers(server: *Server, gpa: std.mem.Allocator, subject: []const u8) []ClientState {
-// var acc: std.ArrayList(ClientState) = .empty;
-
-// return acc.toOwnedSlice();
-// }
-
fn subjectMatches(expected: []const u8, actual: []const u8) bool {
return std.mem.eql(u8, expected, actual);
}
@@ -200,7 +187,7 @@ fn publishMessage(server: *Server, io: std.Io, alloc: std.mem.Allocator, source_
errdefer {
if (source_client.connect) |c| {
if (c.verbose) {
- source_client.send(io, .@"-err") catch {};
+ source_client.send(io, .{ .@"-err" = "Slow Consumer" }) catch {};
}
}
}
@@ -259,42 +246,3 @@ pub fn createId() []const u8 {
pub fn createName() []const u8 {
return "SERVERNAME";
}
-
-// TESTING
-
-// fn initTestServer() Server {
-// return .{
-// .info = .{
-// .server_id = "ABCD",
-// .server_name = "test server",
-// .version = "0.1.2",
-// .max_payload = 1234,
-// },
-// };
-// }
-
-// fn initTestClient(
-// io: std.Io,
-// allocator: std.mem.Allocator,
-// id: usize,
-// data_from: []const u8,
-// ) !struct {
-// Client,
-// *std.Io.Reader,
-// *std.Io.Writer,
-// } {
-// return .init(io, allocator, id, .{}, in, out);
-// }
-
-// test {
-// const gpa = std.testing.allocator;
-// const io = std.testing.io;
-
-// const server = initTestServer();
-// const client: Client = .init(
-// io,
-// gpa,
-// 1,
-// .{},
-// );
-// }
diff --git a/src/server/message_parser.zig b/src/server/message_parser.zig
index b156dd6..6561c4b 100644
--- a/src/server/message_parser.zig
+++ b/src/server/message_parser.zig
@@ -43,7 +43,7 @@ pub const Message = union(MessageType) {
ping,
pong,
@"+ok": void,
- @"-err": void,
+ @"-err": []const u8,
pub const ServerInfo = struct {
/// The unique identifier of the NATS server.
server_id: []const u8,
@@ -416,34 +416,6 @@ fn readSubject(alloc: std.mem.Allocator, in: *std.Io.Reader) ![]const u8 {
return subject_list.toOwnedSlice(alloc);
}
-fn parseJsonMessage(T: type, alloc: std.mem.Allocator, in: *std.Io.Reader) !T {
- var reader: std.json.Reader = .init(alloc, in);
- return std.json.innerParse(T, alloc, &reader, .{
- .max_value_len = std.json.default_max_value_len,
- .allocate = .alloc_always,
- });
-}
-
-fn parsePub(in: *std.Io.Reader) !Message.Pub {
- const subject = (try in.takeDelimiter(' ')) orelse return error.EndOfStream;
- const next = (try in.takeDelimiter('\r')) orelse return error.EndOfStream;
- var reply_to: ?[]const u8 = null;
- const bytes = std.fmt.parseUnsigned(usize, next, 10) catch blk: {
- reply_to = next;
- break :blk try std.fmt.parseUnsigned(usize, (try in.takeDelimiter(' ')) orelse return error.EndOfStream, 10);
- };
-
- in.toss(1); // LF
- const payload = try in.take(bytes);
-
- return .{
- .subject = subject,
- .reply_to = reply_to,
- .bytes = bytes,
- .payload = payload,
- };
-}
-
inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void {
if (!std.mem.eql(u8, try reader.take(expected.len), expected)) {
@branchHint(.unlikely);
@@ -451,74 +423,39 @@ inline fn expectStreamBytes(reader: *std.Io.Reader, expected: []const u8) !void
}
}
-// try returning error in debug mode, only null in release?
-// pub fn parseNextMessage(alloc: std.mem.Allocator, in: *std.Io.Reader) ?Message {
-// const message_type: MessageType = blk: {
-// var word: ["CONNECT".len]u8 = undefined;
-// var len: usize = 0;
-// for (&word, 0..) |*b, i| {
-// const byte = in.takeByte() catch return null;
-// if (std.ascii.isUpper(byte)) {
-// b.* = byte;
-// len = i + 1;
-// } else break;
-// }
-
-// break :blk Message.parse(word[0..len]) orelse return null;
-// };
-
-// // defer in.toss(2); // CRLF
-// return switch (message_type) {
-// .connect => blk: {
-// const value: ?Message = .{ .connect = parseJsonMessage(Message.Connect, alloc, in) catch return null };
-
-// break :blk value;
-// },
-// .@"pub" => .{ .@"pub" = parsePub(in) catch |err| std.debug.panic("{}", .{err}) },
-// .ping => .ping,
-// else => null,
-// };
-// }
-
-// test parseNextMessage {
-// const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":false,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43.0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r\nPUB hi 3\r\nfoo\r\n";
-// var reader: std.Io.Reader = .fixed(input);
-// var arena: std.heap.ArenaAllocator = .init(std.testing.allocator);
-// defer arena.deinit();
-// const gpa = arena.allocator();
-
-// {
-// const msg: Message = try Message.next(gpa, &reader);
-// const expected: Message = .{ .connect = .{
-// .connect = .{
-// .verbose = false,
-// .pedantic = false,
-// .tls_required = false,
-// .name = try gpa.dupe(u8, "NATS CLI Version v0.2.4"),
-// .lang = try gpa.dupe(u8, "go"),
-// .version = try gpa.dupe(u8, "1.43.0"),
-// .protocol = 1,
-// .echo = true,
-// .headers = true,
-// .no_responders = true,
-// },
-// .allocator = arena,
-// } };
-
-// try std.testing.expectEqualDeep(expected, msg);
-// }
-// {
-// const msg: Message = try Message.next(gpa, &reader);
-// const expected: Message = .{ .@"pub" = .{
-// .subject = "hi",
-// .payload = "foo",
-// } };
-// try std.testing.expectEqualDeep(expected, msg);
-// }
-// }
-
-// test "MessageType.parse performance" {
-// // Measure perf for parseMemEql
-// // Measure perf for parseStaticStringMap
-// // assert parse = fastest perf
-// }
+test "parsing a stream" {
+ const input = "CONNECT {\"verbose\":false,\"pedantic\":false,\"tls_required\":fa" ++
+ "lse,\"name\":\"NATS CLI Version v0.2.4\",\"lang\":\"go\",\"version\":\"1.43" ++
+ ".0\",\"protocol\":1,\"echo\":true,\"headers\":true,\"no_responders\":true}\r" ++
+ "\nPUB hi 3\r\nfoo\r\n";
+ var reader: std.Io.Reader = .fixed(input);
+ var arena: std.heap.ArenaAllocator = .init(std.testing.allocator);
+ defer arena.deinit();
+ const gpa = arena.allocator();
+
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{ .connect = .{
+ .verbose = false,
+ .pedantic = false,
+ .tls_required = false,
+ .name = "NATS CLI Version v0.2.4",
+ .lang = "go",
+ .version = "1.43.0",
+ .protocol = 1,
+ .echo = true,
+ .headers = true,
+ .no_responders = true,
+ } };
+
+ try std.testing.expectEqualDeep(expected, msg);
+ }
+ {
+ const msg: Message = try Message.next(gpa, &reader);
+ const expected: Message = .{ .@"pub" = .{
+ .subject = "hi",
+ .payload = "foo",
+ } };
+ try std.testing.expectEqualDeep(expected, msg);
+ }
+}