From a3b96819381dbd1e06e220abeb8ab75107cb236a Mon Sep 17 00:00:00 2001 From: Peter Li Date: Sat, 7 Feb 2026 19:42:05 -0800 Subject: [PATCH] refactor + job running --- cmd/satoru/auth_session.go | 92 ++++++++++ cmd/satoru/input.go | 88 +++++++++ cmd/satoru/jobs.go | 182 ++++++++++++++++++ cmd/satoru/main.go | 354 ++++++------------------------------ cmd/satoru/scanner.go | 175 ++++++++++++++++++ internal/store/store.go | 237 +++++++++++++++++++++++- internal/webui/dashboard.go | 54 +++++- web/static/app.css | 9 + 8 files changed, 886 insertions(+), 305 deletions(-) create mode 100644 cmd/satoru/auth_session.go create mode 100644 cmd/satoru/input.go create mode 100644 cmd/satoru/jobs.go create mode 100644 cmd/satoru/scanner.go diff --git a/cmd/satoru/auth_session.go b/cmd/satoru/auth_session.go new file mode 100644 index 0000000..cc4af22 --- /dev/null +++ b/cmd/satoru/auth_session.go @@ -0,0 +1,92 @@ +package main + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "encoding/base64" + "encoding/hex" + "net/http" + "time" + + "satoru/internal/store" +) + +func (a *app) issueSession(w http.ResponseWriter, r *http.Request, userID int64) error { + token, err := generateToken() + if err != nil { + return err + } + + expiresAt := time.Now().Add(sessionTTL) + if err := a.store.CreateSession(r.Context(), userID, hashToken(token), expiresAt); err != nil { + return err + } + + setSessionCookie(w, r, token, expiresAt) + return nil +} + +func setSessionCookie(w http.ResponseWriter, r *http.Request, token string, expiresAt time.Time) { + http.SetCookie(w, &http.Cookie{ + Name: sessionCookieName, + Value: token, + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + Secure: r.TLS != nil, + Expires: expiresAt, + }) +} + +func clearSessionCookie(w http.ResponseWriter) { + http.SetCookie(w, &http.Cookie{ + Name: sessionCookieName, + Value: "", + Path: "/", + HttpOnly: true, + SameSite: http.SameSiteLaxMode, + MaxAge: -1, + Expires: time.Unix(0, 0), + }) +} + +func (a *app) currentUser(ctx context.Context, r *http.Request) (store.User, string, error) { + c, err := r.Cookie(sessionCookieName) + if err != nil || c.Value == "" { + return store.User{}, "", http.ErrNoCookie + } + user, err := a.store.UserBySessionTokenHash(ctx, hashToken(c.Value)) + if err != nil { + return store.User{}, "", err + } + return user, c.Value, nil +} + +func (a *app) currentUserWithRollingSession(w http.ResponseWriter, r *http.Request) (store.User, error) { + user, token, err := a.currentUser(r.Context(), r) + if err != nil { + return store.User{}, err + } + + expiresAt := time.Now().Add(sessionTTL) + if err := a.store.TouchSessionByTokenHash(r.Context(), hashToken(token), expiresAt); err != nil { + clearSessionCookie(w) + return store.User{}, err + } + setSessionCookie(w, r, token, expiresAt) + return user, nil +} + +func generateToken() (string, error) { + buf := make([]byte, 32) + if _, err := rand.Read(buf); err != nil { + return "", err + } + return base64.RawURLEncoding.EncodeToString(buf), nil +} + +func hashToken(token string) string { + sum := sha256.Sum256([]byte(token)) + return hex.EncodeToString(sum[:]) +} diff --git a/cmd/satoru/input.go b/cmd/satoru/input.go new file mode 100644 index 0000000..379b9b3 --- /dev/null +++ b/cmd/satoru/input.go @@ -0,0 +1,88 @@ +package main + +import ( + "errors" + "regexp" + "strconv" + "strings" + + "satoru/internal/store" + "satoru/internal/webui" +) + +var usernamePattern = regexp.MustCompile(`^[a-z0-9._-]{3,32}$`) + +func normalizeUsername(v string) string { + return strings.ToLower(strings.TrimSpace(v)) +} + +func validUsername(v string) bool { + return usernamePattern.MatchString(v) +} + +func defaultWorkflowStages() []webui.WorkflowStage { + return []webui.WorkflowStage{ + {Title: "Pull from Edge over SSH", Description: "Satoru connects to Linux edge hosts using local keys and pulls approved paths."}, + {Title: "Stage on Backup Server", Description: "Pulled data lands on the backup host first, keeping edge systems isolated from B2."}, + {Title: "Restic to B2", Description: "Restic runs centrally on this Satoru instance and uploads snapshots to Backblaze B2."}, + {Title: "Audit and Recover", Description: "Each site records run output/status for operational visibility before full job history is added."}, + } +} + +func parsePathList(raw string) []string { + split := strings.FieldsFunc(raw, func(r rune) bool { + return r == '\n' || r == ',' || r == ';' + }) + out := make([]string, 0, len(split)) + for _, item := range split { + item = strings.TrimSpace(item) + if item == "" { + continue + } + out = append(out, item) + } + return out +} + +func buildTargets(directoryPaths, sqlitePaths []string) []store.SiteTarget { + seen := map[string]struct{}{} + out := make([]store.SiteTarget, 0, len(directoryPaths)+len(sqlitePaths)) + + for _, p := range directoryPaths { + addTarget(&out, seen, store.SiteTarget{Path: p, Mode: "directory"}) + } + for _, p := range sqlitePaths { + addTarget(&out, seen, store.SiteTarget{Path: p, Mode: "sqlite_dump"}) + } + return out +} + +func addTarget(out *[]store.SiteTarget, seen map[string]struct{}, t store.SiteTarget) { + key := t.Mode + "\x00" + t.Path + if _, ok := seen[key]; ok { + return + } + seen[key] = struct{}{} + *out = append(*out, t) +} + +func targetsAreValid(targets []store.SiteTarget) bool { + for _, t := range targets { + if t.Path == "" || !strings.HasPrefix(t.Path, "/") { + return false + } + } + return true +} + +func parsePort(raw string) (int, error) { + raw = strings.TrimSpace(raw) + if raw == "" { + return 22, nil + } + port, err := strconv.Atoi(raw) + if err != nil || port < 1 || port > 65535 { + return 0, errors.New("invalid port") + } + return port, nil +} diff --git a/cmd/satoru/jobs.go b/cmd/satoru/jobs.go new file mode 100644 index 0000000..8b6f735 --- /dev/null +++ b/cmd/satoru/jobs.go @@ -0,0 +1,182 @@ +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "satoru/internal/store" +) + +const ( + jobTypePreflight = "preflight" + jobPollInterval = 2 * time.Second + jobWorkers = 3 +) + +func (a *app) startJobWorkers(ctx context.Context) { + var wg sync.WaitGroup + for i := 0; i < jobWorkers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + a.runWorkerLoop(ctx, workerID) + }(i + 1) + } + <-ctx.Done() + wg.Wait() +} + +func (a *app) runWorkerLoop(ctx context.Context, workerID int) { + ticker := time.NewTicker(jobPollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + job, ok, err := a.store.TryStartNextQueuedJob(ctx) + if err != nil { + log.Printf("worker %d: failed to start job: %v", workerID, err) + continue + } + if !ok { + continue + } + a.executeJob(ctx, job) + } + } +} + +func (a *app) executeJob(ctx context.Context, job store.Job) { + site, err := a.store.SiteByID(ctx, job.SiteID) + if err != nil { + _ = a.store.CompleteJob(ctx, job.ID, "failed", "failed to load site") + return + } + + _ = a.store.UpdateSiteRunResult(ctx, site.ID, "running", fmt.Sprintf("Job #%d running (%s)", job.ID, job.Type), time.Now()) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job started"}) + + switch job.Type { + case jobTypePreflight: + status, summary := a.runPreflightJob(ctx, job, site) + _ = a.store.CompleteJob(ctx, job.ID, status, summary) + _ = a.store.UpdateSiteRunResult(ctx, site.ID, status, summary, time.Now()) + default: + summary := "unknown job type" + _ = a.store.CompleteJob(ctx, job.ID, "failed", summary) + _ = a.store.UpdateSiteRunResult(ctx, site.ID, "failed", summary, time.Now()) + } +} + +func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Site) (string, string) { + failures := 0 + warnings := 0 + + requiredLocal := []string{"ssh", "rsync", "restic", "gzip"} + for _, tool := range requiredLocal { + if _, err := exec.LookPath(tool); err != nil { + failures++ + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: fmt.Sprintf("local tool missing: %s", tool)}) + } + } + if failures > 0 { + return "failed", fmt.Sprintf("preflight failed: %d local tool checks failed", failures) + } + + if err := sshCheck(ctx, site, "echo preflight-ok"); err != nil { + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "ssh connectivity failed: " + err.Error()}) + return "failed", "preflight failed: ssh connectivity" + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "ssh connectivity ok"}) + + for _, t := range site.Targets { + var err error + switch t.Mode { + case "directory": + err = sshCheck(ctx, site, fmt.Sprintf("test -d -- %s", shellQuote(t.Path))) + case "sqlite_dump": + err = sqlitePreflightCheck(ctx, site, t.Path) + default: + err = fmt.Errorf("unknown target mode: %s", t.Mode) + } + + if err != nil { + warnings++ + msg := fmt.Sprintf("target %s (%s): %s", t.Path, t.Mode, err.Error()) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: msg}) + } else { + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: fmt.Sprintf("target ok: %s (%s)", t.Path, t.Mode)}) + } + } + + switch { + case warnings == 0: + return "success", fmt.Sprintf("preflight passed (%d targets)", len(site.Targets)) + case warnings == len(site.Targets): + return "failed", fmt.Sprintf("preflight failed (%d/%d target checks failed)", warnings, len(site.Targets)) + default: + return "warning", fmt.Sprintf("preflight warning (%d/%d target checks failed)", warnings, len(site.Targets)) + } +} + +func sshCheck(ctx context.Context, site store.Site, remoteCmd string) error { + cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + + target := fmt.Sprintf("%s@%s", site.SSHUser, site.Host) + cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), target, remoteCmd) + out, err := cmd.CombinedOutput() + if err == nil { + return nil + } + msg := strings.TrimSpace(string(out)) + if msg == "" { + msg = err.Error() + } + return errors.New(msg) +} + +func sqlitePreflightCheck(ctx context.Context, site store.Site, dbPath string) error { + quoted := shellQuote(dbPath) + cmd := strings.Join([]string{ + "sqlite3 --version >/dev/null", + "test -r -- " + quoted, + "tmp=$(mktemp /tmp/satoru-preflight.XXXXXX)", + "sqlite3 " + quoted + " \".backup $tmp\"", + "rm -f -- \"$tmp\"", + }, " && ") + return sshCheck(ctx, site, cmd) +} + +func (a *app) enqueuePreflightJob(ctx context.Context, siteID int64) (store.Job, error) { + job, err := a.store.CreateJob(ctx, siteID, jobTypePreflight) + if err != nil { + return store.Job{}, err + } + _ = a.store.UpdateSiteRunResult(ctx, siteID, "queued", fmt.Sprintf("Job #%d queued (preflight)", job.ID), time.Now()) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job queued"}) + return job, nil +} + +func (a *app) latestJobForSite(ctx context.Context, siteID int64) (store.Job, error) { + jobs, err := a.store.ListRecentJobs(ctx, 200) + if err != nil { + return store.Job{}, err + } + for _, j := range jobs { + if j.SiteID == siteID { + return j, nil + } + } + return store.Job{}, sql.ErrNoRows +} diff --git a/cmd/satoru/main.go b/cmd/satoru/main.go index 437a1fa..252d933 100644 --- a/cmd/satoru/main.go +++ b/cmd/satoru/main.go @@ -2,19 +2,12 @@ package main import ( "context" - "crypto/rand" - "crypto/sha256" "database/sql" - "encoding/base64" - "encoding/hex" "errors" - "fmt" "log" "net/http" "os" - "os/exec" "path/filepath" - "regexp" "strconv" "strings" "time" @@ -54,6 +47,7 @@ func main() { defer cancel() go a.startSiteScanLoop(ctx) + go a.startJobWorkers(ctx) r := chi.NewRouter() @@ -65,6 +59,8 @@ func main() { r.Post("/account/password", a.handlePasswordSubmit) r.Post("/sites", a.handleSiteCreate) r.Post("/sites/{id}/run", a.handleSiteRun) + r.Post("/sites/{id}/update", a.handleSiteUpdate) + r.Post("/sites/{id}/delete", a.handleSiteDelete) r.Get("/signup", a.handleSignupPage) r.Post("/signup", a.handleSignupSubmit) r.Get("/signin", a.handleSigninPage) @@ -93,6 +89,7 @@ func (a *app) handleHome(w http.ResponseWriter, r *http.Request) { data := webui.DashboardData{ Now: time.Now(), + ScanInterval: scanInterval, User: user, Sites: sites, RuntimeChecks: runtimeChecks(), @@ -315,8 +312,7 @@ func (a *app) handleSiteRun(w http.ResponseWriter, r *http.Request) { return } - site, err := a.store.SiteByID(r.Context(), id) - if err != nil { + if _, err := a.store.SiteByID(r.Context(), id); err != nil { if errors.Is(err, sql.ErrNoRows) { http.NotFound(w, r) return @@ -325,314 +321,80 @@ func (a *app) handleSiteRun(w http.ResponseWriter, r *http.Request) { return } - status, output := runSSHHello(r.Context(), site) - if err := a.store.UpdateSiteRunResult(r.Context(), site.ID, status, output, time.Now()); err != nil { - http.Error(w, "failed to store run result", http.StatusInternalServerError) + if _, err := a.enqueuePreflightJob(r.Context(), id); err != nil { + http.Error(w, "failed to queue job", http.StatusInternalServerError) return } - http.Redirect(w, r, "/?msg=site-ran", http.StatusSeeOther) + http.Redirect(w, r, "/?msg=job-queued", http.StatusSeeOther) } -func (a *app) issueSession(w http.ResponseWriter, r *http.Request, userID int64) error { - token, err := generateToken() +func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Redirect(w, r, "/signin", http.StatusSeeOther) + return + } + id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) if err != nil { - return err + http.Error(w, "invalid site id", http.StatusBadRequest) + return + } + if err := r.ParseForm(); err != nil { + http.Error(w, "invalid form", http.StatusBadRequest) + return } - expiresAt := time.Now().Add(sessionTTL) - if err := a.store.CreateSession(r.Context(), userID, hashToken(token), expiresAt); err != nil { - return err - } - - setSessionCookie(w, r, token, expiresAt) - return nil -} - -func setSessionCookie(w http.ResponseWriter, r *http.Request, token string, expiresAt time.Time) { - http.SetCookie(w, &http.Cookie{ - Name: sessionCookieName, - Value: token, - Path: "/", - HttpOnly: true, - SameSite: http.SameSiteLaxMode, - Secure: r.TLS != nil, - Expires: expiresAt, - }) -} - -func clearSessionCookie(w http.ResponseWriter) { - http.SetCookie(w, &http.Cookie{ - Name: sessionCookieName, - Value: "", - Path: "/", - HttpOnly: true, - SameSite: http.SameSiteLaxMode, - MaxAge: -1, - Expires: time.Unix(0, 0), - }) -} - -func (a *app) currentUser(ctx context.Context, r *http.Request) (store.User, string, error) { - c, err := r.Cookie(sessionCookieName) - if err != nil || c.Value == "" { - return store.User{}, "", http.ErrNoCookie - } - user, err := a.store.UserBySessionTokenHash(ctx, hashToken(c.Value)) + sshUser := strings.TrimSpace(r.FormValue("ssh_user")) + host := strings.TrimSpace(r.FormValue("host")) + port, err := parsePort(r.FormValue("port")) if err != nil { - return store.User{}, "", err + http.Redirect(w, r, "/?msg=site-invalid-port", http.StatusSeeOther) + return + } + directoryPaths := parsePathList(r.FormValue("directory_paths")) + sqlitePaths := parsePathList(r.FormValue("sqlite_paths")) + targets := buildTargets(directoryPaths, sqlitePaths) + if sshUser == "" || host == "" || len(targets) == 0 { + http.Redirect(w, r, "/?msg=site-invalid", http.StatusSeeOther) + return + } + if !targetsAreValid(targets) { + http.Redirect(w, r, "/?msg=site-invalid-path", http.StatusSeeOther) + return } - return user, c.Value, nil -} -func (a *app) currentUserWithRollingSession(w http.ResponseWriter, r *http.Request) (store.User, error) { - user, token, err := a.currentUser(r.Context(), r) + site, err := a.store.UpdateSite(r.Context(), id, sshUser, host, port, targets) if err != nil { - return store.User{}, err - } - - expiresAt := time.Now().Add(sessionTTL) - if err := a.store.TouchSessionByTokenHash(r.Context(), hashToken(token), expiresAt); err != nil { - clearSessionCookie(w) - return store.User{}, err - } - setSessionCookie(w, r, token, expiresAt) - return user, nil -} - -func generateToken() (string, error) { - buf := make([]byte, 32) - if _, err := rand.Read(buf); err != nil { - return "", err - } - return base64.RawURLEncoding.EncodeToString(buf), nil -} - -func hashToken(token string) string { - sum := sha256.Sum256([]byte(token)) - return hex.EncodeToString(sum[:]) -} - -var usernamePattern = regexp.MustCompile(`^[a-z0-9._-]{3,32}$`) - -func normalizeUsername(v string) string { - return strings.ToLower(strings.TrimSpace(v)) -} - -func validUsername(v string) bool { - return usernamePattern.MatchString(v) -} - -func runtimeChecks() []webui.RuntimeCheck { - tools := []string{"restic", "rsync", "ssh"} - out := make([]webui.RuntimeCheck, 0, len(tools)) - for _, name := range tools { - path, err := exec.LookPath(name) - if err != nil { - out = append(out, webui.RuntimeCheck{Name: name, Installed: false, Details: "not found in PATH"}) - continue - } - out = append(out, webui.RuntimeCheck{Name: name, Installed: true, Details: path}) - } - return out -} - -func runSSHHello(ctx context.Context, site store.Site) (string, string) { - target := fmt.Sprintf("%s@%s", site.SSHUser, site.Host) - cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - - cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), target, "echo hello from satoru") - out, err := cmd.CombinedOutput() - output := strings.TrimSpace(string(out)) - if output == "" { - output = "(no output)" - } - if err != nil { - return "failed", output - } - return "ok", output -} - -func defaultWorkflowStages() []webui.WorkflowStage { - return []webui.WorkflowStage{ - {Title: "Pull from Edge over SSH", Description: "Satoru connects to Linux edge hosts using local keys and pulls approved paths."}, - {Title: "Stage on Backup Server", Description: "Pulled data lands on the backup host first, keeping edge systems isolated from B2."}, - {Title: "Restic to B2", Description: "Restic runs centrally on this Satoru instance and uploads snapshots to Backblaze B2."}, - {Title: "Audit and Recover", Description: "Each site records run output/status for operational visibility before full job history is added."}, - } -} - -func parsePathList(raw string) []string { - split := strings.FieldsFunc(raw, func(r rune) bool { - return r == '\n' || r == ',' || r == ';' - }) - out := make([]string, 0, len(split)) - for _, item := range split { - item = strings.TrimSpace(item) - if item == "" { - continue - } - out = append(out, item) - } - return out -} - -func buildTargets(directoryPaths, sqlitePaths []string) []store.SiteTarget { - out := make([]store.SiteTarget, 0, len(directoryPaths)+len(sqlitePaths)) - for _, p := range directoryPaths { - out = append(out, store.SiteTarget{Path: p, Mode: "directory"}) - } - for _, p := range sqlitePaths { - out = append(out, store.SiteTarget{Path: p, Mode: "sqlite_dump"}) - } - return out -} - -func targetsAreValid(targets []store.SiteTarget) bool { - for _, t := range targets { - if t.Path == "" || !strings.HasPrefix(t.Path, "/") { - return false - } - } - return true -} - -func parsePort(raw string) (int, error) { - raw = strings.TrimSpace(raw) - if raw == "" { - return 22, nil - } - port, err := strconv.Atoi(raw) - if err != nil || port < 1 || port > 65535 { - return 0, errors.New("invalid port") - } - return port, nil -} - -func (a *app) startSiteScanLoop(ctx context.Context) { - a.scanAllSites(ctx) - ticker := time.NewTicker(scanLoopTick) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) return - case <-ticker.C: - a.scanDueSites(ctx) } - } -} - -func (a *app) scanAllSites(ctx context.Context) { - sites, err := a.store.ListSites(ctx) - if err != nil { - log.Printf("scan loop: failed to list sites: %v", err) - return - } - for _, site := range sites { - a.scanSiteNow(ctx, site.ID) - } -} - -func (a *app) scanDueSites(ctx context.Context) { - sites, err := a.store.ListSites(ctx) - if err != nil { - log.Printf("scan loop: failed to list sites: %v", err) + http.Error(w, "failed to update site", http.StatusInternalServerError) return } - now := time.Now() - for _, site := range sites { - if site.LastScanAt.Valid && site.LastScanAt.Time.Add(scanInterval).After(now) { - continue - } - a.scanSiteNow(ctx, site.ID) - } + scanCtx, cancel := context.WithTimeout(context.Background(), 45*time.Second) + a.scanSiteNow(scanCtx, site.ID) + cancel() + http.Redirect(w, r, "/?msg=site-updated", http.StatusSeeOther) } -func (a *app) scanSiteNow(ctx context.Context, siteID int64) { - site, err := a.store.SiteByID(ctx, siteID) - if err != nil { - log.Printf("scan site %d: load failed: %v", siteID, err) +func (a *app) handleSiteDelete(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Redirect(w, r, "/signin", http.StatusSeeOther) return } - - scannedAt := time.Now() - success := 0 - failures := 0 - updated := make([]store.SiteTarget, 0, len(site.Targets)) - for _, target := range site.Targets { - size, outErr := queryTargetSize(ctx, site, target) - target.LastScanAt = sql.NullTime{Time: scannedAt, Valid: true} - if outErr != nil { - failures++ - target.LastSizeByte = sql.NullInt64{} - target.LastError = sql.NullString{String: outErr.Error(), Valid: true} - } else { - success++ - target.LastSizeByte = sql.NullInt64{Int64: size, Valid: true} - target.LastError = sql.NullString{} - } - updated = append(updated, target) - } - - state := "ok" - switch { - case len(site.Targets) == 0: - state = "failed" - case failures == len(site.Targets): - state = "failed" - case failures > 0: - state = "partial" - } - notes := fmt.Sprintf("%d/%d targets scanned", success, len(site.Targets)) - if err := a.store.UpdateSiteScanResult(ctx, site.ID, state, notes, scannedAt, updated); err != nil { - log.Printf("scan site %d: update failed: %v", siteID, err) - } -} - -func queryTargetSize(ctx context.Context, site store.Site, target store.SiteTarget) (int64, error) { - targetAddr := fmt.Sprintf("%s@%s", site.SSHUser, site.Host) - cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second) - defer cancel() - - remote := remoteSizeCommand(target) - cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), targetAddr, remote) - out, err := cmd.CombinedOutput() - output := strings.TrimSpace(string(out)) + id, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) if err != nil { - if output == "" { - output = err.Error() + http.Error(w, "invalid site id", http.StatusBadRequest) + return + } + if err := a.store.DeleteSite(r.Context(), id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) + return } - return 0, errors.New(output) + http.Error(w, "failed to delete site", http.StatusInternalServerError) + return } - size, ok := extractLastInteger(output) - if !ok { - return 0, errors.New("empty size output") - } - return size, nil -} - -func remoteSizeCommand(target store.SiteTarget) string { - path := shellQuote(target.Path) - if target.Mode == "sqlite_dump" { - return fmt.Sprintf("stat -c%%s -- %s", path) - } - return fmt.Sprintf("du -sb -- %s | awk '{print $1}'", path) -} - -func shellQuote(s string) string { - return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'" -} - -func extractLastInteger(output string) (int64, bool) { - fields := strings.Fields(output) - for i := len(fields) - 1; i >= 0; i-- { - v, err := strconv.ParseInt(fields[i], 10, 64) - if err == nil { - return v, true - } - } - return 0, false + http.Redirect(w, r, "/?msg=site-deleted", http.StatusSeeOther) } diff --git a/cmd/satoru/scanner.go b/cmd/satoru/scanner.go new file mode 100644 index 0000000..e6a7ee8 --- /dev/null +++ b/cmd/satoru/scanner.go @@ -0,0 +1,175 @@ +package main + +import ( + "context" + "database/sql" + "errors" + "fmt" + "log" + "os/exec" + "strconv" + "strings" + "time" + + "satoru/internal/store" + "satoru/internal/webui" +) + +func runtimeChecks() []webui.RuntimeCheck { + tools := []string{"restic", "rsync", "ssh"} + out := make([]webui.RuntimeCheck, 0, len(tools)) + for _, name := range tools { + path, err := exec.LookPath(name) + if err != nil { + out = append(out, webui.RuntimeCheck{Name: name, Installed: false, Details: "not found in PATH"}) + continue + } + out = append(out, webui.RuntimeCheck{Name: name, Installed: true, Details: path}) + } + return out +} + +func runSSHHello(ctx context.Context, site store.Site) (string, string) { + target := fmt.Sprintf("%s@%s", site.SSHUser, site.Host) + cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), target, "echo hello from satoru") + out, err := cmd.CombinedOutput() + output := strings.TrimSpace(string(out)) + if output == "" { + output = "(no output)" + } + if err != nil { + return "failed", output + } + return "ok", output +} + +func (a *app) startSiteScanLoop(ctx context.Context) { + a.scanAllSites(ctx) + ticker := time.NewTicker(scanLoopTick) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.scanDueSites(ctx) + } + } +} + +func (a *app) scanAllSites(ctx context.Context) { + sites, err := a.store.ListSites(ctx) + if err != nil { + log.Printf("scan loop: failed to list sites: %v", err) + return + } + for _, site := range sites { + a.scanSiteNow(ctx, site.ID) + } +} + +func (a *app) scanDueSites(ctx context.Context) { + sites, err := a.store.ListSites(ctx) + if err != nil { + log.Printf("scan loop: failed to list sites: %v", err) + return + } + + now := time.Now() + for _, site := range sites { + if site.LastScanAt.Valid && site.LastScanAt.Time.Add(scanInterval).After(now) { + continue + } + a.scanSiteNow(ctx, site.ID) + } +} + +func (a *app) scanSiteNow(ctx context.Context, siteID int64) { + site, err := a.store.SiteByID(ctx, siteID) + if err != nil { + log.Printf("scan site %d: load failed: %v", siteID, err) + return + } + + scannedAt := time.Now() + success := 0 + failures := 0 + updated := make([]store.SiteTarget, 0, len(site.Targets)) + for _, target := range site.Targets { + size, outErr := queryTargetSize(ctx, site, target) + target.LastScanAt = sql.NullTime{Time: scannedAt, Valid: true} + if outErr != nil { + failures++ + target.LastSizeByte = sql.NullInt64{} + target.LastError = sql.NullString{String: outErr.Error(), Valid: true} + } else { + success++ + target.LastSizeByte = sql.NullInt64{Int64: size, Valid: true} + target.LastError = sql.NullString{} + } + updated = append(updated, target) + } + + state := "ok" + switch { + case len(site.Targets) == 0: + state = "failed" + case failures == len(site.Targets): + state = "failed" + case failures > 0: + state = "partial" + } + notes := fmt.Sprintf("%d/%d targets scanned", success, len(site.Targets)) + if err := a.store.UpdateSiteScanResult(ctx, site.ID, state, notes, scannedAt, updated); err != nil { + log.Printf("scan site %d: update failed: %v", siteID, err) + } +} + +func queryTargetSize(ctx context.Context, site store.Site, target store.SiteTarget) (int64, error) { + targetAddr := fmt.Sprintf("%s@%s", site.SSHUser, site.Host) + cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + + remote := remoteSizeCommand(target) + cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), targetAddr, remote) + out, err := cmd.CombinedOutput() + output := strings.TrimSpace(string(out)) + if err != nil { + if output == "" { + output = err.Error() + } + return 0, errors.New(output) + } + size, ok := extractLastInteger(output) + if !ok { + return 0, errors.New("empty size output") + } + return size, nil +} + +func remoteSizeCommand(target store.SiteTarget) string { + path := shellQuote(target.Path) + if target.Mode == "sqlite_dump" { + return fmt.Sprintf("stat -c%%s -- %s", path) + } + return fmt.Sprintf("du -sb -- %s | awk '{print $1}'", path) +} + +func shellQuote(s string) string { + return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'" +} + +func extractLastInteger(output string) (int64, bool) { + fields := strings.Fields(output) + for i := len(fields) - 1; i >= 0; i-- { + v, err := strconv.ParseInt(fields[i], 10, 64) + if err == nil { + return v, true + } + } + return 0, false +} diff --git a/internal/store/store.go b/internal/store/store.go index d74f18b..c343692 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -46,6 +46,24 @@ type SiteTarget struct { LastError sql.NullString } +type Job struct { + ID int64 + SiteID int64 + Type string + Status string + Summary sql.NullString + CreatedAt time.Time + StartedAt sql.NullTime + FinishedAt sql.NullTime +} + +type JobEvent struct { + JobID int64 + Level string + Message string + OccurredAt time.Time +} + func Open(path string) (*Store, error) { dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)", path) db, err := sql.Open("sqlite", dsn) @@ -110,7 +128,29 @@ CREATE TABLE IF NOT EXISTS site_targets ( mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump')), last_size_bytes INTEGER, last_scan_at DATETIME, - last_error TEXT + last_error TEXT, + UNIQUE(site_id, path, mode) +);` + + const jobsSQL = ` +CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + type TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')), + summary TEXT, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + started_at DATETIME, + finished_at DATETIME +);` + + const jobEventsSQL = ` +CREATE TABLE IF NOT EXISTS job_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE, + level TEXT NOT NULL, + message TEXT NOT NULL, + occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP );` if _, err := s.db.ExecContext(ctx, usersSQL); err != nil { @@ -125,6 +165,12 @@ CREATE TABLE IF NOT EXISTS site_targets ( if _, err := s.db.ExecContext(ctx, siteTargetsSQL); err != nil { return err } + if _, err := s.db.ExecContext(ctx, jobsSQL); err != nil { + return err + } + if _, err := s.db.ExecContext(ctx, jobEventsSQL); err != nil { + return err + } return nil } @@ -263,6 +309,61 @@ func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, return s.SiteByID(ctx, id) } +func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, port int, targets []SiteTarget) (Site, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return Site{}, err + } + defer tx.Rollback() + + res, err := tx.ExecContext(ctx, `UPDATE sites SET ssh_user = ?, host = ?, port = ? WHERE id = ?`, sshUser, host, port, id) + if err != nil { + return Site{}, err + } + affected, err := res.RowsAffected() + if err != nil { + return Site{}, err + } + if affected == 0 { + return Site{}, sql.ErrNoRows + } + + if _, err := tx.ExecContext(ctx, `DELETE FROM site_targets WHERE site_id = ?`, id); err != nil { + return Site{}, err + } + for _, t := range targets { + if _, err := tx.ExecContext( + ctx, + `INSERT INTO site_targets (site_id, path, mode) VALUES (?, ?, ?)`, + id, + t.Path, + t.Mode, + ); err != nil { + return Site{}, err + } + } + + if err := tx.Commit(); err != nil { + return Site{}, err + } + return s.SiteByID(ctx, id) +} + +func (s *Store) DeleteSite(ctx context.Context, id int64) error { + res, err := s.db.ExecContext(ctx, `DELETE FROM sites WHERE id = ?`, id) + if err != nil { + return err + } + affected, err := res.RowsAffected() + if err != nil { + return err + } + if affected == 0 { + return sql.ErrNoRows + } + return nil +} + func (s *Store) ListSites(ctx context.Context) ([]Site, error) { const q = ` SELECT id, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes @@ -320,6 +421,123 @@ func (s *Store) UpdateSiteRunResult(ctx context.Context, id int64, status, outpu return err } +func (s *Store) CreateJob(ctx context.Context, siteID int64, jobType string) (Job, error) { + res, err := s.db.ExecContext( + ctx, + `INSERT INTO jobs (site_id, type, status) VALUES (?, ?, 'queued')`, + siteID, + jobType, + ) + if err != nil { + return Job{}, err + } + id, err := res.LastInsertId() + if err != nil { + return Job{}, err + } + return s.JobByID(ctx, id) +} + +func (s *Store) JobByID(ctx context.Context, id int64) (Job, error) { + const q = ` +SELECT id, site_id, type, status, summary, created_at, started_at, finished_at +FROM jobs +WHERE id = ?` + return scanJob(s.db.QueryRowContext(ctx, q, id)) +} + +func (s *Store) TryStartNextQueuedJob(ctx context.Context) (Job, bool, error) { + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return Job{}, false, err + } + defer tx.Rollback() + + var id int64 + if err := tx.QueryRowContext(ctx, `SELECT id FROM jobs WHERE status = 'queued' ORDER BY id ASC LIMIT 1`).Scan(&id); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return Job{}, false, nil + } + return Job{}, false, err + } + + res, err := tx.ExecContext(ctx, `UPDATE jobs SET status = 'running', started_at = ? WHERE id = ? AND status = 'queued'`, time.Now().UTC().Format(time.RFC3339), id) + if err != nil { + return Job{}, false, err + } + affected, err := res.RowsAffected() + if err != nil { + return Job{}, false, err + } + if affected == 0 { + return Job{}, false, nil + } + + job, err := scanJob(tx.QueryRowContext(ctx, ` +SELECT id, site_id, type, status, summary, created_at, started_at, finished_at +FROM jobs +WHERE id = ?`, id)) + if err != nil { + return Job{}, false, err + } + + if err := tx.Commit(); err != nil { + return Job{}, false, err + } + return job, true, nil +} + +func (s *Store) CompleteJob(ctx context.Context, jobID int64, status, summary string) error { + _, err := s.db.ExecContext( + ctx, + `UPDATE jobs SET status = ?, summary = ?, finished_at = ? WHERE id = ?`, + status, + summary, + time.Now().UTC().Format(time.RFC3339), + jobID, + ) + return err +} + +func (s *Store) AddJobEvent(ctx context.Context, event JobEvent) error { + _, err := s.db.ExecContext( + ctx, + `INSERT INTO job_events (job_id, level, message) VALUES (?, ?, ?)`, + event.JobID, + event.Level, + event.Message, + ) + return err +} + +func (s *Store) ListRecentJobs(ctx context.Context, limit int) ([]Job, error) { + if limit <= 0 { + limit = 20 + } + rows, err := s.db.QueryContext(ctx, ` +SELECT id, site_id, type, status, summary, created_at, started_at, finished_at +FROM jobs +ORDER BY id DESC +LIMIT ?`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []Job + for rows.Next() { + job, err := scanJob(rows) + if err != nil { + return nil, err + } + out = append(out, job) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + func (s *Store) UpdateSiteScanResult(ctx context.Context, siteID int64, state, notes string, scannedAt time.Time, targets []SiteTarget) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { @@ -368,6 +586,23 @@ type scanner interface { Scan(dest ...any) error } +func scanJob(row scanner) (Job, error) { + var job Job + if err := row.Scan( + &job.ID, + &job.SiteID, + &job.Type, + &job.Status, + &job.Summary, + &job.CreatedAt, + &job.StartedAt, + &job.FinishedAt, + ); err != nil { + return Job{}, err + } + return job, nil +} + func scanUser(row scanner) (User, error) { var user User var isAdmin int diff --git a/internal/webui/dashboard.go b/internal/webui/dashboard.go index e6bb653..2720a87 100644 --- a/internal/webui/dashboard.go +++ b/internal/webui/dashboard.go @@ -27,6 +27,7 @@ type WorkflowStage struct { type DashboardData struct { Now time.Time + ScanInterval time.Duration User store.User Sites []store.Site RuntimeChecks []RuntimeCheck @@ -112,10 +113,26 @@ func Dashboard(data DashboardData) templ.Component {

%s@%s:%d

-
- -
+
+
+ +
+
+ +
+
+
+ Edit site +
+ + + + + + +
+

Backup targets:

Scan: %s · Last: %s · Next: %s

@@ -128,11 +145,18 @@ func Dashboard(data DashboardData) templ.Component { html.EscapeString(site.Host), site.Port, site.ID, + site.ID, + site.ID, + html.EscapeString(site.SSHUser), + html.EscapeString(site.Host), + site.Port, + html.EscapeString(joinTargetPaths(site.Targets, "directory")), + html.EscapeString(joinTargetPaths(site.Targets, "sqlite_dump")), targets.String(), html.EscapeString(scanState), html.EscapeString(scanState), html.EscapeString(lastScan), - html.EscapeString(timeUntilNextScan(data.Now, site.LastScanAt)), + html.EscapeString(timeUntilNextScan(data.Now, site.LastScanAt, data.ScanInterval)), html.EscapeString(scanNotes), html.EscapeString(last), html.EscapeString(runStatus), @@ -209,8 +233,12 @@ func formatFlash(code string) string { switch code { case "site-added": return "Site added." - case "site-ran": - return "Run completed." + case "job-queued": + return "Preflight job queued." + case "site-updated": + return "Site updated." + case "site-deleted": + return "Site deleted." case "site-invalid": return "SSH user, host, and at least one target path are required." case "site-invalid-path": @@ -231,6 +259,16 @@ func targetModeLabel(mode string) string { return "directory" } +func joinTargetPaths(targets []store.SiteTarget, mode string) string { + var out []string + for _, t := range targets { + if t.Mode == mode { + out = append(out, t.Path) + } + } + return strings.Join(out, "\n") +} + func targetModeClass(mode string) string { if mode == "sqlite_dump" { return "sqlite" @@ -253,11 +291,11 @@ func formatBytes(v int64) string { return fmt.Sprintf("%.1f PB", value/1024) } -func timeUntilNextScan(now time.Time, lastScan sql.NullTime) string { +func timeUntilNextScan(now time.Time, lastScan sql.NullTime, interval time.Duration) string { if !lastScan.Valid { return "due now" } - next := lastScan.Time.Add(24 * time.Hour) + next := lastScan.Time.Add(interval) if !next.After(now) { return "due now" } diff --git a/web/static/app.css b/web/static/app.css index 3303949..6a241c3 100644 --- a/web/static/app.css +++ b/web/static/app.css @@ -197,6 +197,15 @@ h2 { margin: 0; } +.edit-panel { + margin-top: 0.8rem; +} + +.edit-panel summary { + cursor: pointer; + color: #7dd3fc; +} + .pill { display: inline-block; text-transform: uppercase;