satoru/cmd/satoru/progress_ws.go

161 lines
3.9 KiB
Go

package main
import (
"context"
"net/http"
"strings"
"time"
"github.com/gorilla/websocket"
"go.uber.org/zap"
)
// progressUpgrader upgrades HTTP requests to WebSocket connections for the
// live progress feed, using 1 KiB read and write buffers.
//
// CheckOrigin is deliberately left nil so that gorilla/websocket applies its
// built-in same-origin policy: a request without an Origin header is accepted,
// otherwise the host of the Origin URL must equal the request's Host header
// (ASCII case-insensitive). The earlier substring test,
// strings.Contains(origin, r.Host), also accepted look-alike origins such as
// "https://example.com.attacker.test" for the host "example.com", which made
// cross-site WebSocket hijacking possible.
var progressUpgrader = websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}
// liveSiteStatus is the JSON shape of one site entry in a live progress
// snapshot: the site's ID plus the status, output and time of its last run.
//
// NOTE: encoding/json never considers a struct value "empty", so the omitempty
// option on LastRunAt has no effect; an unset time is still emitted as
// "0001-01-01T00:00:00Z".
type liveSiteStatus struct {
	SiteID        int64     `json:"site_id"`
	LastRunStatus string    `json:"last_run_status"`
	LastRunOutput string    `json:"last_run_output"`
	LastRunAt     time.Time `json:"last_run_at,omitempty"`
}
// liveJobStatus is the JSON shape of one job entry in a live progress
// snapshot. Summary is left out of the JSON when it is the empty string.
//
// NOTE: omitempty does not apply to struct values, so StartedAt and FinishedAt
// are always emitted; an unset time appears as "0001-01-01T00:00:00Z".
type liveJobStatus struct {
	JobID      int64     `json:"job_id"`
	SiteID     int64     `json:"site_id"`
	Type       string    `json:"type"`
	Status     string    `json:"status"`
	Summary    string    `json:"summary,omitempty"`
	StartedAt  time.Time `json:"started_at,omitempty"`
	FinishedAt time.Time `json:"finished_at,omitempty"`
}
// liveJobEvent is the JSON shape of one job event in a live progress
// snapshot. Every field is always present in the encoded object.
type liveJobEvent struct {
	JobID      int64     `json:"job_id"`
	SiteID     int64     `json:"site_id"`
	JobType    string    `json:"job_type"`
	Level      string    `json:"level"`
	Message    string    `json:"message"`
	OccurredAt time.Time `json:"occurred_at"`
}
// liveProgressPayload is the JSON message written to progress WebSocket
// clients: the time the snapshot was built together with the site, job and
// job-event entries it contains.
type liveProgressPayload struct {
	Timestamp time.Time        `json:"timestamp"`
	Sites     []liveSiteStatus `json:"sites"`
	Jobs      []liveJobStatus  `json:"jobs"`
	Events    []liveJobEvent   `json:"events"`
}
// handleProgressWebSocket streams live progress snapshots to an authenticated
// client over a WebSocket.
//
// The request is rejected with 401 when currentUserWithRollingSession returns
// an error. After a successful upgrade one snapshot is sent immediately and
// then one per second until the request context is done, the peer goes away,
// or building/writing a snapshot fails. Each write gets an 8 second deadline.
func (a *app) handleProgressWebSocket(w http.ResponseWriter, r *http.Request) {
	if _, err := a.currentUserWithRollingSession(w, r); err != nil {
		http.Error(w, "unauthorized", http.StatusUnauthorized)
		return
	}

	conn, err := progressUpgrader.Upgrade(w, r, nil)
	if err != nil {
		a.log.Debug("progress ws upgrade failed", zap.Error(err))
		return
	}
	defer conn.Close()

	// The feed is one-way, but the connection still has to be read so that
	// close, ping and pong frames from the peer are processed. NextReader
	// skips any unread data message, so nothing the client sends is buffered.
	// The goroutine ends once the peer disconnects or the deferred Close
	// above makes the pending read fail.
	peerGone := make(chan struct{})
	go func() {
		defer close(peerGone)
		for {
			if _, _, err := conn.NextReader(); err != nil {
				return
			}
		}
	}()

	// send builds a fresh snapshot and writes it as a single JSON message.
	send := func() error {
		payload, err := a.buildLiveProgressPayload(r.Context())
		if err != nil {
			return err
		}
		if err := conn.SetWriteDeadline(time.Now().Add(8 * time.Second)); err != nil {
			return err
		}
		return conn.WriteJSON(payload)
	}

	if err := send(); err != nil {
		a.log.Debug("progress ws initial send failed", zap.Error(err))
		return
	}

	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-r.Context().Done():
			return
		case <-peerGone:
			return
		case <-ticker.C:
			if err := send(); err != nil {
				a.log.Debug("progress ws send failed", zap.Error(err))
				return
			}
		}
	}
}
// buildLiveProgressPayload assembles one progress snapshot from the store:
// every site from ListSites, up to 30 entries from ListRecentJobs and up to 60
// entries from ListRecentJobEvents (the limits passed to those calls).
//
// Status, output and summary strings are whitespace-trimmed; a site whose
// trimmed status is empty is reported as "pending". Nullable times are copied
// only when valid and otherwise stay at the zero time. The first store error
// is returned unchanged together with a zero payload.
func (a *app) buildLiveProgressPayload(ctx context.Context) (liveProgressPayload, error) {
	var none liveProgressPayload

	siteRows, err := a.store.ListSites(ctx)
	if err != nil {
		return none, err
	}
	jobRows, err := a.store.ListRecentJobs(ctx, 30)
	if err != nil {
		return none, err
	}
	eventRows, err := a.store.ListRecentJobEvents(ctx, 60)
	if err != nil {
		return none, err
	}

	snapshot := liveProgressPayload{Timestamp: time.Now().UTC()}

	// Slices are always allocated so that empty lists encode as [] rather
	// than null.
	snapshot.Sites = make([]liveSiteStatus, 0, len(siteRows))
	for _, s := range siteRows {
		status := strings.TrimSpace(s.LastRunStatus.String)
		if status == "" {
			status = "pending"
		}
		var lastRun time.Time
		if s.LastRunAt.Valid {
			lastRun = s.LastRunAt.Time
		}
		snapshot.Sites = append(snapshot.Sites, liveSiteStatus{
			SiteID:        s.ID,
			LastRunStatus: status,
			LastRunOutput: strings.TrimSpace(s.LastRunOutput.String),
			LastRunAt:     lastRun,
		})
	}

	snapshot.Jobs = make([]liveJobStatus, 0, len(jobRows))
	for _, j := range jobRows {
		var started, finished time.Time
		if j.StartedAt.Valid {
			started = j.StartedAt.Time
		}
		if j.FinishedAt.Valid {
			finished = j.FinishedAt.Time
		}
		snapshot.Jobs = append(snapshot.Jobs, liveJobStatus{
			JobID:      j.ID,
			SiteID:     j.SiteID,
			Type:       j.Type,
			Status:     j.Status,
			Summary:    strings.TrimSpace(j.Summary.String),
			StartedAt:  started,
			FinishedAt: finished,
		})
	}

	snapshot.Events = make([]liveJobEvent, 0, len(eventRows))
	for _, e := range eventRows {
		entry := liveJobEvent{
			JobID:      e.JobID,
			SiteID:     e.SiteID,
			JobType:    e.JobType,
			Level:      e.Level,
			Message:    e.Message,
			OccurredAt: e.OccurredAt,
		}
		snapshot.Events = append(snapshot.Events, entry)
	}

	return snapshot, nil
}