From 2e80560ea193587618d3990099f5b994a351f47c Mon Sep 17 00:00:00 2001 From: peterino2 Date: Mon, 9 Feb 2026 21:50:43 -0800 Subject: [PATCH] saving --- .gitignore | 5 + build.zig | 48 ++++++++ build.zig.zon | 12 ++ src/logging.zig | 3 + src/root.zig | 6 + src/udp.zig | 162 +++++++++++++++++++++++++ test/client.zig | 53 ++++++++ test/server.zig | 47 ++++++++ webtest/assets.go | 6 + webtest/go.mod | 9 ++ webtest/go.sum | 8 ++ webtest/hub.go | 95 +++++++++++++++ webtest/main.go | 19 +++ webtest/processes.go | 156 ++++++++++++++++++++++++ webtest/server.go | 232 ++++++++++++++++++++++++++++++++++++ webtest/static/app.js | 188 +++++++++++++++++++++++++++++ webtest/static/tailwind.css | 179 ++++++++++++++++++++++++++++ webtest/templates.go | 89 ++++++++++++++ webtest/types.go | 8 ++ 19 files changed, 1325 insertions(+) create mode 100644 .gitignore create mode 100644 build.zig create mode 100644 build.zig.zon create mode 100644 src/logging.zig create mode 100644 src/root.zig create mode 100644 src/udp.zig create mode 100644 test/client.zig create mode 100644 test/server.zig create mode 100644 webtest/assets.go create mode 100644 webtest/go.mod create mode 100644 webtest/go.sum create mode 100644 webtest/hub.go create mode 100644 webtest/main.go create mode 100644 webtest/processes.go create mode 100644 webtest/server.go create mode 100644 webtest/static/app.js create mode 100644 webtest/static/tailwind.css create mode 100644 webtest/templates.go create mode 100644 webtest/types.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ab145f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ + +zig-out +.zig-cache +webtest/webtest.exe +webtest/webtest diff --git a/build.zig b/build.zig new file mode 100644 index 0000000..b199fed --- /dev/null +++ b/build.zig @@ -0,0 +1,48 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const optimize = b.standardOptimizeOption(.{}); + const target = b.standardTargetOptions(.{}); + const zargs_dep = b.dependency("zargs", .{ + .target = target, + .optimize = optimize, + }); + const zargs_mod = zargs_dep.module("zargs"); + + const mod = b.addModule("crasa-transport", .{ + .root_source_file = b.path("src/root.zig"), + .target = target, + .optimize = optimize, + .link_libc = true, + }); + + const tests = b.addTest(.{ .root_module = mod }); + const testStep = b.step("test", "runs tests associated with the crasa-transportprotocol"); + testStep.dependOn(&tests.step); + + const server_mod = b.createModule(.{ + .root_source_file = b.path("test/server.zig"), + .target = target, + .optimize = optimize, + }); + server_mod.addImport("crasa-transport", mod); + server_mod.addImport("zargs", zargs_mod); + const server_exe = b.addExecutable(.{ + .name = "test-server", + .root_module = server_mod, + }); + b.installArtifact(server_exe); + + const client_mod = b.createModule(.{ + .root_source_file = b.path("test/client.zig"), + .target = target, + .optimize = optimize, + }); + client_mod.addImport("crasa-transport", mod); + client_mod.addImport("zargs", zargs_mod); + const client_exe = b.addExecutable(.{ + .name = "test-client", + .root_module = client_mod, + }); + b.installArtifact(client_exe); +} diff --git a/build.zig.zon b/build.zig.zon new file mode 100644 index 0000000..408dd54 --- /dev/null +++ b/build.zig.zon @@ -0,0 +1,12 @@ +.{ + .name = .transport, + .version = "0.15.2", + .dependencies = .{ + .zargs = .{ + .url = "git+https://git.peterino.com/peterino/zargs.git#d980c5bfe9a2328e7e335f72fccc7ef96dd3d27d", + .hash = "zargs-0.0.0-CRr7fB6LAQAb5pI7v2FlN9z0cNDDRAT3GkIJ80GBYGxD", + }, + }, + .paths = .{""}, + .fingerprint = 0x66ab212ecbb3a7e9, +} diff --git a/src/logging.zig b/src/logging.zig new file mode 100644 index 0000000..7382e14 --- /dev/null +++ b/src/logging.zig @@ -0,0 +1,3 @@ +const std = @import("std"); + +pub const log = std.log.scoped(.transport); diff --git a/src/root.zig b/src/root.zig new file mode 100644 index 0000000..b604a2c --- /dev/null +++ b/src/root.zig @@ -0,0 +1,6 @@ +test "hello world" {} + +pub const udp = @import("udp.zig"); +pub const logging = @import("logging.zig"); + +pub const UdpEngine = udp.UdpEngine; diff --git a/src/udp.zig b/src/udp.zig new file mode 100644 index 0000000..2735a19 --- /dev/null +++ b/src/udp.zig @@ -0,0 +1,162 @@ +// a bidirectional reliable udp transport +// +// I'm going to call this the CRU protocol +// +// stands for crasa-reliable-udp +// +// packet layout of the payload + +// this results in a non blocking transport engine where you can poll for when new messages are +// available. And the messages are sent through as fast as they can be, mangled/unacked messages +// are replayed by the server over time. +// +// messages will arrive from the server to the client /eventually/ +// +// they will arrive out of order as well. +pub const UdpEngine = struct { + allocator: std.mem.Allocator, + port: u16 = 6700, + signalExit: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), + address: std.net.Address, + + pub fn create(allocator: std.mem.Allocator) !*@This() { + const self = try allocator.create(@This()); + + self.* = .{ + .allocator = allocator, + .address = try std.net.Address.parseIp4("0.0.0.0", 5000), + }; + + return self; + } + + // requires a memory safe allocator, as this is the Allocator + // that will be used to create the backing allocator on the working thread. + pub fn serve(self: *@This(), port: u16) !void { + self.port = port; + self.address = try std.net.Address.parseIp4("0.0.0.0", port); + const thread = try std.Thread.spawn(.{}, startServer, .{self}); + thread.detach(); + } + + pub fn connect(self: *@This(), host: []const u8, port: u16) !void { + log.info("parsing host '{s}'", .{host}); + const resolved_host = if (std.mem.eql(u8, host, "localhost")) + "127.0.0.1" + else + host; + self.address = try std.net.Address.resolveIp(resolved_host, port); + self.port = port; + const thread = try std.Thread.spawn(.{}, startClient, .{self}); + thread.detach(); + } + + pub fn startServer(self: *@This()) void { + const sockfd = posix.socket( + self.address.any.family, + posix.SOCK.DGRAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK, + posix.IPPROTO.UDP, + ) catch |err| { + log.err("server socket error: {any}", .{err}); + return; + }; + defer posix.close(sockfd); + + posix.bind(sockfd, &self.address.any, self.address.getOsSockLen()) catch |err| { + log.err("server bind error: {any}", .{err}); + return; + }; + + log.info("udp server listening on port {d}", .{self.address.getPort()}); + + var buffer: [1024]u8 = undefined; + while (!self.signalExit.load(.monotonic)) { + var src_addr: posix.sockaddr = undefined; + + var src_len: posix.socklen_t = @sizeOf(posix.sockaddr); + const n = posix.recvfrom(sockfd, buffer[0..], 0, &src_addr, &src_len) catch |err| switch (err) { + error.WouldBlock => { + std.Thread.sleep(20 * std.time.ns_per_ms); + continue; + }, + else => { + log.err("server recv error: {any}", .{err}); + break; + }, + }; + + const addr = std.net.Address.initPosix(@alignCast(&src_addr)); + log.info("server recv {d} bytes from {f}", .{ n, addr }); + + const msg = buffer[0..n]; + if (std.mem.eql(u8, msg, "HELLO")) { + _ = posix.sendto(sockfd, "HELLO", 0, &src_addr, src_len) catch |err| { + log.err("server send error: {any}", .{err}); + continue; + }; + log.info("server replied HELLO", .{}); + } + } + } + + pub fn startClient(self: *@This()) void { + const sockfd = posix.socket( + self.address.any.family, + posix.SOCK.DGRAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK, + posix.IPPROTO.UDP, + ) catch |err| { + log.err("client socket error: {any}", .{err}); + return; + }; + defer posix.close(sockfd); + + const local_addr = std.net.Address.parseIp4("0.0.0.0", 0) catch |err| { + log.err("client local addr parse error: {any}", .{err}); + return; + }; + posix.bind(sockfd, &local_addr.any, local_addr.getOsSockLen()) catch |err| { + log.err("client bind error: {any}", .{err}); + return; + }; + + _ = posix.sendto(sockfd, "HELLO", 0, &self.address.any, self.address.getOsSockLen()) catch |err| { + log.err("client send error: {any}", .{err}); + return; + }; + log.info("client sent HELLO to {f}", .{self.address}); + + var buffer: [1024]u8 = undefined; + while (!self.signalExit.load(.monotonic)) { + var src_addr: posix.sockaddr = undefined; + var src_len: posix.socklen_t = @sizeOf(posix.sockaddr); + const n = posix.recvfrom(sockfd, buffer[0..], 0, &src_addr, &src_len) catch |err| switch (err) { + error.WouldBlock => { + std.Thread.sleep(20 * std.time.ns_per_ms); + continue; + }, + else => { + log.err("client recv error: {any}", .{err}); + break; + }, + }; + + const msg = buffer[0..n]; + log.info("client recv {d} bytes", .{n}); + if (std.mem.eql(u8, msg, "HELLO")) { + log.info("client received HELLO reply", .{}); + } + } + } + + pub fn shouldExit(self: *@This()) bool { + return self.signalExit.load(.monotonic); + } + + pub fn destroy(self: *@This()) void { + self.allocator.destroy(self); + } +}; + +const std = @import("std"); +const posix = std.posix; +const log = @import("logging.zig").log; diff --git a/test/client.zig b/test/client.zig new file mode 100644 index 0000000..e2e4ff0 --- /dev/null +++ b/test/client.zig @@ -0,0 +1,53 @@ +const std = @import("std"); +const transport = @import("crasa-transport"); +const zargs = @import("zargs"); +const log = transport.logging.log; + +const Config = struct { + host: []const u8 = "127.0.0.1", + port: u16 = 9700, + + pub const meta = .{ + .port = .{ + .short = 'p', + .help = "UDP port to connect on", + }, + .host = .{ + .short = 'H', + .help = "Hostname to resolve", + }, + }; +}; + +pub fn main() !void { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + zargs.setAllocator(allocator); + defer zargs.shutdown(); + + const config: Config = try zargs.parse(Config); + const argv = try std.process.argsAlloc(allocator); + defer std.process.argsFree(allocator, argv); + + if (zargs.isHelp()) { + const help_text = try zargs.getUsageAlloc("test-client"); + defer allocator.free(help_text); + std.debug.print("{s}", .{help_text}); + return; + } + + log.info("client startup argc={d} port={d}", .{ argv.len, config.port }); + for (argv, 0..) |arg, i| { + log.info("client argv[{d}]={s}", .{ i, arg }); + } + + const engine = try transport.UdpEngine.create(allocator); + defer engine.destroy(); + + try engine.connect(config.host, config.port); + + while (!engine.shouldExit()) { + std.Thread.sleep(500 * std.time.ns_per_ms); + } +} diff --git a/test/server.zig b/test/server.zig new file mode 100644 index 0000000..32f0ca8 --- /dev/null +++ b/test/server.zig @@ -0,0 +1,47 @@ +const std = @import("std"); +const transport = @import("crasa-transport"); +const zargs = @import("zargs"); +const log = transport.logging.log; + +const Config = struct { + port: u16 = 9700, + + pub const meta = .{ + .port = .{ + .short = 'p', + .help = "UDP port", + }, + }; +}; + +pub fn main() !void { + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + const allocator = gpa.allocator(); + zargs.setAllocator(allocator); + defer zargs.shutdown(); + + const config = try zargs.parse(Config); + const argv = try std.process.argsAlloc(allocator); + defer std.process.argsFree(allocator, argv); + + if (zargs.isHelp()) { + const help_text = try zargs.getUsageAlloc("test-server"); + defer allocator.free(help_text); + std.debug.print("{s}", .{help_text}); + return; + } + + log.info("server startup argc={d} port={d}", .{ argv.len, config.port }); + for (argv, 0..) |arg, i| { + log.info("server argv[{d}]={s}", .{ i, arg }); + } + + const engine = try transport.UdpEngine.create(allocator); + defer engine.destroy(); + try engine.serve(config.port); + + while (!engine.shouldExit()) { + std.Thread.sleep(500 * std.time.ns_per_ms); + } +} diff --git a/webtest/assets.go b/webtest/assets.go new file mode 100644 index 0000000..2dc590c --- /dev/null +++ b/webtest/assets.go @@ -0,0 +1,6 @@ +package main + +import "embed" + +//go:embed static/* +var staticFiles embed.FS diff --git a/webtest/go.mod b/webtest/go.mod new file mode 100644 index 0000000..b5abae6 --- /dev/null +++ b/webtest/go.mod @@ -0,0 +1,9 @@ +module crasad/transport/webtest + +go 1.23.0 + +require ( + github.com/a-h/templ v0.3.865 + github.com/go-chi/chi/v5 v5.2.3 + github.com/gorilla/websocket v1.5.3 +) diff --git a/webtest/go.sum b/webtest/go.sum new file mode 100644 index 0000000..229e5d6 --- /dev/null +++ b/webtest/go.sum @@ -0,0 +1,8 @@ +github.com/a-h/templ v0.3.865 h1:nYn5EWm9EiXaDgWcMQaKiKvrydqgxDUtT1+4zU2C43A= +github.com/a-h/templ v0.3.865/go.mod h1:oLBbZVQ6//Q6zpvSMPTuBK0F3qOtBdFBcGRspcT+VNQ= +github.com/go-chi/chi/v5 v5.2.3 h1:WQIt9uxdsAbgIYgid+BpYc+liqQZGMHRaUwp0JUcvdE= +github.com/go-chi/chi/v5 v5.2.3/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/webtest/hub.go b/webtest/hub.go new file mode 100644 index 0000000..f3297a8 --- /dev/null +++ b/webtest/hub.go @@ -0,0 +1,95 @@ +package main + +import ( + "encoding/json" + "log" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type hub struct { + mu sync.Mutex + clients map[*websocket.Conn]struct{} + history []logMessage + maxHistory int +} + +func newHub() *hub { + return &hub{ + clients: make(map[*websocket.Conn]struct{}), + history: make([]logMessage, 0, 512), + maxHistory: 2000, + } +} + +func (h *hub) add(c *websocket.Conn) { + h.mu.Lock() + h.clients[c] = struct{}{} + snapshot := append([]logMessage(nil), h.history...) + clientCount := len(h.clients) + h.mu.Unlock() + + log.Printf("ws client connected remote=%s clients=%d", c.RemoteAddr(), clientCount) + h.replay(c, snapshot) +} + +func (h *hub) remove(c *websocket.Conn) { + h.mu.Lock() + defer h.mu.Unlock() + delete(h.clients, c) + log.Printf("ws client disconnected remote=%s clients=%d", c.RemoteAddr(), len(h.clients)) +} + +func (h *hub) broadcast(msg logMessage) { + log.Printf("message process=%s stream=%s line=%q time=%s", msg.Process, msg.Stream, msg.Line, msg.Time) + + payload, err := json.Marshal(msg) + if err != nil { + log.Printf("broadcast marshal error process=%s stream=%s err=%v", msg.Process, msg.Stream, err) + return + } + + h.mu.Lock() + defer h.mu.Unlock() + + h.history = append(h.history, msg) + if len(h.history) > h.maxHistory { + h.history = h.history[len(h.history)-h.maxHistory:] + } + + for c := range h.clients { + _ = c.SetWriteDeadline(time.Now().Add(2 * time.Second)) + if err := c.WriteMessage(websocket.TextMessage, payload); err != nil { + log.Printf("broadcast write error remote=%s err=%v", c.RemoteAddr(), err) + _ = c.Close() + delete(h.clients, c) + } + } +} + +func (h *hub) replay(c *websocket.Conn, snapshot []logMessage) { + for _, msg := range snapshot { + payload, err := json.Marshal(msg) + if err != nil { + continue + } + _ = c.SetWriteDeadline(time.Now().Add(2 * time.Second)) + if err := c.WriteMessage(websocket.TextMessage, payload); err != nil { + log.Printf("ws replay failed remote=%s err=%v", c.RemoteAddr(), err) + return + } + } + log.Printf("ws replay complete remote=%s messages=%d", c.RemoteAddr(), len(snapshot)) +} + +func (h *hub) status(process, line string) { + log.Printf("status process=%s line=%s", process, line) + h.broadcast(logMessage{ + Process: process, + Stream: "status", + Line: line, + Time: time.Now().Format(time.RFC3339), + }) +} diff --git a/webtest/main.go b/webtest/main.go new file mode 100644 index 0000000..c9d7812 --- /dev/null +++ b/webtest/main.go @@ -0,0 +1,19 @@ +package main + +import ( + "log" + "os" + "runtime" +) + +func main() { + if runtime.GOOS == "windows" { + log.SetFlags(log.LstdFlags) + } + log.SetOutput(os.Stdout) + log.SetPrefix("webtest ") + + if err := run(); err != nil { + log.Fatal(err) + } +} diff --git a/webtest/processes.go b/webtest/processes.go new file mode 100644 index 0000000..ef721b8 --- /dev/null +++ b/webtest/processes.go @@ -0,0 +1,156 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "log" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "sync" + "syscall" + "time" +) + +func findRepoRoot() (string, error) { + cwd, err := os.Getwd() + if err != nil { + return "", err + } + + candidates := []string{ + cwd, + filepath.Clean(filepath.Join(cwd, "..")), + } + log.Printf("repo root discovery cwd=%s candidates=%v", cwd, candidates) + + for _, dir := range candidates { + if _, err := os.Stat(filepath.Join(dir, "build.zig")); err == nil { + log.Printf("repo root found path=%s", dir) + return dir, nil + } + } + + return "", fmt.Errorf("could not find repo root with build.zig from %s", cwd) +} + +func runBuildInstall(root string, h *hub) error { + start := time.Now() + cmd := exec.Command("zig", "build", "install") + cmd.Dir = root + log.Printf("build start cmd=%q dir=%s", strings.Join(cmd.Args, " "), root) + h.status("build", "running zig build install") + + stdout, err := cmd.StdoutPipe() + if err != nil { + return err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return err + } + if err := cmd.Start(); err != nil { + return err + } + + var wg sync.WaitGroup + wg.Add(2) + go streamPipe("build", "stdout", stdout, h, &wg) + go streamPipe("build", "stderr", stderr, h, &wg) + wg.Wait() + + if err := cmd.Wait(); err != nil { + log.Printf("build failed duration=%s err=%v", time.Since(start), err) + return err + } + log.Printf("build success duration=%s", time.Since(start)) + h.status("build", "zig build install completed") + return nil +} + +func streamPipe(procName, streamName string, r io.Reader, h *hub, wg *sync.WaitGroup) { + defer wg.Done() + scanner := bufio.NewScanner(r) + for scanner.Scan() { + h.broadcast(logMessage{ + Process: procName, + Stream: streamName, + Line: scanner.Text(), + Time: time.Now().Format(time.RFC3339), + }) + } + if err := scanner.Err(); err != nil { + log.Printf("stream scanner error process=%s stream=%s err=%v", procName, streamName, err) + } +} + +func launchProcess(ctx context.Context, root, name, binPath string, args []string, h *hub) (*exec.Cmd, error) { + log.Printf("launch requested name=%s bin=%s args=%v dir=%s", name, binPath, args, root) + cmd := exec.CommandContext(ctx, binPath, args...) + cmd.Dir = root + + stdout, err := cmd.StdoutPipe() + if err != nil { + return nil, err + } + stderr, err := cmd.StderrPipe() + if err != nil { + return nil, err + } + + if err := cmd.Start(); err != nil { + return nil, err + } + h.status(name, fmt.Sprintf("started pid=%d args=%v", cmd.Process.Pid, args)) + + var wg sync.WaitGroup + wg.Add(2) + go streamPipe(name, "stdout", stdout, h, &wg) + go streamPipe(name, "stderr", stderr, h, &wg) + go func() { wg.Wait() }() + + return cmd, nil +} + +func stopProcess(cmd *exec.Cmd) { + if cmd == nil || cmd.Process == nil { + return + } + if cmd.ProcessState != nil && cmd.ProcessState.Exited() { + return + } + + if runtime.GOOS == "windows" { + _ = cmd.Process.Kill() + return + } + + _ = cmd.Process.Signal(syscall.SIGTERM) + done := make(chan struct{}) + go func() { + _, _ = cmd.Process.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(2 * time.Second): + _ = cmd.Process.Kill() + } +} + +func mustBinPath(root, name string) (string, error) { + suffix := "" + if runtime.GOOS == "windows" { + suffix = ".exe" + } + + p := filepath.Join(root, "zig-out", "bin", name+suffix) + if _, err := os.Stat(p); err != nil { + return "", fmt.Errorf("missing binary %s: %w", p, err) + } + return p, nil +} diff --git a/webtest/server.go b/webtest/server.go new file mode 100644 index 0000000..47ada8e --- /dev/null +++ b/webtest/server.go @@ -0,0 +1,232 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "os/exec" + "os/signal" + "strconv" + "sync" + "syscall" + "time" + + "github.com/a-h/templ" + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + "github.com/gorilla/websocket" +) + +func run() error { + log.Printf("startup orchestrator pid=%d", os.Getpid()) + + root, err := findRepoRoot() + if err != nil { + return err + } + + h := newHub() + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, + } + + r := chi.NewRouter() + r.Use(middleware.RealIP) + r.Use(middleware.RequestID) + r.Use(middleware.Recoverer) + + r.Get("/", templ.Handler(indexPage()).ServeHTTP) + + r.Get("/ws", func(w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Printf("ws upgrade failed remote=%s err=%v", r.RemoteAddr, err) + return + } + h.add(conn) + defer func() { + h.remove(conn) + _ = conn.Close() + }() + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + }) + + staticServer := http.FileServer(http.FS(staticFiles)) + r.Handle("/static/*", http.StripPrefix("/", staticServer)) + + httpServer := &http.Server{ + Addr: "127.0.0.1:5050", + Handler: r, + ReadHeaderTimeout: 5 * time.Second, + } + + go func() { + log.Printf("http server listen addr=http://127.0.0.1:5050") + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("http server failed: %v", err) + } + }() + + if err := runBuildInstall(root, h); err != nil { + return err + } + + serverBin, err := mustBinPath(root, "test-server") + if err != nil { + return err + } + clientBin, err := mustBinPath(root, "test-client") + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + serverCmd, err := launchProcess(ctx, root, "server", serverBin, []string{"--port", "5000"}, h) + if err != nil { + return err + } + + var ( + clientMu sync.Mutex + nextClientID = 1 + clients = map[int]*exec.Cmd{} + ) + + launchClient := func() (int, error) { + clientMu.Lock() + id := nextClientID + nextClientID += 1 + clientMu.Unlock() + + name := fmt.Sprintf("client-%d", id) + cmd, err := launchProcess(ctx, root, name, clientBin, []string{"--port=5000", "--host=localhost"}, h) + if err != nil { + return 0, err + } + + clientMu.Lock() + clients[id] = cmd + clientMu.Unlock() + + go func(clientID int, clientName string, clientCmd *exec.Cmd) { + err := clientCmd.Wait() + clientMu.Lock() + delete(clients, clientID) + clientMu.Unlock() + if err != nil { + h.status("orchestrator", fmt.Sprintf("%s exited with error: %v", clientName, err)) + return + } + h.status("orchestrator", fmt.Sprintf("%s exited cleanly", clientName)) + }(id, name, cmd) + + return id, nil + } + + stopClient := func(id int) bool { + clientMu.Lock() + cmd, ok := clients[id] + if ok { + delete(clients, id) + } + clientMu.Unlock() + if !ok { + return false + } + stopProcess(cmd) + return true + } + + stopAllClients := func() { + clientMu.Lock() + copied := make([]*exec.Cmd, 0, len(clients)) + for _, cmd := range clients { + copied = append(copied, cmd) + } + clients = map[int]*exec.Cmd{} + clientMu.Unlock() + for _, cmd := range copied { + stopProcess(cmd) + } + } + + r.Post("/api/clients", func(w http.ResponseWriter, r *http.Request) { + id, err := launchClient() + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + name := fmt.Sprintf("client-%d", id) + h.status("orchestrator", fmt.Sprintf("spawned %s", name)) + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(map[string]any{ + "id": id, + "name": name, + }) + }) + + r.Delete("/api/clients/{id}", func(w http.ResponseWriter, r *http.Request) { + idText := chi.URLParam(r, "id") + id, err := strconv.Atoi(idText) + if err != nil { + http.Error(w, "invalid client id", http.StatusBadRequest) + return + } + if !stopClient(id) { + http.Error(w, "client not found", http.StatusNotFound) + return + } + h.status("orchestrator", fmt.Sprintf("stopped client-%d", id)) + w.WriteHeader(http.StatusNoContent) + }) + + if _, err := launchClient(); err != nil { + stopProcess(serverCmd) + return err + } + + h.status("orchestrator", "server and initial client launched") + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + waitCh := make(chan string, 1) + go func() { + if err := serverCmd.Wait(); err != nil { + waitCh <- "server exited with error: " + err.Error() + return + } + waitCh <- "server exited cleanly" + }() + + for { + select { + case sig := <-sigCh: + h.status("orchestrator", "signal: "+sig.String()) + goto shutdown + case line := <-waitCh: + h.status("orchestrator", line) + h.status("orchestrator", "server exited; orchestrator staying alive until interrupted") + } + } + +shutdown: + cancel() + stopProcess(serverCmd) + stopAllClients() + h.status("orchestrator", "shutdown complete") + + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer shutdownCancel() + _ = httpServer.Shutdown(shutdownCtx) + return nil +} diff --git a/webtest/static/app.js b/webtest/static/app.js new file mode 100644 index 0000000..191835f --- /dev/null +++ b/webtest/static/app.js @@ -0,0 +1,188 @@ +(() => { + const ws = new WebSocket(`ws://${location.host}/ws`); + + const clientTabs = document.getElementById("client-tabs"); + const clientPanels = document.getElementById("client-panels"); + const cardTemplate = document.getElementById("client-card-template"); + const tabTemplate = document.getElementById("client-tab-template"); + const addClientBtn = document.getElementById("add-client"); + const leftTabServer = document.getElementById("left-tab-server"); + const leftTabSystem = document.getElementById("left-tab-system"); + const leftPaneServer = document.getElementById("left-pane-server"); + const leftPaneSystem = document.getElementById("left-pane-system"); + const clients = new Map(); + const closedTabs = new Set(); + let activeClient = null; + + function append(target, text, stream, severity) { + const pane = document.getElementById(target); + if (!pane) return; + const line = document.createElement("div"); + line.className = [stream || "stdout", severity].filter(Boolean).join(" "); + line.textContent = text; + pane.appendChild(line); + pane.scrollTop = pane.scrollHeight; + } + + function appendStatus(text) { + append("system-log", text, "status"); + } + + function setLeftTab(name) { + const serverActive = name === "server"; + leftTabServer.classList.toggle("active", serverActive); + leftTabSystem.classList.toggle("active", !serverActive); + leftPaneServer.hidden = !serverActive; + leftPaneSystem.hidden = serverActive; + } + + function parseClientId(process) { + if (!process.startsWith("client-")) return null; + const raw = process.slice("client-".length); + if (!/^\d+$/.test(raw)) return null; + return Number(raw); + } + + async function closeClient(process) { + const id = parseClientId(process); + if (id == null) return; + await fetch(`/api/clients/${id}`, { method: "DELETE" }); + removeClient(process); + closedTabs.add(process); + } + + function setActiveClient(process) { + activeClient = process; + for (const [name, meta] of clients.entries()) { + const active = name === process; + meta.tab.classList.toggle("active", active); + meta.panel.classList.toggle("active", active); + meta.panel.hidden = !active; + } + } + + function removeClient(process) { + const meta = clients.get(process); + if (!meta) return; + meta.tab.remove(); + meta.panel.remove(); + clients.delete(process); + + if (activeClient === process) { + const next = clients.keys().next(); + if (!next.done) setActiveClient(next.value); + else activeClient = null; + } + } + + function closeTab(process) { + const meta = clients.get(process); + if (!meta) return; + meta.tab.remove(); + meta.panel.remove(); + clients.delete(process); + closedTabs.add(process); + + if (activeClient === process) { + const next = clients.keys().next(); + if (!next.done) setActiveClient(next.value); + else activeClient = null; + } + } + + function ensureClient(process) { + if (closedTabs.has(process)) return; + if (clients.has(process)) return; + + const tabClone = tabTemplate.content.cloneNode(true); + const tab = tabClone.querySelector(".client-tab"); + const tabSelect = tabClone.querySelector(".client-tab-select"); + tabSelect.textContent = process; + + const cardClone = cardTemplate.content.cloneNode(true); + const panel = cardClone.querySelector(".client-card"); + const name = cardClone.querySelector(".client-name"); + const stdout = cardClone.querySelector(".client-stdout"); + const stderr = cardClone.querySelector(".client-stderr"); + const closeTabBtn = cardClone.querySelector(".btn-close-tab"); + const killBtn = cardClone.querySelector(".btn-kill"); + + name.textContent = process; + stdout.id = `${process}-stdout`; + stderr.id = `${process}-stderr`; + panel.hidden = true; + + tabSelect.addEventListener("click", () => setActiveClient(process)); + closeTabBtn.addEventListener("click", async () => closeClient(process)); + killBtn.addEventListener("click", async () => closeClient(process)); + + clientTabs.appendChild(tabClone); + clientPanels.appendChild(cardClone); + clients.set(process, { tab, panel }); + + if (!activeClient) { + setActiveClient(process); + } + } + + function appendToProcess(process, stream, text) { + const safeStream = stream === "stderr" ? "stderr" : "stdout"; + let severity = ""; + if (safeStream === "stderr") { + const lower = text.toLowerCase(); + if (lower.startsWith("[") && lower.includes("] ")) { + const idx = lower.indexOf("] "); + const raw = lower.slice(idx + 2).trimStart(); + if (raw.startsWith("error")) severity = "stderr-error"; + else if (raw.startsWith("warning")) severity = "stderr-warning"; + else severity = "stderr-default"; + } else if (lower.startsWith("error")) { + severity = "stderr-error"; + } else if (lower.startsWith("warning")) { + severity = "stderr-warning"; + } else { + severity = "stderr-default"; + } + } + append(`${process}-${safeStream}`, text, stream, severity); + } + + addClientBtn.addEventListener("click", async () => { + const res = await fetch("/api/clients", { method: "POST" }); + if (!res.ok) return; + const payload = await res.json(); + if (payload && payload.name) { + closedTabs.delete(payload.name); + ensureClient(payload.name); + setActiveClient(payload.name); + } + }); + + leftTabServer.addEventListener("click", () => setLeftTab("server")); + leftTabSystem.addEventListener("click", () => setLeftTab("system")); + setLeftTab("server"); + + ws.onopen = () => { + appendStatus("[orchestrator/status] websocket connected"); + }; + + ws.onmessage = (event) => { + const msg = JSON.parse(event.data); + const line = `[${msg.process}/${msg.stream}] ${msg.line}`; + if (msg.process === "server") { + appendToProcess("server", msg.stream, line); + return; + } + if (msg.process.startsWith("client-")) { + closedTabs.delete(msg.process); + ensureClient(msg.process); + appendToProcess(msg.process, msg.stream, line); + return; + } + appendStatus(line); + }; + + ws.onclose = () => { + appendStatus("[orchestrator/status] websocket disconnected"); + }; +})(); diff --git a/webtest/static/tailwind.css b/webtest/static/tailwind.css new file mode 100644 index 0000000..591c48f --- /dev/null +++ b/webtest/static/tailwind.css @@ -0,0 +1,179 @@ +/* Vendored local Tailwind-style utility subset for webtest. */ +*, +::before, +::after { + box-sizing: border-box; +} + +html, +body { + margin: 0; + padding: 0; +} + +body { + font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Helvetica, Arial, sans-serif; + line-height: 1.5; +} + +.min-h-screen { min-height: 100vh; } +.mx-auto { margin-left: auto; margin-right: auto; } +.max-w-7xl { max-width: 80rem; } +.p-4 { padding: 1rem; } +.md\:p-6 { padding: 1rem; } +.mb-4 { margin-bottom: 1rem; } +.text-xl { font-size: 1.25rem; line-height: 1.75rem; } +.font-semibold { font-weight: 600; } +.font-medium { font-weight: 500; } +.tracking-tight { letter-spacing: -0.025em; } +.grid { display: grid; } +.grid-cols-1 { grid-template-columns: repeat(1, minmax(0, 1fr)); } +.md\:grid-cols-2 { grid-template-columns: repeat(1, minmax(0, 1fr)); } +.gap-4 { gap: 1rem; } +.h-\[80vh\] { height: 80vh; } +.flex { display: flex; } +.flex-col { flex-direction: column; } +.flex-1 { flex: 1 1 0%; } +.min-h-0 { min-height: 0; } +.rounded-lg { border-radius: 0.5rem; } +.border { border-width: 1px; border-style: solid; } +.border-b { border-bottom-width: 1px; border-bottom-style: solid; } +.px-3 { padding-left: 0.75rem; padding-right: 0.75rem; } +.py-2 { padding-top: 0.5rem; padding-bottom: 0.5rem; } +.p-3 { padding: 0.75rem; } +.overflow-auto { overflow: auto; } +.text-sm { font-size: 0.875rem; line-height: 1.25rem; } + +.bg-slate-950 { background-color: #020617; } +.bg-slate-900 { background-color: #0f172a; } +.text-slate-100 { color: #f1f5f9; } +.border-slate-700 { border-color: #334155; } + +.layout { + display: grid; + grid-template-columns: 1fr; + gap: 1rem; +} + +.clients-header { + display: flex; + justify-content: space-between; + align-items: center; +} + +.pane-tabs { + display: flex; + gap: 0.5rem; + align-items: center; +} + +.pane-tab { + border: 1px solid #334155; + border-radius: 0.5rem; + background: #1e293b; + color: #cbd5e1; + padding: 0.28rem 0.65rem; + cursor: pointer; +} + +.pane-tab.active { + background: #334155; + color: #f8fafc; +} + +.btn-add { + border: 1px solid #334155; + border-radius: 0.5rem; + background: #1e293b; + color: #e2e8f0; + padding: 0.3rem 0.7rem; + cursor: pointer; +} + +.client-tabs { + display: flex; + gap: 0.5rem; + overflow-x: auto; + padding: 0.75rem; + border-bottom: 1px solid #334155; +} + +.client-tab { + display: inline-flex; + align-items: center; + gap: 0.4rem; +} + +.client-tab-select { + border: 1px solid #334155; + border-radius: 0.5rem; + background: #1e293b; + color: #cbd5e1; + padding: 0.28rem 0.65rem; + cursor: pointer; + white-space: nowrap; +} + +.client-tab.active .client-tab-select { + background: #334155; + color: #f8fafc; +} + +.client-panels { + flex: 1; + min-height: 0; + overflow: auto; + padding: 0.75rem; +} + +.client-card { + min-height: 0; + display: flex; + flex-direction: column; +} + +.client-card[hidden] { + display: none !important; +} + +.client-card-header { + display: flex; + justify-content: space-between; + align-items: center; +} + +.client-actions { + display: inline-flex; + gap: 0.45rem; +} + +.btn-close-tab { + border: 1px solid #475569; + border-radius: 0.45rem; + background: transparent; + color: #cbd5e1; + padding: 0.2rem 0.6rem; + cursor: pointer; +} + +.btn-kill { + border: 1px solid #7f1d1d; + border-radius: 0.45rem; + background: #991b1b; + color: #fee2e2; + padding: 0.2rem 0.6rem; + cursor: pointer; +} + +.stdout { color: #a5d6ff; } +.stderr { color: #cbd5e1; } +.status { color: #7ee787; } +.stderr-default { color: #cbd5e1; } +.stderr-warning { color: #facc15; background: rgba(250, 204, 21, 0.12); } +.stderr-error { color: #f87171; background: rgba(248, 113, 113, 0.12); } + +@media (min-width: 768px) { + .layout { grid-template-columns: minmax(360px, 0.9fr) minmax(520px, 1.1fr); } + .md\:grid-cols-2 { grid-template-columns: repeat(2, minmax(0, 1fr)); } + .md\:p-6 { padding: 1.5rem; } +} diff --git a/webtest/templates.go b/webtest/templates.go new file mode 100644 index 0000000..efa9c9f --- /dev/null +++ b/webtest/templates.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "io" + + "github.com/a-h/templ" +) + +func indexPage() templ.Component { + return templ.ComponentFunc(func(_ context.Context, w io.Writer) error { + _, err := io.WriteString(w, ` + + + + + Transport WebTest + + + +
+

Transport WebTest

+
+
+
+ + +
+
+
+
stdout
+
+
+
+
stderr
+
+
+
+ +
+
+
+ clients + +
+
+
+
+
+
+ + + + + + +`) + return err + }) +} diff --git a/webtest/types.go b/webtest/types.go new file mode 100644 index 0000000..19edabb --- /dev/null +++ b/webtest/types.go @@ -0,0 +1,8 @@ +package main + +type logMessage struct { + Process string `json:"process"` + Stream string `json:"stream"` + Line string `json:"line"` + Time string `json:"time"` +}