package main import ( "encoding/json" "log" "sync" "time" "github.com/gorilla/websocket" ) type hub struct { mu sync.Mutex clients map[*websocket.Conn]struct{} history []logMessage maxHistory int } func newHub() *hub { return &hub{ clients: make(map[*websocket.Conn]struct{}), history: make([]logMessage, 0, 512), maxHistory: 2000, } } func (h *hub) add(c *websocket.Conn) { h.mu.Lock() h.clients[c] = struct{}{} snapshot := append([]logMessage(nil), h.history...) clientCount := len(h.clients) h.mu.Unlock() log.Printf("ws client connected remote=%s clients=%d", c.RemoteAddr(), clientCount) h.replay(c, snapshot) } func (h *hub) remove(c *websocket.Conn) { h.mu.Lock() defer h.mu.Unlock() delete(h.clients, c) log.Printf("ws client disconnected remote=%s clients=%d", c.RemoteAddr(), len(h.clients)) } func (h *hub) broadcast(msg logMessage) { log.Printf("message process=%s stream=%s line=%q time=%s", msg.Process, msg.Stream, msg.Line, msg.Time) payload, err := json.Marshal(msg) if err != nil { log.Printf("broadcast marshal error process=%s stream=%s err=%v", msg.Process, msg.Stream, err) return } h.mu.Lock() defer h.mu.Unlock() h.history = append(h.history, msg) if len(h.history) > h.maxHistory { h.history = h.history[len(h.history)-h.maxHistory:] } for c := range h.clients { _ = c.SetWriteDeadline(time.Now().Add(2 * time.Second)) if err := c.WriteMessage(websocket.TextMessage, payload); err != nil { log.Printf("broadcast write error remote=%s err=%v", c.RemoteAddr(), err) _ = c.Close() delete(h.clients, c) } } } func (h *hub) replay(c *websocket.Conn, snapshot []logMessage) { for _, msg := range snapshot { payload, err := json.Marshal(msg) if err != nil { continue } _ = c.SetWriteDeadline(time.Now().Add(2 * time.Second)) if err := c.WriteMessage(websocket.TextMessage, payload); err != nil { log.Printf("ws replay failed remote=%s err=%v", c.RemoteAddr(), err) return } } log.Printf("ws replay complete remote=%s messages=%d", c.RemoteAddr(), len(snapshot)) } func (h *hub) status(process, line string) { log.Printf("status process=%s line=%s", process, line) h.broadcast(logMessage{ Process: process, Stream: "status", Line: line, Time: time.Now().Format(time.RFC3339), }) }