From e5225a1353b9eed69ae5d45df0ad54f8b3cb332c Mon Sep 17 00:00:00 2001
From: Peter Li
Date: Sat, 7 Feb 2026 20:51:36 -0800
Subject: [PATCH] Add backup, retention, and restic off-site sync jobs with
 live progress and forward schema migrations

---
 AGENTS.md                    |   5 +
 cmd/satoru/backup_job.go     | 438 +++++++++++++++++++++++++++++++++++
 cmd/satoru/config.go         |  39 ++++
 cmd/satoru/input.go          |  18 ++
 cmd/satoru/jobs.go           | 225 ++++++++++++++++--
 cmd/satoru/main.go           | 114 ++++++++-
 cmd/satoru/maintenance.go    |  70 ++++++
 cmd/satoru/progress_ws.go    | 160 +++++++++++++
 cmd/satoru/sqlite_backup.go  |  18 ++
 go.mod                       |   1 +
 go.sum                       |   2 +
 internal/store/migrations.go | 270 +++++++++++++++++++++
 internal/store/store.go      | 262 +++++++++++++--------
 internal/webui/dashboard.go  | 228 +++++++++++++++++-
 web/static/app.css           |  46 ++++
 15 files changed, 1772 insertions(+), 124 deletions(-)
 create mode 100644 cmd/satoru/backup_job.go
 create mode 100644 cmd/satoru/config.go
 create mode 100644 cmd/satoru/maintenance.go
 create mode 100644 cmd/satoru/progress_ws.go
 create mode 100644 cmd/satoru/sqlite_backup.go
 create mode 100644 internal/store/migrations.go

diff --git a/AGENTS.md b/AGENTS.md
index e2a95aa..b76599d 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -49,3 +49,8 @@ Be proactive with debug-level logging for:
 
 Logs should include useful identifiers where possible:
 - `job_id`, `site_id`, `job_type`, `target_path`, `target_mode`, `status`, `error`.
+
+## Data Safety
+Do not delete `data/satoru.db` during normal development, smoke checks, or troubleshooting.
+
+Use forward migrations to evolve schema/data for soft-launched deployments.
diff --git a/cmd/satoru/backup_job.go b/cmd/satoru/backup_job.go
new file mode 100644
index 0000000..c2b2463
--- /dev/null
+++ b/cmd/satoru/backup_job.go
@@ -0,0 +1,438 @@
+package main
+
+import (
+	"bufio"
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"time"
+
+	"go.uber.org/zap"
+
+	"satoru/internal/store"
+)
+
+const (
+	defaultStagingRoot = "./backups"
+	defaultResticRepo  = "./repos/restic"
+)
+
+func (a *app) runBackupJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
+	a.log.Debug("backup job begin", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("site_uuid", site.SiteUUID), zap.Int("targets", len(site.Targets)))
+	_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Backup job started"})
+
+	preflightStatus, preflightSummary := a.runPreflightJob(ctx, job, site)
+	if preflightStatus == "failed" {
+		msg := "backup aborted by preflight: " + preflightSummary
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: msg})
+		return "failed", msg
+	}
+	if preflightStatus == "warning" {
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "preflight warning; continuing backup"})
+	}
+
+	stagingRoot := getenvDefault("SATORU_STAGING_ROOT", defaultStagingRoot)
+	resticRepo := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
+	if err := os.MkdirAll(stagingRoot, 0o700); err != nil {
+		return "failed", "failed to create staging root"
+	}
+	if err := os.MkdirAll(resticRepo, 0o700); err != nil {
+		return "failed", "failed to create restic repo directory"
+	}
+
+	successes := 0
+	failures := 0
+	stagedPaths := map[string]struct{}{}
+
+	for _, target := range site.Targets {
+		stageDir := targetStageDir(stagingRoot, site.SiteUUID, target)
+		if err := os.MkdirAll(stageDir, 0o700); err != nil {
+			failures++
+			_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID:
job.ID, Level: "error", Message: fmt.Sprintf("stage dir failed for %s: %v", target.Path, err)}) + continue + } + + var err error + switch target.Mode { + case "directory": + err = a.pullDirectoryTarget(ctx, job.ID, site, target, stageDir) + case "sqlite_dump": + err = pullSQLiteTarget(ctx, job.ID, site, target, stageDir) + default: + err = fmt.Errorf("unknown target mode: %s", target.Mode) + } + + if err != nil { + failures++ + a.log.Debug("backup target failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", target.Path), zap.String("target_mode", target.Mode), zap.Error(err)) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: fmt.Sprintf("target failed: %s (%s): %v", target.Path, target.Mode, err)}) + continue + } + + successes++ + stagedPaths[stageDir] = struct{}{} + a.log.Debug("backup target success", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", target.Path), zap.String("target_mode", target.Mode), zap.String("stage_dir", stageDir)) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: fmt.Sprintf("target synced: %s (%s)", target.Path, target.Mode)}) + } + + if successes == 0 { + return "failed", fmt.Sprintf("backup failed: %d/%d targets failed", failures, len(site.Targets)) + } + + paths := make([]string, 0, len(stagedPaths)) + for p := range stagedPaths { + paths = append(paths, p) + } + if err := runResticBackup(ctx, resticRepo, site, job.ID, paths); err != nil { + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic backup failed: " + err.Error()}) + return "failed", "backup failed: restic backup error" + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic backup completed"}) + + if failures > 0 || preflightStatus == "warning" { + return "warning", fmt.Sprintf("backup warning: %d/%d targets synced", successes, len(site.Targets)) + } + return "success", fmt.Sprintf("backup complete: %d/%d targets synced", successes, len(site.Targets)) +} + +func (a *app) pullDirectoryTarget(ctx context.Context, jobID int64, site store.Site, target store.SiteTarget, stageDir string) error { + sshCmd := fmt.Sprintf("ssh -p %d", site.Port) + remote := fmt.Sprintf("%s@%s:%s/", site.SSHUser, site.Host, target.Path) + cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + + args := []string{"-a", "--delete", "--info=progress2", "-e", sshCmd} + for _, filter := range site.Filters { + args = append(args, "--exclude", filter) + } + args = append(args, remote, stageDir+"/") + cmd := exec.CommandContext(cmdCtx, "rsync", args...) 
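+	// Attach both pipes before Start: rsync emits --info=progress2 lines
+	// on stdout and error text on stderr, and both streams must be
+	// drained so a full pipe buffer cannot stall the transfer.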
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return err
+	}
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return err
+	}
+	if err := cmd.Start(); err != nil {
+		return err
+	}
+	// Drain stderr in the background, keeping a bounded tail for error
+	// reporting when rsync exits non-zero.
+	tailCh := make(chan []string, 1)
+	go func() {
+		raw, _ := io.ReadAll(stderr)
+		lines := strings.Split(strings.ReplaceAll(string(raw), "\r", "\n"), "\n")
+		tail := make([]string, 0, 20)
+		for _, line := range lines {
+			line = strings.TrimSpace(line)
+			if line == "" {
+				continue
+			}
+			tail = append(tail, line)
+			if len(tail) > 20 {
+				tail = tail[len(tail)-20:]
+			}
+		}
+		tailCh <- tail
+	}()
+
+	// --info=progress2 writes its progress lines to stdout, not stderr,
+	// so scan stdout for throttled progress events.
+	scanner := bufio.NewScanner(stdout)
+	scanner.Split(scanCRLF)
+
+	lastEmit := time.Time{}
+	for scanner.Scan() {
+		line := strings.TrimSpace(scanner.Text())
+		if line == "" {
+			continue
+		}
+
+		progress := parseRsyncProgress(line)
+		if progress == "" {
+			continue
+		}
+		if time.Since(lastEmit) < 2*time.Second {
+			continue
+		}
+		lastEmit = time.Now()
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{
+			JobID:   jobID,
+			Level:   "info",
+			Message: fmt.Sprintf("rsync %s: %s", target.Path, progress),
+		})
+	}
+	if err := scanner.Err(); err != nil {
+		return err
+	}
+	tail := <-tailCh
+	if err := cmd.Wait(); err != nil {
+		msg := strings.TrimSpace(strings.Join(tail, "\n"))
+		if msg == "" {
+			msg = err.Error()
+		}
+		return errors.New(msg)
+	}
+	return nil
+}
+
+func pullSQLiteTarget(ctx context.Context, jobID int64, site store.Site, target store.SiteTarget, stageDir string) error {
+	tmpBase := fmt.Sprintf("/tmp/satoru-backup-%d-%s.sqlite3", jobID, shortHash(target.Path))
+	quotedDB := shellQuote(target.Path)
+	quotedTmp := shellQuote(tmpBase)
+	remoteCmd := strings.Join([]string{
+		sqliteBackupCommand(quotedDB, quotedTmp),
+		fmt.Sprintf("gzip -f -- %s", quotedTmp),
+	}, " && ")
+	if err := sshCheck(ctx, site, remoteCmd); err != nil {
+		_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
+		return err
+	}
+
+	sshCmd := fmt.Sprintf("ssh -p %d", site.Port)
+	remoteGz := fmt.Sprintf("%s@%s:%s", site.SSHUser, site.Host, tmpBase+".gz")
+	localFile := filepath.Join(stageDir, "sqlite-backup.sql.gz")
+
+	cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
+	defer cancel()
+	cmd := exec.CommandContext(cmdCtx, "rsync", "-a", "-e", sshCmd, remoteGz, localFile)
+	out, err := cmd.CombinedOutput()
+	_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
+	if err != nil {
+		return errors.New(strings.TrimSpace(string(out)))
+	}
+	return nil
+}
+
+func runResticBackup(ctx context.Context, repoPath string, site store.Site, jobID int64, stagedPaths []string) error {
+	if err := ensureResticRepo(ctx, repoPath); err != nil {
+		return err
+	}
+
+	args := []string{"-r", repoPath, "backup"}
+	args = append(args, stagedPaths...)
+	args = append(args, "--tag", "site_uuid:"+site.SiteUUID, "--tag", "site_id:"+strconv.FormatInt(site.ID, 10), "--tag", "job_id:"+strconv.FormatInt(jobID, 10))
+
+	cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
+	defer cancel()
+	cmd := exec.CommandContext(cmdCtx, "restic", args...)
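+	// CombinedOutput keeps restic's stderr so a failure's message can be
+	// surfaced in the job summary; resticEnv injects the repo password.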
+	cmd.Env = resticEnv()
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		return errors.New(strings.TrimSpace(string(out)))
+	}
+	return nil
+}
+
+func (a *app) runRetentionJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
+	repoPath := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
+	if err := ensureResticRepo(ctx, repoPath); err != nil {
+		return "failed", "retention failed: restic repo unavailable"
+	}
+
+	retentionArgs := strings.Fields(getenvDefault("SATORU_RESTIC_RETENTION_ARGS", "--keep-daily 7 --keep-weekly 4 --keep-monthly 6"))
+	args := []string{"-r", repoPath, "forget", "--prune", "--tag", "site_uuid:" + site.SiteUUID}
+	args = append(args, retentionArgs...)
+	a.log.Debug("retention command", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Strings("args", args))
+
+	cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
+	defer cancel()
+	cmd := exec.CommandContext(cmdCtx, "restic", args...)
+	cmd.Env = resticEnv()
+	out, err := cmd.CombinedOutput()
+	if err != nil {
+		msg := strings.TrimSpace(string(out))
+		if msg == "" {
+			msg = err.Error()
+		}
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "retention failed: " + msg})
+		return "failed", "retention failed"
+	}
+	_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "retention completed"})
+	return "success", "retention completed"
+}
+
+func (a *app) runResticSyncJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
+	b2Repo := strings.TrimSpace(os.Getenv("SATORU_RESTIC_B2_REPOSITORY"))
+	if b2Repo == "" {
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "restic sync skipped: SATORU_RESTIC_B2_REPOSITORY not set"})
+		return "warning", "restic sync skipped: B2 repository not configured"
+	}
+
+	repoPath := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
+	if err := ensureResticRepo(ctx, repoPath); err != nil {
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: local repo unavailable"})
+		return "failed", "restic sync failed: local repo unavailable"
+	}
+
+	snapshotID, err := latestSnapshotIDForSite(ctx, repoPath, site.SiteUUID)
+	if err != nil {
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: " + err.Error()})
+		return "failed", "restic sync failed: local snapshot lookup error"
+	}
+	if snapshotID == "" {
+		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "restic sync skipped: no local snapshots for site"})
+		return "warning", "restic sync skipped: no local snapshots for site"
+	}
+
+	_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic sync started"})
+	args := []string{"-r", b2Repo, "copy", "--repo2", repoPath, snapshotID}
+	a.log.Debug("restic sync copy", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("local_repo", repoPath), zap.String("b2_repo", b2Repo), zap.String("snapshot_id", snapshotID), zap.Strings("args", args))
+
+	cmdCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+	defer cancel()
+	cmd := exec.CommandContext(cmdCtx, "restic", args...)
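+	// For "restic copy", -r names the destination repo and --repo2 (the
+	// pre-0.14 spelling of --from-repo) names the source, so the snapshot
+	// is pushed from the local repo to B2; the source password comes from
+	// RESTIC_PASSWORD2 / RESTIC_PASSWORD_FILE2 when set.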
+ cmd.Env = resticEnv() + out, err := cmd.CombinedOutput() + if err != nil { + msg := strings.TrimSpace(string(out)) + if msg == "" { + msg = err.Error() + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: " + msg}) + return "failed", "restic sync failed" + } + + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic sync completed"}) + return "success", "restic sync completed" +} + +func ensureResticRepo(ctx context.Context, repoPath string) error { + check := exec.CommandContext(ctx, "restic", "-r", repoPath, "cat", "config") + check.Env = resticEnv() + if err := check.Run(); err == nil { + return nil + } + + initCmd := exec.CommandContext(ctx, "restic", "-r", repoPath, "init") + initCmd.Env = resticEnv() + out, err := initCmd.CombinedOutput() + if err != nil && !strings.Contains(string(out), "already initialized") { + return errors.New(strings.TrimSpace(string(out))) + } + return nil +} + +func resticEnv() []string { + env := os.Environ() + password := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD")) + passwordFile := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE")) + password2 := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD2")) + passwordFile2 := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE2")) + if password == "" && passwordFile == "" { + password = configuredResticPassword() + } + if password != "" { + env = append(env, "RESTIC_PASSWORD="+password) + } + if passwordFile != "" { + env = append(env, "RESTIC_PASSWORD_FILE="+passwordFile) + } + if password2 != "" { + env = append(env, "RESTIC_PASSWORD2="+password2) + } + if passwordFile2 != "" { + env = append(env, "RESTIC_PASSWORD_FILE2="+passwordFile2) + } + return env +} + +type resticSnapshot struct { + ID string `json:"id"` + Time time.Time `json:"time"` +} + +func latestSnapshotIDForSite(ctx context.Context, repoPath, siteUUID string) (string, error) { + args := []string{"-r", repoPath, "snapshots", "--json", "--tag", "site_uuid:" + siteUUID} + cmdCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + cmd := exec.CommandContext(cmdCtx, "restic", args...) 
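+	// List snapshots as JSON filtered to this site's tag; the newest
+	// snapshot is picked by timestamp below.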
+ cmd.Env = resticEnv() + out, err := cmd.CombinedOutput() + if err != nil { + msg := strings.TrimSpace(string(out)) + if msg == "" { + msg = err.Error() + } + return "", errors.New(msg) + } + + var snapshots []resticSnapshot + if err := json.Unmarshal(out, &snapshots); err != nil { + return "", err + } + if len(snapshots) == 0 { + return "", nil + } + + latest := snapshots[0] + for _, snap := range snapshots[1:] { + if snap.Time.After(latest.Time) { + latest = snap + } + } + return latest.ID, nil +} + +func targetStageDir(root, siteUUID string, target store.SiteTarget) string { + hash := hashPath(target.Path) + modeDir := "dir" + if target.Mode == "sqlite_dump" { + modeDir = "sqlite" + } + return filepath.Join(root, siteUUID, hash, modeDir) +} + +func hashPath(path string) string { + sum := sha256.Sum256([]byte(path)) + return hex.EncodeToString(sum[:]) +} + +func shortHash(path string) string { + v := hashPath(path) + if len(v) < 12 { + return v + } + return v[:12] +} + +func parseRsyncProgress(line string) string { + if !strings.Contains(line, "/s") || !strings.Contains(line, "to-chk=") { + return "" + } + fields := strings.Fields(strings.ReplaceAll(line, ",", "")) + var percent, speed, eta string + for _, f := range fields { + if strings.HasSuffix(f, "%") { + percent = f + } + if strings.HasSuffix(f, "/s") { + speed = f + } + if strings.Count(f, ":") == 2 { + eta = f + } + } + if speed == "" { + return "" + } + parts := make([]string, 0, 3) + if percent != "" { + parts = append(parts, percent) + } + parts = append(parts, speed) + if eta != "" { + parts = append(parts, "eta "+eta) + } + return strings.Join(parts, " ") +} + +func scanCRLF(data []byte, atEOF bool) (advance int, token []byte, err error) { + for i := 0; i < len(data); i++ { + if data[i] == '\n' || data[i] == '\r' { + return i + 1, data[:i], nil + } + } + if atEOF && len(data) > 0 { + return len(data), data, nil + } + return 0, nil, nil +} diff --git a/cmd/satoru/config.go b/cmd/satoru/config.go new file mode 100644 index 0000000..b7821e0 --- /dev/null +++ b/cmd/satoru/config.go @@ -0,0 +1,39 @@ +package main + +import ( + "os" + "strconv" + "strings" +) + +const defaultResticPassword = "satoru-change-me" + +func getenvDefault(k, d string) string { + v := strings.TrimSpace(os.Getenv(k)) + if v == "" { + return d + } + return v +} + +func parsePositiveIntEnv(name string, fallback int) int { + raw := strings.TrimSpace(os.Getenv(name)) + if raw == "" { + return fallback + } + v, err := strconv.Atoi(raw) + if err != nil || v <= 0 { + return fallback + } + return v +} + +func configuredResticPassword() string { + return getenvDefault("SATORU_RESTIC_PASSWORD", defaultResticPassword) +} + +func isUsingDefaultResticPassword() bool { + return strings.TrimSpace(os.Getenv("RESTIC_PASSWORD")) == "" && + strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE")) == "" && + strings.TrimSpace(os.Getenv("SATORU_RESTIC_PASSWORD")) == "" +} diff --git a/cmd/satoru/input.go b/cmd/satoru/input.go index 379b9b3..c04d6e1 100644 --- a/cmd/satoru/input.go +++ b/cmd/satoru/input.go @@ -44,6 +44,24 @@ func parsePathList(raw string) []string { return out } +func parseLineList(raw string) []string { + split := strings.Split(raw, "\n") + seen := map[string]struct{}{} + out := make([]string, 0, len(split)) + for _, item := range split { + item = strings.TrimSpace(item) + if item == "" { + continue + } + if _, ok := seen[item]; ok { + continue + } + seen[item] = struct{}{} + out = append(out, item) + } + return out +} + func buildTargets(directoryPaths, 
sqlitePaths []string) []store.SiteTarget {
 	seen := map[string]struct{}{}
 	out := make([]store.SiteTarget, 0, len(directoryPaths)+len(sqlitePaths))
diff --git a/cmd/satoru/jobs.go b/cmd/satoru/jobs.go
index 4b50bb6..dff148b 100644
--- a/cmd/satoru/jobs.go
+++ b/cmd/satoru/jobs.go
@@ -17,9 +17,12 @@ import (
 )
 
 const (
-	jobTypePreflight = "preflight"
-	jobPollInterval  = 2 * time.Second
-	jobWorkers       = 3
+	jobTypePreflight  = "preflight"
+	jobTypeBackup     = "backup"
+	jobTypeRetention  = "retention"
+	jobTypeResticSync = "restic_sync"
+	jobPollInterval   = 2 * time.Second
+	jobWorkers        = 3
 )
 
 func (a *app) startJobWorkers(ctx context.Context) {
@@ -44,7 +47,9 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
 		case <-ctx.Done():
 			return
 		case <-ticker.C:
+			a.jobClaimMu.Lock()
 			job, ok, err := a.store.TryStartNextQueuedJob(ctx)
+			a.jobClaimMu.Unlock()
 			if err != nil {
 				a.log.Warn("worker failed to start job", zap.Int("worker_id", workerID), zap.Error(err))
 				continue
@@ -58,54 +63,116 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
 }
 
 func (a *app) executeJob(ctx context.Context, job store.Job) {
+	jobCtx, cancel := context.WithCancel(ctx)
+	a.registerJobCancel(job.ID, cancel)
+	defer func() {
+		a.clearJobCancel(job.ID)
+		cancel()
+	}()
+
 	a.log.Info("job start", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.String("job_type", job.Type))
-	site, err := a.store.SiteByID(ctx, job.SiteID)
+	site, err := a.store.SiteByID(jobCtx, job.SiteID)
 	if err != nil {
-		_ = a.store.CompleteJob(ctx, job.ID, "failed", "failed to load site")
+		_ = a.store.CompleteJob(jobCtx, job.ID, "failed", "failed to load site")
 		a.log.Error("job failed to load site", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.Error(err))
 		return
 	}
-	_ = a.store.UpdateSiteRunResult(ctx, site.ID, "running", fmt.Sprintf("Job #%d running (%s)", job.ID, job.Type), time.Now())
-	_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job started"})
+	_ = a.store.UpdateSiteRunResult(jobCtx, site.ID, "running", fmt.Sprintf("Job #%d running (%s)", job.ID, job.Type), time.Now())
+	_ = a.store.AddJobEvent(jobCtx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job started"})
 
+	// Completion writes below deliberately use the worker ctx, not jobCtx:
+	// a user cancel must not also cancel recording the final job status.
 	switch job.Type {
 	case jobTypePreflight:
-		status, summary := a.runPreflightJob(ctx, job, site)
+		status, summary := a.runPreflightJob(jobCtx, job, site)
+		if jobCtx.Err() == context.Canceled {
+			status = "failed"
+			summary = "job canceled by user"
+		}
 		_ = a.store.CompleteJob(ctx, job.ID, status, summary)
 		_ = a.store.UpdateSiteRunResult(ctx, site.ID, status, summary, time.Now())
+		a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
+	case jobTypeBackup:
+		status, summary := a.runBackupJob(jobCtx, job, site)
+		if jobCtx.Err() == context.Canceled {
+			status = "failed"
+			summary = "job canceled by user"
+		}
+		_ = a.store.CompleteJob(ctx, job.ID, status, summary)
+		_ = a.store.UpdateSiteRunResult(ctx, site.ID, status, summary, time.Now())
+		a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
+	case jobTypeRetention:
+		status, summary := a.runRetentionJob(jobCtx, job, site)
+		if jobCtx.Err() == context.Canceled {
+			status = "failed"
+			summary = "job canceled by user"
+		}
+		_ = a.store.CompleteJob(ctx, job.ID, status, summary)
+		a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
+	case jobTypeResticSync:
+		status, summary := a.runResticSyncJob(jobCtx, job, site)
+		if jobCtx.Err() == context.Canceled {
+			status = "failed"
+			summary = "job canceled by user"
+		}
+		_ = a.store.CompleteJob(ctx, job.ID, status, summary)
 		a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
 	default:
 		summary := "unknown job type"
 		_ = a.store.CompleteJob(ctx, job.ID, "failed", summary)
 		_ = a.store.UpdateSiteRunResult(ctx, site.ID, "failed", summary, time.Now())
 		a.log.Warn("job unknown type", zap.Int64("job_id", job.ID), zap.String("job_type", job.Type))
 	}
 }
 
+func (a *app) registerJobCancel(jobID int64, cancel context.CancelFunc) {
+	a.jobCancelMu.Lock()
+	defer a.jobCancelMu.Unlock()
+	a.jobCancels[jobID] = cancel
+}
+
+func (a *app) clearJobCancel(jobID int64) {
+	a.jobCancelMu.Lock()
+	defer a.jobCancelMu.Unlock()
+	delete(a.jobCancels, jobID)
+}
+
+func (a *app) cancelRunningJob(jobID int64) bool {
+	a.jobCancelMu.Lock()
+	defer a.jobCancelMu.Unlock()
+	cancel, ok := a.jobCancels[jobID]
+	if !ok {
+		return false
+	}
+	cancel()
+	return true
+}
+
 func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
 	a.log.Debug("preflight begin", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Int("targets", len(site.Targets)))
 	failures := 0
 	warnings := 0
+	failedChecks := make([]string, 0)
+	warnChecks := make([]string, 0)
 
 	requiredLocal := []string{"ssh", "rsync", "restic", "gzip"}
 	for _, tool := range requiredLocal {
 		a.log.Debug("preflight local tool check", zap.Int64("job_id", job.ID), zap.String("tool", tool))
 		if _, err := exec.LookPath(tool); err != nil {
 			failures++
+			failedChecks = append(failedChecks, "local tool missing: "+tool)
 			a.log.Debug("preflight local tool missing", zap.Int64("job_id", job.ID), zap.String("tool", tool), zap.Error(err))
 			_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: fmt.Sprintf("local tool missing: %s", tool)})
 		}
 	}
 	if failures > 0 {
-		return "failed", fmt.Sprintf("preflight failed: %d local tool checks failed", failures)
+		return "failed", "preflight failed: " + strings.Join(failedChecks, "; ")
 	}
 
 	a.log.Debug("preflight ssh connectivity check", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID))
 	if err := sshCheck(ctx, site, "echo preflight-ok"); err != nil {
 		_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "ssh connectivity failed: " + err.Error()})
 		a.log.Debug("preflight ssh connectivity failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Error(err))
-		return "failed", "preflight failed: ssh connectivity"
+		return "failed", "preflight failed: ssh connectivity: " + err.Error()
 	}
 	a.log.Debug("preflight ssh connectivity ok", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID))
 	_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "ssh connectivity ok"})
@@ -115,7 +182,7 @@
func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit var err error switch t.Mode { case "directory": - err = sshCheck(ctx, site, fmt.Sprintf("test -d -- %s", shellQuote(t.Path))) + err = sshCheck(ctx, site, fmt.Sprintf("[ -d %s ]", shellQuote(t.Path))) case "sqlite_dump": err = sqlitePreflightCheck(ctx, site, t.Path) default: @@ -125,6 +192,7 @@ func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit if err != nil { warnings++ msg := fmt.Sprintf("target %s (%s): %s", t.Path, t.Mode, err.Error()) + warnChecks = append(warnChecks, msg) _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: msg}) a.log.Debug("preflight target check failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", t.Path), zap.String("target_mode", t.Mode), zap.Error(err)) } else { @@ -137,9 +205,9 @@ func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit case warnings == 0: return "success", fmt.Sprintf("preflight passed (%d targets)", len(site.Targets)) case warnings == len(site.Targets): - return "failed", fmt.Sprintf("preflight failed (%d/%d target checks failed)", warnings, len(site.Targets)) + return "failed", fmt.Sprintf("preflight failed (%d/%d target checks failed): %s", warnings, len(site.Targets), strings.Join(warnChecks, "; ")) default: - return "warning", fmt.Sprintf("preflight warning (%d/%d target checks failed)", warnings, len(site.Targets)) + return "warning", fmt.Sprintf("preflight warning (%d/%d target checks failed): %s", warnings, len(site.Targets), strings.Join(warnChecks, "; ")) } } @@ -164,9 +232,9 @@ func sqlitePreflightCheck(ctx context.Context, site store.Site, dbPath string) e quoted := shellQuote(dbPath) cmd := strings.Join([]string{ "sqlite3 --version >/dev/null", - "test -r -- " + quoted, + "[ -r " + quoted + " ]", "tmp=$(mktemp /tmp/satoru-preflight.XXXXXX)", - "sqlite3 " + quoted + " \".backup $tmp\"", + sqliteBackupCommand(quoted, "$tmp"), "rm -f -- \"$tmp\"", }, " && ") return sshCheck(ctx, site, cmd) @@ -182,6 +250,16 @@ func (a *app) enqueuePreflightJob(ctx context.Context, siteID int64) (store.Job, return job, nil } +func (a *app) enqueueBackupJob(ctx context.Context, siteID int64) (store.Job, error) { + job, err := a.store.CreateJob(ctx, siteID, jobTypeBackup) + if err != nil { + return store.Job{}, err + } + _ = a.store.UpdateSiteRunResult(ctx, siteID, "queued", fmt.Sprintf("Job #%d queued (backup)", job.ID), time.Now()) + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Backup job queued"}) + return job, nil +} + func (a *app) latestJobForSite(ctx context.Context, siteID int64) (store.Job, error) { jobs, err := a.store.ListRecentJobs(ctx, 200) if err != nil { @@ -194,3 +272,112 @@ func (a *app) latestJobForSite(ctx context.Context, siteID int64) (store.Job, er } return store.Job{}, sql.ErrNoRows } + +func (a *app) latestActiveJobForSite(ctx context.Context, siteID int64) (store.Job, error) { + jobs, err := a.store.ListRecentJobs(ctx, 500) + if err != nil { + return store.Job{}, err + } + for _, j := range jobs { + if j.SiteID != siteID { + continue + } + if j.Status == "queued" || j.Status == "running" { + return j, nil + } + } + return store.Job{}, sql.ErrNoRows +} + +func (a *app) cancelLatestActiveJobForSite(ctx context.Context, siteID int64) (bool, error) { + job, err := a.latestActiveJobForSite(ctx, siteID) + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + if err != nil { + return 
false, err + } + + switch job.Status { + case "queued": + changed, err := a.store.CancelQueuedJob(ctx, job.ID, "job canceled by user") + if err != nil { + return false, err + } + if !changed { + return false, nil + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job canceled by user before start"}) + _ = a.store.UpdateSiteRunResult(ctx, siteID, "failed", "job canceled by user", time.Now()) + return true, nil + case "running": + if !a.cancelRunningJob(job.ID) { + return false, nil + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job cancel requested by user"}) + _ = a.store.UpdateSiteRunResult(ctx, siteID, "running", "cancel requested by user", time.Now()) + return true, nil + default: + return false, nil + } +} + +func (a *app) cancelJobByID(ctx context.Context, jobID int64) (bool, error) { + job, err := a.store.JobByID(ctx, jobID) + if err != nil { + return false, err + } + + switch job.Status { + case "queued": + changed, err := a.store.CancelQueuedJob(ctx, job.ID, "job canceled by user") + if err != nil { + return false, err + } + if !changed { + return false, nil + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job canceled by user before start"}) + _ = a.store.UpdateSiteRunResult(ctx, job.SiteID, "failed", "job canceled by user", time.Now()) + return true, nil + case "running": + if !a.cancelRunningJob(job.ID) { + return false, nil + } + _ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job cancel requested by user"}) + _ = a.store.UpdateSiteRunResult(ctx, job.SiteID, "running", "cancel requested by user", time.Now()) + return true, nil + default: + return false, nil + } +} + +func (a *app) latestJobForSiteType(ctx context.Context, siteID int64, jobType string) (store.Job, error) { + jobs, err := a.store.ListRecentJobs(ctx, 500) + if err != nil { + return store.Job{}, err + } + for _, j := range jobs { + if j.SiteID == siteID && j.Type == jobType { + return j, nil + } + } + return store.Job{}, sql.ErrNoRows +} + +func (a *app) jobTypeDueForSite(ctx context.Context, siteID int64, jobType string, interval time.Duration, now time.Time) (bool, error) { + j, err := a.latestJobForSiteType(ctx, siteID, jobType) + if errors.Is(err, sql.ErrNoRows) { + return true, nil + } + if err != nil { + return false, err + } + if j.Status == "queued" || j.Status == "running" { + return false, nil + } + if !j.FinishedAt.Valid { + return true, nil + } + return j.FinishedAt.Time.Add(interval).Before(now) || j.FinishedAt.Time.Add(interval).Equal(now), nil +} diff --git a/cmd/satoru/main.go b/cmd/satoru/main.go index 97d4d52..6858f70 100644 --- a/cmd/satoru/main.go +++ b/cmd/satoru/main.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/a-h/templ" @@ -28,8 +29,11 @@ const ( ) type app struct { - store *store.Store - log *zap.Logger + store *store.Store + log *zap.Logger + jobClaimMu sync.Mutex + jobCancelMu sync.Mutex + jobCancels map[int64]context.CancelFunc } func main() { @@ -50,12 +54,20 @@ func main() { defer st.Close() st.SetLogger(logger) - a := &app{store: st, log: logger} + a := &app{ + store: st, + log: logger, + jobCancels: map[int64]context.CancelFunc{}, + } + if isUsingDefaultResticPassword() { + logger.Warn("using built-in default restic password; set SATORU_RESTIC_PASSWORD or RESTIC_PASSWORD for production") + } ctx, cancel := context.WithCancel(context.Background()) defer cancel() go 
a.startSiteScanLoop(ctx) go a.startJobWorkers(ctx) + go a.startMaintenanceScheduler(ctx) r := chi.NewRouter() @@ -63,10 +75,14 @@ func main() { r.Handle("/static/*", http.StripPrefix("/static/", fileServer)) r.Get("/", a.handleHome) + r.Get("/ws/progress", a.handleProgressWebSocket) r.Get("/account/password", a.handlePasswordPage) r.Post("/account/password", a.handlePasswordSubmit) r.Post("/sites", a.handleSiteCreate) r.Post("/sites/{id}/run", a.handleSiteRun) + r.Post("/sites/{id}/cancel", a.handleSiteCancel) + r.Post("/sites/{id}/restart", a.handleSiteRestart) + r.Post("/jobs/{id}/cancel", a.handleJobCancel) r.Post("/sites/{id}/update", a.handleSiteUpdate) r.Post("/sites/{id}/delete", a.handleSiteDelete) r.Get("/signup", a.handleSignupPage) @@ -287,6 +303,7 @@ func (a *app) handleSiteCreate(w http.ResponseWriter, r *http.Request) { } directoryPaths := parsePathList(r.FormValue("directory_paths")) sqlitePaths := parsePathList(r.FormValue("sqlite_paths")) + filters := parseLineList(r.FormValue("filters")) targets := buildTargets(directoryPaths, sqlitePaths) if sshUser == "" || host == "" || len(targets) == 0 { http.Redirect(w, r, "/?msg=site-invalid", http.StatusSeeOther) @@ -297,7 +314,7 @@ func (a *app) handleSiteCreate(w http.ResponseWriter, r *http.Request) { return } - site, err := a.store.CreateSite(r.Context(), sshUser, host, port, targets) + site, err := a.store.CreateSite(r.Context(), sshUser, host, port, targets, filters) if err != nil { http.Error(w, "failed to add site", http.StatusInternalServerError) return @@ -329,13 +346,97 @@ func (a *app) handleSiteRun(w http.ResponseWriter, r *http.Request) { return } - if _, err := a.enqueuePreflightJob(r.Context(), id); err != nil { + if _, err := a.enqueueBackupJob(r.Context(), id); err != nil { http.Error(w, "failed to queue job", http.StatusInternalServerError) return } http.Redirect(w, r, "/?msg=job-queued", http.StatusSeeOther) } +func (a *app) handleSiteCancel(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Redirect(w, r, "/signin", http.StatusSeeOther) + return + } + siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) + if err != nil { + http.Error(w, "invalid site id", http.StatusBadRequest) + return + } + if _, err := a.store.SiteByID(r.Context(), siteID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) + return + } + http.Error(w, "failed to load site", http.StatusInternalServerError) + return + } + + changed, err := a.cancelLatestActiveJobForSite(r.Context(), siteID) + if err != nil { + http.Error(w, "failed to cancel job", http.StatusInternalServerError) + return + } + if !changed { + http.Redirect(w, r, "/?msg=no-active-job", http.StatusSeeOther) + return + } + http.Redirect(w, r, "/?msg=job-cancel-requested", http.StatusSeeOther) +} + +func (a *app) handleSiteRestart(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Redirect(w, r, "/signin", http.StatusSeeOther) + return + } + siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) + if err != nil { + http.Error(w, "invalid site id", http.StatusBadRequest) + return + } + if _, err := a.store.SiteByID(r.Context(), siteID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) + return + } + http.Error(w, "failed to load site", http.StatusInternalServerError) + return + } + if _, err := a.enqueueBackupJob(r.Context(), siteID); err != nil { + http.Error(w, "failed to queue job", 
http.StatusInternalServerError) + return + } + http.Redirect(w, r, "/?msg=job-restarted", http.StatusSeeOther) +} + +func (a *app) handleJobCancel(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + jobID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64) + if err != nil { + http.Error(w, "invalid job id", http.StatusBadRequest) + return + } + + changed, err := a.cancelJobByID(r.Context(), jobID) + if errors.Is(err, sql.ErrNoRows) { + http.NotFound(w, r) + return + } + if err != nil { + http.Error(w, "failed to cancel job", http.StatusInternalServerError) + return + } + if !changed { + w.WriteHeader(http.StatusConflict) + _, _ = w.Write([]byte("job is not active")) + return + } + w.WriteHeader(http.StatusNoContent) +} + func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) { if _, err := a.currentUserWithRollingSession(w, r); err != nil { http.Redirect(w, r, "/signin", http.StatusSeeOther) @@ -360,6 +461,7 @@ func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) { } directoryPaths := parsePathList(r.FormValue("directory_paths")) sqlitePaths := parsePathList(r.FormValue("sqlite_paths")) + filters := parseLineList(r.FormValue("filters")) targets := buildTargets(directoryPaths, sqlitePaths) if sshUser == "" || host == "" || len(targets) == 0 { http.Redirect(w, r, "/?msg=site-invalid", http.StatusSeeOther) @@ -370,7 +472,7 @@ func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) { return } - site, err := a.store.UpdateSite(r.Context(), id, sshUser, host, port, targets) + site, err := a.store.UpdateSite(r.Context(), id, sshUser, host, port, targets, filters) if err != nil { if errors.Is(err, sql.ErrNoRows) { http.NotFound(w, r) diff --git a/cmd/satoru/maintenance.go b/cmd/satoru/maintenance.go new file mode 100644 index 0000000..87a6fcf --- /dev/null +++ b/cmd/satoru/maintenance.go @@ -0,0 +1,70 @@ +package main + +import ( + "context" + "time" + + "go.uber.org/zap" + + "satoru/internal/store" +) + +const ( + maintenanceTick = time.Minute +) + +func (a *app) startMaintenanceScheduler(ctx context.Context) { + ticker := time.NewTicker(maintenanceTick) + defer ticker.Stop() + + // Run once at boot. 
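+	// A restart then re-evaluates retention and off-site sync immediately
+	// instead of waiting up to a minute for the first tick.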
+ a.enqueueDueMaintenanceJobs(ctx) + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + a.enqueueDueMaintenanceJobs(ctx) + } + } +} + +func (a *app) enqueueDueMaintenanceJobs(ctx context.Context) { + sites, err := a.store.ListSites(ctx) + if err != nil { + a.log.Warn("maintenance list sites failed", zap.Error(err)) + return + } + + retentionInterval := maintenanceIntervalFromEnv("SATORU_RETENTION_INTERVAL_HOURS", 24) + syncInterval := maintenanceIntervalFromEnv("SATORU_RESTIC_SYNC_INTERVAL_HOURS", 24) + now := time.Now() + + for _, site := range sites { + a.enqueueIfDue(ctx, site, jobTypeRetention, retentionInterval, now) + a.enqueueIfDue(ctx, site, jobTypeResticSync, syncInterval, now) + } +} + +func (a *app) enqueueIfDue(ctx context.Context, site store.Site, jobType string, interval time.Duration, now time.Time) { + due, err := a.jobTypeDueForSite(ctx, site.ID, jobType, interval, now) + if err != nil { + a.log.Warn("maintenance due-check failed", zap.Int64("site_id", site.ID), zap.String("job_type", jobType), zap.Error(err)) + return + } + if !due { + return + } + + if _, err := a.store.CreateJob(ctx, site.ID, jobType); err != nil { + a.log.Warn("maintenance enqueue failed", zap.Int64("site_id", site.ID), zap.String("job_type", jobType), zap.Error(err)) + return + } + a.log.Debug("maintenance job queued", zap.Int64("site_id", site.ID), zap.String("job_type", jobType)) +} + +func maintenanceIntervalFromEnv(name string, fallbackHours int) time.Duration { + hours := parsePositiveIntEnv(name, fallbackHours) + return time.Duration(hours) * time.Hour +} diff --git a/cmd/satoru/progress_ws.go b/cmd/satoru/progress_ws.go new file mode 100644 index 0000000..5ddfc13 --- /dev/null +++ b/cmd/satoru/progress_ws.go @@ -0,0 +1,160 @@ +package main + +import ( + "context" + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" + "go.uber.org/zap" +) + +var progressUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + origin := strings.TrimSpace(r.Header.Get("Origin")) + if origin == "" { + return true + } + return strings.Contains(origin, r.Host) + }, +} + +type liveSiteStatus struct { + SiteID int64 `json:"site_id"` + LastRunStatus string `json:"last_run_status"` + LastRunOutput string `json:"last_run_output"` + LastRunAt time.Time `json:"last_run_at,omitempty"` +} + +type liveJobStatus struct { + JobID int64 `json:"job_id"` + SiteID int64 `json:"site_id"` + Type string `json:"type"` + Status string `json:"status"` + Summary string `json:"summary,omitempty"` + StartedAt time.Time `json:"started_at,omitempty"` + FinishedAt time.Time `json:"finished_at,omitempty"` +} + +type liveJobEvent struct { + JobID int64 `json:"job_id"` + SiteID int64 `json:"site_id"` + JobType string `json:"job_type"` + Level string `json:"level"` + Message string `json:"message"` + OccurredAt time.Time `json:"occurred_at"` +} + +type liveProgressPayload struct { + Timestamp time.Time `json:"timestamp"` + Sites []liveSiteStatus `json:"sites"` + Jobs []liveJobStatus `json:"jobs"` + Events []liveJobEvent `json:"events"` +} + +func (a *app) handleProgressWebSocket(w http.ResponseWriter, r *http.Request) { + if _, err := a.currentUserWithRollingSession(w, r); err != nil { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + + conn, err := progressUpgrader.Upgrade(w, r, nil) + if err != nil { + a.log.Debug("progress ws upgrade failed", zap.Error(err)) + return + } + defer conn.Close() + + send := func() error { + 
payload, err := a.buildLiveProgressPayload(r.Context()) + if err != nil { + return err + } + _ = conn.SetWriteDeadline(time.Now().Add(8 * time.Second)) + return conn.WriteJSON(payload) + } + + if err := send(); err != nil { + a.log.Debug("progress ws initial send failed", zap.Error(err)) + return + } + + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + select { + case <-r.Context().Done(): + return + case <-ticker.C: + if err := send(); err != nil { + return + } + } + } +} + +func (a *app) buildLiveProgressPayload(ctx context.Context) (liveProgressPayload, error) { + sites, err := a.store.ListSites(ctx) + if err != nil { + return liveProgressPayload{}, err + } + jobs, err := a.store.ListRecentJobs(ctx, 30) + if err != nil { + return liveProgressPayload{}, err + } + events, err := a.store.ListRecentJobEvents(ctx, 60) + if err != nil { + return liveProgressPayload{}, err + } + + payload := liveProgressPayload{ + Timestamp: time.Now().UTC(), + Sites: make([]liveSiteStatus, 0, len(sites)), + Jobs: make([]liveJobStatus, 0, len(jobs)), + Events: make([]liveJobEvent, 0, len(events)), + } + for _, site := range sites { + row := liveSiteStatus{ + SiteID: site.ID, + LastRunStatus: strings.TrimSpace(site.LastRunStatus.String), + LastRunOutput: strings.TrimSpace(site.LastRunOutput.String), + } + if row.LastRunStatus == "" { + row.LastRunStatus = "pending" + } + if site.LastRunAt.Valid { + row.LastRunAt = site.LastRunAt.Time + } + payload.Sites = append(payload.Sites, row) + } + for _, job := range jobs { + row := liveJobStatus{ + JobID: job.ID, + SiteID: job.SiteID, + Type: job.Type, + Status: job.Status, + Summary: strings.TrimSpace(job.Summary.String), + } + if job.StartedAt.Valid { + row.StartedAt = job.StartedAt.Time + } + if job.FinishedAt.Valid { + row.FinishedAt = job.FinishedAt.Time + } + payload.Jobs = append(payload.Jobs, row) + } + for _, ev := range events { + payload.Events = append(payload.Events, liveJobEvent{ + JobID: ev.JobID, + SiteID: ev.SiteID, + JobType: ev.JobType, + Level: ev.Level, + Message: ev.Message, + OccurredAt: ev.OccurredAt, + }) + } + return payload, nil +} diff --git a/cmd/satoru/sqlite_backup.go b/cmd/satoru/sqlite_backup.go new file mode 100644 index 0000000..519f779 --- /dev/null +++ b/cmd/satoru/sqlite_backup.go @@ -0,0 +1,18 @@ +package main + +import "fmt" + +const ( + sqliteBackupRetryCount = 8 + sqliteBackupTimeoutMS = 5000 +) + +func sqliteBackupCommand(quotedDBPath, quotedBackupPath string) string { + return fmt.Sprintf( + "ok=0; for i in $(seq 1 %d); do sqlite3 -cmd \".timeout %d\" %s \".backup %s\" && ok=1 && break; sleep 1; done; [ \"$ok\" -eq 1 ]", + sqliteBackupRetryCount, + sqliteBackupTimeoutMS, + quotedDBPath, + quotedBackupPath, + ) +} diff --git a/go.mod b/go.mod index f42a00a..07b1552 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.25.7 require ( github.com/a-h/templ v0.3.977 github.com/go-chi/chi/v5 v5.2.5 + github.com/gorilla/websocket v1.5.3 go.uber.org/zap v1.27.1 golang.org/x/crypto v0.47.0 modernc.org/sqlite v1.44.3 diff --git a/go.sum b/go.sum index e46a2a4..481b389 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 
h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= diff --git a/internal/store/migrations.go b/internal/store/migrations.go new file mode 100644 index 0000000..91b19b7 --- /dev/null +++ b/internal/store/migrations.go @@ -0,0 +1,270 @@ +package store + +import ( + "context" + "database/sql" + "fmt" + "strings" + + "go.uber.org/zap" +) + +type migration struct { + version int + name string + up func(context.Context, *sql.Tx) error +} + +func (s *Store) runMigrations(ctx context.Context) error { + if _, err := s.db.ExecContext(ctx, ` +CREATE TABLE IF NOT EXISTS schema_migrations ( + version INTEGER PRIMARY KEY, + name TEXT NOT NULL, + applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP +)`); err != nil { + return err + } + + migrations := []migration{ + { + version: 1, + name: "initial_schema", + up: func(ctx context.Context, tx *sql.Tx) error { + stmts := []string{ + `CREATE TABLE IF NOT EXISTS users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + username TEXT NOT NULL UNIQUE, + password_hash TEXT NOT NULL, + is_admin INTEGER NOT NULL DEFAULT 0, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + `CREATE TABLE IF NOT EXISTS sessions ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + token_hash TEXT NOT NULL UNIQUE, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + expires_at DATETIME NOT NULL + )`, + `CREATE TABLE IF NOT EXISTS sites ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + site_uuid TEXT NOT NULL UNIQUE, + ssh_user TEXT NOT NULL, + host TEXT NOT NULL, + port INTEGER NOT NULL DEFAULT 22, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_run_status TEXT, + last_run_output TEXT, + last_run_at DATETIME, + last_scan_at DATETIME, + last_scan_state TEXT, + last_scan_notes TEXT + )`, + `CREATE TABLE IF NOT EXISTS site_targets ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + path TEXT NOT NULL, + mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump')), + last_size_bytes INTEGER, + last_scan_at DATETIME, + last_error TEXT, + UNIQUE(site_id, path, mode) + )`, + `CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + type TEXT NOT NULL, + status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')), + summary TEXT, + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + started_at DATETIME, + finished_at DATETIME + )`, + `CREATE TABLE IF NOT EXISTS job_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE, + level TEXT NOT NULL, + message TEXT NOT NULL, + occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP + )`, + } + for _, stmt := range stmts { + if _, err := tx.ExecContext(ctx, stmt); err != nil { + return err + } + } + return nil + }, + }, + { + version: 2, + name: "ensure_sites_site_uuid", + up: func(ctx context.Context, tx *sql.Tx) error { + exists, err := tableExists(ctx, tx, "sites") + if err != nil { + return err + } + if !exists { + return nil + } + + hasUUID, err := columnExists(ctx, tx, 
"sites", "site_uuid") + if err != nil { + return err + } + if !hasUUID { + if _, err := tx.ExecContext(ctx, `ALTER TABLE sites ADD COLUMN site_uuid TEXT`); err != nil { + return err + } + } + + rows, err := tx.QueryContext(ctx, `SELECT id, site_uuid FROM sites`) + if err != nil { + return err + } + defer rows.Close() + + var idsNeedingUUID []int64 + for rows.Next() { + var id int64 + var uuid sql.NullString + if err := rows.Scan(&id, &uuid); err != nil { + return err + } + if !uuid.Valid || strings.TrimSpace(uuid.String) == "" { + idsNeedingUUID = append(idsNeedingUUID, id) + } + } + if err := rows.Err(); err != nil { + return err + } + + for _, id := range idsNeedingUUID { + uuid, err := newSiteUUID() + if err != nil { + return err + } + if _, err := tx.ExecContext(ctx, `UPDATE sites SET site_uuid = ? WHERE id = ?`, uuid, id); err != nil { + return err + } + } + + if _, err := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_sites_site_uuid ON sites(site_uuid)`); err != nil { + return err + } + return nil + }, + }, + { + version: 3, + name: "add_runtime_query_indexes", + up: func(ctx context.Context, tx *sql.Tx) error { + stmts := []string{ + `CREATE INDEX IF NOT EXISTS idx_jobs_status_id ON jobs(status, id)`, + `CREATE INDEX IF NOT EXISTS idx_jobs_site_type_id ON jobs(site_id, type, id DESC)`, + `CREATE INDEX IF NOT EXISTS idx_job_events_job_id_id ON job_events(job_id, id DESC)`, + `CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at)`, + } + for _, stmt := range stmts { + if _, err := tx.ExecContext(ctx, stmt); err != nil { + return err + } + } + return nil + }, + }, + { + version: 4, + name: "site_filters", + up: func(ctx context.Context, tx *sql.Tx) error { + stmts := []string{ + `CREATE TABLE IF NOT EXISTS site_filters ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, + pattern TEXT NOT NULL, + UNIQUE(site_id, pattern) + )`, + `CREATE INDEX IF NOT EXISTS idx_site_filters_site_id ON site_filters(site_id, id)`, + } + for _, stmt := range stmts { + if _, err := tx.ExecContext(ctx, stmt); err != nil { + return err + } + } + return nil + }, + }, + } + + for _, m := range migrations { + applied, err := migrationApplied(ctx, s.db, m.version) + if err != nil { + return err + } + if applied { + continue + } + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + + if err := m.up(ctx, tx); err != nil { + _ = tx.Rollback() + return fmt.Errorf("migration %d (%s) failed: %w", m.version, m.name, err) + } + if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations (version, name) VALUES (?, ?)`, m.version, m.name); err != nil { + _ = tx.Rollback() + return fmt.Errorf("migration %d (%s) record failed: %w", m.version, m.name, err) + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("migration %d (%s) commit failed: %w", m.version, m.name, err) + } + s.debugDB("migration applied", zap.Int("version", m.version), zap.String("name", m.name)) + } + + return nil +} + +func migrationApplied(ctx context.Context, db *sql.DB, version int) (bool, error) { + var count int + if err := db.QueryRowContext(ctx, `SELECT COUNT(1) FROM schema_migrations WHERE version = ?`, version).Scan(&count); err != nil { + return false, err + } + return count > 0, nil +} + +func tableExists(ctx context.Context, tx *sql.Tx, table string) (bool, error) { + var count int + if err := tx.QueryRowContext(ctx, `SELECT COUNT(1) FROM sqlite_master WHERE type='table' AND name = ?`, table).Scan(&count); err != nil { + 
return false, err + } + return count > 0, nil +} + +func columnExists(ctx context.Context, tx *sql.Tx, table, col string) (bool, error) { + rows, err := tx.QueryContext(ctx, "PRAGMA table_info("+table+")") + if err != nil { + return false, err + } + defer rows.Close() + + for rows.Next() { + var cid int + var name string + var ctype string + var notNull int + var dflt sql.NullString + var pk int + if err := rows.Scan(&cid, &name, &ctype, ¬Null, &dflt, &pk); err != nil { + return false, err + } + if strings.EqualFold(name, col) { + return true, nil + } + } + if err := rows.Err(); err != nil { + return false, err + } + return false, nil +} diff --git a/internal/store/store.go b/internal/store/store.go index e7c3f00..2820aa9 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -2,7 +2,9 @@ package store import ( "context" + "crypto/rand" "database/sql" + "encoding/hex" "errors" "fmt" "strings" @@ -28,6 +30,7 @@ type User struct { type Site struct { ID int64 + SiteUUID string SSHUser string Host string Port int @@ -39,6 +42,7 @@ type Site struct { LastScanState sql.NullString LastScanNotes sql.NullString Targets []SiteTarget + Filters []string } type SiteTarget struct { @@ -67,12 +71,23 @@ type JobEvent struct { OccurredAt time.Time } +type JobEventRecord struct { + JobID int64 + SiteID int64 + JobType string + Level string + Message string + OccurredAt time.Time +} + func Open(path string) (*Store, error) { - dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)", path) + dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)", path) db, err := sql.Open("sqlite", dsn) if err != nil { return nil, err } + db.SetMaxOpenConns(8) + db.SetMaxIdleConns(8) if err := db.Ping(); err != nil { return nil, err @@ -94,92 +109,7 @@ func (s *Store) SetLogger(logger *zap.Logger) { } func (s *Store) migrate(ctx context.Context) error { - const usersSQL = ` -CREATE TABLE IF NOT EXISTS users ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - username TEXT NOT NULL UNIQUE, - password_hash TEXT NOT NULL, - is_admin INTEGER NOT NULL DEFAULT 0, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP -);` - - const sessionsSQL = ` -CREATE TABLE IF NOT EXISTS sessions ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, - token_hash TEXT NOT NULL UNIQUE, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - expires_at DATETIME NOT NULL -);` - - const sitesSQL = ` -CREATE TABLE IF NOT EXISTS sites ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - ssh_user TEXT NOT NULL, - host TEXT NOT NULL, - port INTEGER NOT NULL DEFAULT 22, - created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - last_run_status TEXT, - last_run_output TEXT, - last_run_at DATETIME, - last_scan_at DATETIME, - last_scan_state TEXT, - last_scan_notes TEXT -);` - - const siteTargetsSQL = ` -CREATE TABLE IF NOT EXISTS site_targets ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, - path TEXT NOT NULL, - mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump')), - last_size_bytes INTEGER, - last_scan_at DATETIME, - last_error TEXT, - UNIQUE(site_id, path, mode) -);` - - const jobsSQL = ` -CREATE TABLE IF NOT EXISTS jobs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE, - type TEXT NOT NULL, - status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')), - summary TEXT, - created_at 
DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, - started_at DATETIME, - finished_at DATETIME -);` - - const jobEventsSQL = ` -CREATE TABLE IF NOT EXISTS job_events ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE, - level TEXT NOT NULL, - message TEXT NOT NULL, - occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP -);` - - if _, err := s.db.ExecContext(ctx, usersSQL); err != nil { - return err - } - if _, err := s.db.ExecContext(ctx, sessionsSQL); err != nil { - return err - } - if _, err := s.db.ExecContext(ctx, sitesSQL); err != nil { - return err - } - if _, err := s.db.ExecContext(ctx, siteTargetsSQL); err != nil { - return err - } - if _, err := s.db.ExecContext(ctx, jobsSQL); err != nil { - return err - } - if _, err := s.db.ExecContext(ctx, jobEventsSQL); err != nil { - return err - } - s.debugDB("schema migrated") - return nil + return s.runMigrations(ctx) } func (s *Store) CreateUser(ctx context.Context, username, passwordHash string) (User, error) { @@ -294,14 +224,18 @@ func (s *Store) UpdateUserPasswordHash(ctx context.Context, userID int64, passwo return err } -func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, targets []SiteTarget) (Site, error) { +func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, targets []SiteTarget, filters []string) (Site, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return Site{}, err } defer tx.Rollback() - res, err := tx.ExecContext(ctx, `INSERT INTO sites (ssh_user, host, port) VALUES (?, ?, ?)`, sshUser, host, port) + siteUUID, err := newSiteUUID() + if err != nil { + return Site{}, err + } + res, err := tx.ExecContext(ctx, `INSERT INTO sites (site_uuid, ssh_user, host, port) VALUES (?, ?, ?, ?)`, siteUUID, sshUser, host, port) if err != nil { return Site{}, err } @@ -321,15 +255,20 @@ func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, return Site{}, err } } + for _, f := range filters { + if _, err := tx.ExecContext(ctx, `INSERT INTO site_filters (site_id, pattern) VALUES (?, ?)`, id, f); err != nil { + return Site{}, err + } + } if err := tx.Commit(); err != nil { return Site{}, err } - s.debugDB("site created", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets))) + s.debugDB("site created", zap.Int64("site_id", id), zap.String("site_uuid", siteUUID), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)), zap.Int("filters", len(filters))) return s.SiteByID(ctx, id) } -func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, port int, targets []SiteTarget) (Site, error) { +func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, port int, targets []SiteTarget, filters []string) (Site, error) { tx, err := s.db.BeginTx(ctx, nil) if err != nil { return Site{}, err @@ -362,11 +301,19 @@ func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, return Site{}, err } } + if _, err := tx.ExecContext(ctx, `DELETE FROM site_filters WHERE site_id = ?`, id); err != nil { + return Site{}, err + } + for _, f := range filters { + if _, err := tx.ExecContext(ctx, `INSERT INTO site_filters (site_id, pattern) VALUES (?, ?)`, id, f); err != nil { + return Site{}, err + } + } if err := tx.Commit(); err != nil { return Site{}, err } - s.debugDB("site updated", zap.Int64("site_id", id), 
zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets))) + s.debugDB("site updated", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)), zap.Int("filters", len(filters))) return s.SiteByID(ctx, id) } @@ -388,7 +335,7 @@ func (s *Store) DeleteSite(ctx context.Context, id int64) error { func (s *Store) ListSites(ctx context.Context) ([]Site, error) { const q = ` -SELECT id, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes +SELECT id, site_uuid, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes FROM sites ORDER BY id DESC` rows, err := s.db.QueryContext(ctx, q) @@ -411,12 +358,15 @@ ORDER BY id DESC` if err := s.populateTargets(ctx, out); err != nil { return nil, err } + if err := s.populateFilters(ctx, out); err != nil { + return nil, err + } return out, nil } func (s *Store) SiteByID(ctx context.Context, id int64) (Site, error) { const q = ` -SELECT id, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes +SELECT id, site_uuid, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes FROM sites WHERE id = ?` site, err := scanSite(s.db.QueryRowContext(ctx, q, id)) @@ -428,6 +378,11 @@ WHERE id = ?` return Site{}, err } site.Targets = targets + filters, err := s.filtersBySiteID(ctx, id) + if err != nil { + return Site{}, err + } + site.Filters = filters return site, nil } @@ -529,6 +484,30 @@ func (s *Store) CompleteJob(ctx context.Context, jobID int64, status, summary st return err } +func (s *Store) CancelQueuedJob(ctx context.Context, jobID int64, summary string) (bool, error) { + res, err := s.db.ExecContext( + ctx, + `UPDATE jobs + SET status = 'failed', summary = ?, finished_at = ? + WHERE id = ? 
AND status = 'queued'`, + summary, + time.Now().UTC().Format(time.RFC3339), + jobID, + ) + if err != nil { + return false, err + } + rows, err := res.RowsAffected() + if err != nil { + return false, err + } + if rows > 0 { + s.debugDB("job canceled from queue", zap.Int64("job_id", jobID), zap.String("summary", summary)) + return true, nil + } + return false, nil +} + func (s *Store) AddJobEvent(ctx context.Context, event JobEvent) error { _, err := s.db.ExecContext( ctx, @@ -571,6 +550,35 @@ LIMIT ?`, limit) return out, nil } +func (s *Store) ListRecentJobEvents(ctx context.Context, limit int) ([]JobEventRecord, error) { + if limit <= 0 { + limit = 50 + } + rows, err := s.db.QueryContext(ctx, ` +SELECT je.job_id, j.site_id, j.type, je.level, je.message, je.occurred_at +FROM job_events je +JOIN jobs j ON j.id = je.job_id +ORDER BY je.id DESC +LIMIT ?`, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []JobEventRecord + for rows.Next() { + var ev JobEventRecord + if err := rows.Scan(&ev.JobID, &ev.SiteID, &ev.JobType, &ev.Level, &ev.Message, &ev.OccurredAt); err != nil { + return nil, err + } + out = append(out, ev) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + func (s *Store) UpdateSiteScanResult(ctx context.Context, siteID int64, state, notes string, scannedAt time.Time, targets []SiteTarget) error { tx, err := s.db.BeginTx(ctx, nil) if err != nil { @@ -651,6 +659,7 @@ func scanSite(row scanner) (Site, error) { var site Site if err := row.Scan( &site.ID, + &site.SiteUUID, &site.SSHUser, &site.Host, &site.Port, @@ -681,6 +690,20 @@ func (s *Store) populateTargets(ctx context.Context, sites []Site) error { return nil } +func (s *Store) populateFilters(ctx context.Context, sites []Site) error { + if len(sites) == 0 { + return nil + } + filtersBySite, err := s.allFiltersBySiteID(ctx) + if err != nil { + return err + } + for i := range sites { + sites[i].Filters = filtersBySite[sites[i].ID] + } + return nil +} + func (s *Store) allTargetsBySiteID(ctx context.Context) (map[int64][]SiteTarget, error) { const q = `SELECT site_id, path, mode, last_size_bytes, last_scan_at, last_error FROM site_targets ORDER BY id ASC` rows, err := s.db.QueryContext(ctx, q) @@ -726,6 +749,49 @@ func (s *Store) targetsBySiteID(ctx context.Context, siteID int64) ([]SiteTarget return out, nil } +func (s *Store) allFiltersBySiteID(ctx context.Context) (map[int64][]string, error) { + rows, err := s.db.QueryContext(ctx, `SELECT site_id, pattern FROM site_filters ORDER BY id ASC`) + if err != nil { + return nil, err + } + defer rows.Close() + + out := map[int64][]string{} + for rows.Next() { + var siteID int64 + var pattern string + if err := rows.Scan(&siteID, &pattern); err != nil { + return nil, err + } + out[siteID] = append(out[siteID], pattern) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + +func (s *Store) filtersBySiteID(ctx context.Context, siteID int64) ([]string, error) { + rows, err := s.db.QueryContext(ctx, `SELECT pattern FROM site_filters WHERE site_id = ? 
ORDER BY id ASC`, siteID) + if err != nil { + return nil, err + } + defer rows.Close() + + var out []string + for rows.Next() { + var pattern string + if err := rows.Scan(&pattern); err != nil { + return nil, err + } + out = append(out, pattern) + } + if err := rows.Err(); err != nil { + return nil, err + } + return out, nil +} + func boolToInt(v bool) int { if v { return 1 @@ -766,3 +832,11 @@ func (s *Store) debugDB(msg string, fields ...zap.Field) { } s.log.Debug(msg, fields...) } + +func newSiteUUID() (string, error) { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return "", err + } + return hex.EncodeToString(buf), nil +} diff --git a/internal/webui/dashboard.go b/internal/webui/dashboard.go index 2720a87..2dba51f 100644 --- a/internal/webui/dashboard.go +++ b/internal/webui/dashboard.go @@ -110,12 +110,18 @@ func Dashboard(data DashboardData) templ.Component { } sites.WriteString(fmt.Sprintf(` -
+

%s@%s:%d

- + +
+
+ +
+
+
@@ -130,29 +136,40 @@ func Dashboard(data DashboardData) templ.Component { +

Backup targets:

    %s
+

Filters: %s

Scan: %s · Last: %s · Next: %s

%s

Last run: %s

-

Status: %s

-
%s
+

Status: %s

+

Current step: idle

+
+ +
+
%s
`, + site.ID, html.EscapeString(site.SSHUser), html.EscapeString(site.Host), site.Port, site.ID, site.ID, site.ID, + site.ID, + site.ID, html.EscapeString(site.SSHUser), html.EscapeString(site.Host), site.Port, html.EscapeString(joinTargetPaths(site.Targets, "directory")), html.EscapeString(joinTargetPaths(site.Targets, "sqlite_dump")), + html.EscapeString(strings.Join(site.Filters, "\n")), targets.String(), + html.EscapeString(formatFilters(site.Filters)), html.EscapeString(scanState), html.EscapeString(scanState), html.EscapeString(lastScan), @@ -160,7 +177,13 @@ func Dashboard(data DashboardData) templ.Component { html.EscapeString(scanNotes), html.EscapeString(last), html.EscapeString(runStatus), + site.ID, html.EscapeString(runStatus), + site.ID, + site.ID, + site.ID, + site.ID, + site.ID, html.EscapeString(runOutput), )) } @@ -208,14 +231,196 @@ func Dashboard(data DashboardData) templ.Component { + +
+

Live Backup Progress

+

Connecting...

+
    +
  • Waiting for job updates.
  • +
+
+ +
+
No live events yet.
+

Managed Sites

%s
+ `, html.EscapeString(data.User.Username), @@ -234,7 +439,13 @@ func formatFlash(code string) string { case "site-added": return "Site added." case "job-queued": - return "Preflight job queued." + return "Backup job queued." + case "job-cancel-requested": + return "Cancel requested for active job." + case "job-restarted": + return "Backup restart queued." + case "no-active-job": + return "No active job to cancel." case "site-updated": return "Site updated." case "site-deleted": @@ -269,6 +480,13 @@ func joinTargetPaths(targets []store.SiteTarget, mode string) string { return strings.Join(out, "\n") } +func formatFilters(filters []string) string { + if len(filters) == 0 { + return "(none)" + } + return strings.Join(filters, ", ") +} + func targetModeClass(mode string) string { if mode == "sqlite_dump" { return "sqlite" diff --git a/web/static/app.css b/web/static/app.css index 6a241c3..5d746c0 100644 --- a/web/static/app.css +++ b/web/static/app.css @@ -92,6 +92,11 @@ h1 { border-color: var(--border); } +.button.button-sm { + padding: 0.35rem 0.55rem; + font-size: 0.82rem; +} + input { width: 100%; border: 1px solid var(--border); @@ -226,6 +231,26 @@ h2 { color: #fecaca; } +.pill.warning { + border-color: #f59e0b; + color: #fde68a; +} + +.pill.success { + border-color: #22c55e; + color: #86efac; +} + +.pill.running { + border-color: #38bdf8; + color: #bae6fd; +} + +.pill.queued { + border-color: #a78bfa; + color: #ddd6fe; +} + .pill.partial { border-color: #f59e0b; color: #fde68a; @@ -277,6 +302,27 @@ textarea { margin-left: 0.5rem; } +.throbber { + display: inline-block; + width: 0.9rem; + height: 0.9rem; + margin-left: 0.45rem; + border: 2px solid color-mix(in srgb, #38bdf8 35%, transparent); + border-top-color: #38bdf8; + border-radius: 50%; + animation: spin 0.8s linear infinite; + vertical-align: -1px; +} + +.hidden { + display: none; +} + +@keyframes spin { + from { transform: rotate(0deg); } + to { transform: rotate(360deg); } +} + a { color: #7dd3fc; }
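
Note (illustrative sketch, not part of the patch): the new DSN in store.Open stacks pragmas as query parameters, which matches the DSN convention of the modernc.org/sqlite driver (it registers driver name "sqlite"). The driver identity and the example.db path below are assumptions for illustration only. The sketch opens a database with the same pragma set and reads back the journal mode to confirm the pragmas took effect:

package main

import (
	"database/sql"
	"fmt"

	_ "modernc.org/sqlite" // assumed driver; registers driver name "sqlite"
)

func main() {
	// Same DSN shape as store.Open: foreign keys on, WAL journaling,
	// and a 5s busy timeout applied to every new connection.
	dsn := "file:example.db?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)"
	db, err := sql.Open("sqlite", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var mode string
	if err := db.QueryRow("PRAGMA journal_mode").Scan(&mode); err != nil {
		panic(err)
	}
	fmt.Println("journal_mode:", mode) // expect "wal" if the DSN pragmas applied
}

WAL plus a busy timeout is a sensible pairing when one process serves the web UI while the job runner writes progress events to the same file, as this patch does: WAL lets readers proceed alongside a writer, and busy_timeout(5000) absorbs brief lock contention instead of surfacing SQLITE_BUSY errors.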