Compare commits
2 Commits
d85ee1e184
...
2ce2d5d881
| Author | SHA1 | Date |
|---|---|---|
|
|
2ce2d5d881 | |
|
|
e5225a1353 |
|
|
@ -33,3 +33,4 @@ data/
|
|||
.env.*
|
||||
!.env.example
|
||||
backups/
|
||||
repos/
|
||||
|
|
|
|||
|
|
@ -49,3 +49,8 @@ Be proactive with debug-level logging for:
|
|||
|
||||
Logs should include useful identifiers where possible:
|
||||
- `job_id`, `site_id`, `job_type`, `target_path`, `target_mode`, `status`, `error`.
|
||||
|
||||
## Data Safety
|
||||
Do not delete `data/satoru.db` during normal development, smoke checks, or troubleshooting.
|
||||
|
||||
Use forward migrations to evolve schema/data for soft-launched deployments.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,527 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"satoru/internal/store"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultStagingRoot = "./backups"
|
||||
defaultResticRepo = "./repos/restic"
|
||||
)
|
||||
|
||||
func (a *app) runBackupJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
|
||||
a.log.Debug("backup job begin", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("site_uuid", site.SiteUUID), zap.Int("targets", len(site.Targets)))
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Backup job started"})
|
||||
|
||||
preflightStatus, preflightSummary := a.runPreflightJob(ctx, job, site)
|
||||
if preflightStatus == "failed" {
|
||||
msg := "backup aborted by preflight: " + preflightSummary
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: msg})
|
||||
return "failed", msg
|
||||
}
|
||||
if preflightStatus == "warning" {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "preflight warning; continuing backup"})
|
||||
}
|
||||
|
||||
stagingRoot := getenvDefault("SATORU_STAGING_ROOT", defaultStagingRoot)
|
||||
resticRepo := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
|
||||
if err := os.MkdirAll(stagingRoot, 0o700); err != nil {
|
||||
return "failed", "failed to create staging root"
|
||||
}
|
||||
if err := os.MkdirAll(resticRepo, 0o700); err != nil {
|
||||
return "failed", "failed to create restic repo directory"
|
||||
}
|
||||
|
||||
successes := 0
|
||||
failures := 0
|
||||
stagedPaths := map[string]struct{}{}
|
||||
|
||||
for _, target := range site.Targets {
|
||||
stageDir := targetStageDir(stagingRoot, site.SiteUUID, target)
|
||||
if err := os.MkdirAll(stageDir, 0o700); err != nil {
|
||||
failures++
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: fmt.Sprintf("stage dir failed for %s: %v", target.Path, err)})
|
||||
continue
|
||||
}
|
||||
|
||||
var err error
|
||||
switch target.Mode {
|
||||
case "directory":
|
||||
err = a.pullDirectoryTarget(ctx, job.ID, site, target, stageDir)
|
||||
case "sqlite_dump":
|
||||
err = pullSQLiteTarget(ctx, job.ID, site, target, stageDir)
|
||||
case "mysql_dump":
|
||||
err = pullMySQLTarget(ctx, site, target, stageDir)
|
||||
default:
|
||||
err = fmt.Errorf("unknown target mode: %s", target.Mode)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
failures++
|
||||
a.log.Debug("backup target failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", target.Path), zap.String("target_mode", target.Mode), zap.Error(err))
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: fmt.Sprintf("target failed: %s (%s): %v", target.Path, target.Mode, err)})
|
||||
continue
|
||||
}
|
||||
|
||||
successes++
|
||||
stagedPaths[stageDir] = struct{}{}
|
||||
a.log.Debug("backup target success", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", target.Path), zap.String("target_mode", target.Mode), zap.String("stage_dir", stageDir))
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: fmt.Sprintf("target synced: %s (%s)", target.Path, target.Mode)})
|
||||
}
|
||||
|
||||
if successes == 0 {
|
||||
return "failed", fmt.Sprintf("backup failed: %d/%d targets failed", failures, len(site.Targets))
|
||||
}
|
||||
|
||||
paths := make([]string, 0, len(stagedPaths))
|
||||
for p := range stagedPaths {
|
||||
paths = append(paths, p)
|
||||
}
|
||||
if err := runResticBackup(ctx, resticRepo, site, job.ID, paths); err != nil {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic backup failed: " + err.Error()})
|
||||
return "failed", "backup failed: restic backup error"
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic backup completed"})
|
||||
|
||||
if failures > 0 || preflightStatus == "warning" {
|
||||
return "warning", fmt.Sprintf("backup warning: %d/%d targets synced", successes, len(site.Targets))
|
||||
}
|
||||
return "success", fmt.Sprintf("backup complete: %d/%d targets synced", successes, len(site.Targets))
|
||||
}
|
||||
|
||||
func (a *app) pullDirectoryTarget(ctx context.Context, jobID int64, site store.Site, target store.SiteTarget, stageDir string) error {
|
||||
sshCmd := fmt.Sprintf("ssh -p %d", site.Port)
|
||||
remote, err := syncRemotePath(ctx, site, target.Path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
args := []string{"-a", "--delete", "--info=progress2", "-e", sshCmd}
|
||||
for _, filter := range site.Filters {
|
||||
args = append(args, "--exclude", filter)
|
||||
}
|
||||
args = append(args, remote, stageDir+"/")
|
||||
cmd := exec.CommandContext(cmdCtx, "rsync", args...)
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
_, _ = io.Copy(io.Discard, stdout)
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(stderr)
|
||||
scanner.Split(scanCRLF)
|
||||
|
||||
lastEmit := time.Time{}
|
||||
tail := make([]string, 0, 20)
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
tail = append(tail, line)
|
||||
if len(tail) > 20 {
|
||||
tail = tail[len(tail)-20:]
|
||||
}
|
||||
|
||||
progress := parseRsyncProgress(line)
|
||||
if progress == "" {
|
||||
continue
|
||||
}
|
||||
if time.Since(lastEmit) < 2*time.Second {
|
||||
continue
|
||||
}
|
||||
lastEmit = time.Now()
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{
|
||||
JobID: jobID,
|
||||
Level: "info",
|
||||
Message: fmt.Sprintf("rsync %s: %s", target.Path, progress),
|
||||
})
|
||||
}
|
||||
if err := scanner.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cmd.Wait(); err != nil {
|
||||
msg := strings.TrimSpace(strings.Join(tail, "\n"))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
return errors.New(msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func syncRemotePath(ctx context.Context, site store.Site, path string) (string, error) {
|
||||
pathType, err := remotePathType(ctx, site, path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
switch pathType {
|
||||
case "dir":
|
||||
return fmt.Sprintf("%s@%s:%s/", site.SSHUser, site.Host, path), nil
|
||||
case "file":
|
||||
return fmt.Sprintf("%s@%s:%s", site.SSHUser, site.Host, path), nil
|
||||
default:
|
||||
return "", errors.New("target path is neither a directory nor a file")
|
||||
}
|
||||
}
|
||||
|
||||
func remotePathType(ctx context.Context, site store.Site, path string) (string, error) {
|
||||
target := fmt.Sprintf("%s@%s", site.SSHUser, site.Host)
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
|
||||
defer cancel()
|
||||
|
||||
quoted := shellQuote(path)
|
||||
remoteCmd := fmt.Sprintf("if [ -d %s ]; then echo dir; elif [ -f %s ]; then echo file; else echo missing; fi", quoted, quoted)
|
||||
cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), target, remoteCmd)
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
msg := strings.TrimSpace(string(out))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
return "", errors.New(msg)
|
||||
}
|
||||
v := strings.TrimSpace(string(out))
|
||||
if v == "" || v == "missing" {
|
||||
return "", errors.New("target path not found")
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
func pullSQLiteTarget(ctx context.Context, jobID int64, site store.Site, target store.SiteTarget, stageDir string) error {
|
||||
tmpBase := fmt.Sprintf("/tmp/satoru-backup-%d-%s.sqlite3", jobID, shortHash(target.Path))
|
||||
quotedDB := shellQuote(target.Path)
|
||||
quotedTmp := shellQuote(tmpBase)
|
||||
remoteCmd := strings.Join([]string{
|
||||
sqliteBackupCommand(quotedDB, quotedTmp),
|
||||
fmt.Sprintf("gzip -f -- %s", quotedTmp),
|
||||
}, " && ")
|
||||
if err := sshCheck(ctx, site, remoteCmd); err != nil {
|
||||
_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
|
||||
return err
|
||||
}
|
||||
|
||||
sshCmd := fmt.Sprintf("ssh -p %d", site.Port)
|
||||
remoteGz := fmt.Sprintf("%s@%s:%s", site.SSHUser, site.Host, tmpBase+".gz")
|
||||
localFile := filepath.Join(stageDir, "sqlite-backup.sql.gz")
|
||||
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "rsync", "-a", "-e", sshCmd, remoteGz, localFile)
|
||||
out, err := cmd.CombinedOutput()
|
||||
_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
|
||||
if err != nil {
|
||||
return errors.New(strings.TrimSpace(string(out)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func pullMySQLTarget(ctx context.Context, site store.Site, target store.SiteTarget, stageDir string) error {
|
||||
if !target.MySQLHost.Valid || !target.MySQLUser.Valid || !target.MySQLDB.Valid || !target.MySQLPassword.Valid {
|
||||
return errors.New("mysql target missing db host/db user/db name/db password")
|
||||
}
|
||||
|
||||
tmpBase := fmt.Sprintf("/tmp/satoru-mysql-%s.sql", shortHash(target.Path+target.MySQLHost.String+target.MySQLDB.String))
|
||||
remoteCmd := strings.Join([]string{
|
||||
mysqlDumpCommand(target, false, tmpBase),
|
||||
fmt.Sprintf("gzip -f -- %s", shellQuote(tmpBase)),
|
||||
}, " && ")
|
||||
if err := sshCheck(ctx, site, remoteCmd); err != nil {
|
||||
_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
|
||||
return err
|
||||
}
|
||||
|
||||
sshCmd := fmt.Sprintf("ssh -p %d", site.Port)
|
||||
remoteGz := fmt.Sprintf("%s@%s:%s", site.SSHUser, site.Host, tmpBase+".gz")
|
||||
localFile := filepath.Join(stageDir, "mysql-dump.sql.gz")
|
||||
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 10*time.Minute)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "rsync", "-a", "-e", sshCmd, remoteGz, localFile)
|
||||
out, err := cmd.CombinedOutput()
|
||||
_ = sshCheck(ctx, site, fmt.Sprintf("rm -f -- %s %s", shellQuote(tmpBase), shellQuote(tmpBase+".gz")))
|
||||
if err != nil {
|
||||
return errors.New(strings.TrimSpace(string(out)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func runResticBackup(ctx context.Context, repoPath string, site store.Site, jobID int64, stagedPaths []string) error {
|
||||
if err := ensureResticRepo(ctx, repoPath); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
args := []string{"-r", repoPath, "backup"}
|
||||
args = append(args, stagedPaths...)
|
||||
args = append(args, "--tag", "site_uuid:"+site.SiteUUID, "--tag", "site_id:"+strconv.FormatInt(site.ID, 10), "--tag", "job_id:"+strconv.FormatInt(jobID, 10))
|
||||
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "restic", args...)
|
||||
cmd.Env = resticEnv()
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return errors.New(strings.TrimSpace(string(out)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *app) runRetentionJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
|
||||
repoPath := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
|
||||
if err := ensureResticRepo(ctx, repoPath); err != nil {
|
||||
return "failed", "retention failed: restic repo unavailable"
|
||||
}
|
||||
|
||||
retentionArgs := strings.Fields(getenvDefault("SATORU_RESTIC_RETENTION_ARGS", "--keep-daily 7 --keep-weekly 4 --keep-monthly 6"))
|
||||
args := []string{"-r", repoPath, "forget", "--prune", "--tag", "site_uuid:" + site.SiteUUID}
|
||||
args = append(args, retentionArgs...)
|
||||
a.log.Debug("retention command", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Strings("args", args))
|
||||
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Minute)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "restic", args...)
|
||||
cmd.Env = resticEnv()
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
msg := strings.TrimSpace(string(out))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "retention failed: " + msg})
|
||||
return "failed", "retention failed"
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "retention completed"})
|
||||
return "success", "retention completed"
|
||||
}
|
||||
|
||||
func (a *app) runResticSyncJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
|
||||
b2Repo := strings.TrimSpace(os.Getenv("SATORU_RESTIC_B2_REPOSITORY"))
|
||||
if b2Repo == "" {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "restic sync skipped: SATORU_RESTIC_B2_REPOSITORY not set"})
|
||||
return "warning", "restic sync skipped: B2 repository not configured"
|
||||
}
|
||||
|
||||
repoPath := getenvDefault("SATORU_RESTIC_REPO", defaultResticRepo)
|
||||
if err := ensureResticRepo(ctx, repoPath); err != nil {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: local repo unavailable"})
|
||||
return "failed", "restic sync failed: local repo unavailable"
|
||||
}
|
||||
|
||||
snapshotID, err := latestSnapshotIDForSite(ctx, repoPath, site.SiteUUID)
|
||||
if err != nil {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: " + err.Error()})
|
||||
return "failed", "restic sync failed: local snapshot lookup error"
|
||||
}
|
||||
if snapshotID == "" {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "restic sync skipped: no local snapshots for site"})
|
||||
return "warning", "restic sync skipped: no local snapshots for site"
|
||||
}
|
||||
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic sync started"})
|
||||
args := []string{"-r", repoPath, "copy", snapshotID, "--repo2", b2Repo}
|
||||
a.log.Debug("restic sync copy", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("local_repo", repoPath), zap.String("b2_repo", b2Repo), zap.String("snapshot_id", snapshotID), zap.Strings("args", args))
|
||||
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "restic", args...)
|
||||
cmd.Env = resticEnv()
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
msg := strings.TrimSpace(string(out))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "restic sync failed: " + msg})
|
||||
return "failed", "restic sync failed"
|
||||
}
|
||||
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "restic sync completed"})
|
||||
return "success", "restic sync completed"
|
||||
}
|
||||
|
||||
func ensureResticRepo(ctx context.Context, repoPath string) error {
|
||||
check := exec.CommandContext(ctx, "restic", "-r", repoPath, "cat", "config")
|
||||
check.Env = resticEnv()
|
||||
if err := check.Run(); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
initCmd := exec.CommandContext(ctx, "restic", "-r", repoPath, "init")
|
||||
initCmd.Env = resticEnv()
|
||||
out, err := initCmd.CombinedOutput()
|
||||
if err != nil && !strings.Contains(string(out), "already initialized") {
|
||||
return errors.New(strings.TrimSpace(string(out)))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resticEnv() []string {
|
||||
env := os.Environ()
|
||||
password := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD"))
|
||||
passwordFile := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE"))
|
||||
password2 := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD2"))
|
||||
passwordFile2 := strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE2"))
|
||||
if password == "" && passwordFile == "" {
|
||||
password = configuredResticPassword()
|
||||
}
|
||||
if password != "" {
|
||||
env = append(env, "RESTIC_PASSWORD="+password)
|
||||
}
|
||||
if passwordFile != "" {
|
||||
env = append(env, "RESTIC_PASSWORD_FILE="+passwordFile)
|
||||
}
|
||||
if password2 != "" {
|
||||
env = append(env, "RESTIC_PASSWORD2="+password2)
|
||||
}
|
||||
if passwordFile2 != "" {
|
||||
env = append(env, "RESTIC_PASSWORD_FILE2="+passwordFile2)
|
||||
}
|
||||
return env
|
||||
}
|
||||
|
||||
type resticSnapshot struct {
|
||||
ID string `json:"id"`
|
||||
Time time.Time `json:"time"`
|
||||
}
|
||||
|
||||
func latestSnapshotIDForSite(ctx context.Context, repoPath, siteUUID string) (string, error) {
|
||||
args := []string{"-r", repoPath, "snapshots", "--json", "--tag", "site_uuid:" + siteUUID}
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
cmd := exec.CommandContext(cmdCtx, "restic", args...)
|
||||
cmd.Env = resticEnv()
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
msg := strings.TrimSpace(string(out))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
return "", errors.New(msg)
|
||||
}
|
||||
|
||||
var snapshots []resticSnapshot
|
||||
if err := json.Unmarshal(out, &snapshots); err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(snapshots) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
latest := snapshots[0]
|
||||
for _, snap := range snapshots[1:] {
|
||||
if snap.Time.After(latest.Time) {
|
||||
latest = snap
|
||||
}
|
||||
}
|
||||
return latest.ID, nil
|
||||
}
|
||||
|
||||
func targetStageDir(root, siteUUID string, target store.SiteTarget) string {
|
||||
hash := hashPath(targetIdentity(target))
|
||||
modeDir := "dir"
|
||||
if target.Mode == "sqlite_dump" {
|
||||
modeDir = "sqlite"
|
||||
} else if target.Mode == "mysql_dump" {
|
||||
modeDir = "mysql"
|
||||
}
|
||||
return filepath.Join(root, siteUUID, hash, modeDir)
|
||||
}
|
||||
|
||||
func targetIdentity(target store.SiteTarget) string {
|
||||
parts := []string{target.Mode, target.Path}
|
||||
if target.MySQLHost.Valid {
|
||||
parts = append(parts, target.MySQLHost.String)
|
||||
}
|
||||
if target.MySQLUser.Valid {
|
||||
parts = append(parts, target.MySQLUser.String)
|
||||
}
|
||||
if target.MySQLDB.Valid {
|
||||
parts = append(parts, target.MySQLDB.String)
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
|
||||
func hashPath(path string) string {
|
||||
sum := sha256.Sum256([]byte(path))
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
func shortHash(path string) string {
|
||||
v := hashPath(path)
|
||||
if len(v) < 12 {
|
||||
return v
|
||||
}
|
||||
return v[:12]
|
||||
}
|
||||
|
||||
func parseRsyncProgress(line string) string {
|
||||
if !strings.Contains(line, "/s") || !strings.Contains(line, "to-chk=") {
|
||||
return ""
|
||||
}
|
||||
fields := strings.Fields(strings.ReplaceAll(line, ",", ""))
|
||||
var percent, speed, eta string
|
||||
for _, f := range fields {
|
||||
if strings.HasSuffix(f, "%") {
|
||||
percent = f
|
||||
}
|
||||
if strings.HasSuffix(f, "/s") {
|
||||
speed = f
|
||||
}
|
||||
if strings.Count(f, ":") == 2 {
|
||||
eta = f
|
||||
}
|
||||
}
|
||||
if speed == "" {
|
||||
return ""
|
||||
}
|
||||
parts := make([]string, 0, 3)
|
||||
if percent != "" {
|
||||
parts = append(parts, percent)
|
||||
}
|
||||
parts = append(parts, speed)
|
||||
if eta != "" {
|
||||
parts = append(parts, "eta "+eta)
|
||||
}
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
func scanCRLF(data []byte, atEOF bool) (advance int, token []byte, err error) {
|
||||
for i := 0; i < len(data); i++ {
|
||||
if data[i] == '\n' || data[i] == '\r' {
|
||||
return i + 1, data[:i], nil
|
||||
}
|
||||
}
|
||||
if atEOF && len(data) > 0 {
|
||||
return len(data), data, nil
|
||||
}
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,39 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const defaultResticPassword = "satoru-change-me"
|
||||
|
||||
func getenvDefault(k, d string) string {
|
||||
v := strings.TrimSpace(os.Getenv(k))
|
||||
if v == "" {
|
||||
return d
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func parsePositiveIntEnv(name string, fallback int) int {
|
||||
raw := strings.TrimSpace(os.Getenv(name))
|
||||
if raw == "" {
|
||||
return fallback
|
||||
}
|
||||
v, err := strconv.Atoi(raw)
|
||||
if err != nil || v <= 0 {
|
||||
return fallback
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func configuredResticPassword() string {
|
||||
return getenvDefault("SATORU_RESTIC_PASSWORD", defaultResticPassword)
|
||||
}
|
||||
|
||||
func isUsingDefaultResticPassword() bool {
|
||||
return strings.TrimSpace(os.Getenv("RESTIC_PASSWORD")) == "" &&
|
||||
strings.TrimSpace(os.Getenv("RESTIC_PASSWORD_FILE")) == "" &&
|
||||
strings.TrimSpace(os.Getenv("SATORU_RESTIC_PASSWORD")) == ""
|
||||
}
|
||||
|
|
@ -44,6 +44,24 @@ func parsePathList(raw string) []string {
|
|||
return out
|
||||
}
|
||||
|
||||
func parseLineList(raw string) []string {
|
||||
split := strings.Split(raw, "\n")
|
||||
seen := map[string]struct{}{}
|
||||
out := make([]string, 0, len(split))
|
||||
for _, item := range split {
|
||||
item = strings.TrimSpace(item)
|
||||
if item == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[item]; ok {
|
||||
continue
|
||||
}
|
||||
seen[item] = struct{}{}
|
||||
out = append(out, item)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func buildTargets(directoryPaths, sqlitePaths []string) []store.SiteTarget {
|
||||
seen := map[string]struct{}{}
|
||||
out := make([]store.SiteTarget, 0, len(directoryPaths)+len(sqlitePaths))
|
||||
|
|
|
|||
|
|
@ -17,9 +17,12 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
jobTypePreflight = "preflight"
|
||||
jobPollInterval = 2 * time.Second
|
||||
jobWorkers = 3
|
||||
jobTypePreflight = "preflight"
|
||||
jobTypeBackup = "backup"
|
||||
jobTypeRetention = "retention"
|
||||
jobTypeResticSync = "restic_sync"
|
||||
jobPollInterval = 2 * time.Second
|
||||
jobWorkers = 3
|
||||
)
|
||||
|
||||
func (a *app) startJobWorkers(ctx context.Context) {
|
||||
|
|
@ -44,7 +47,9 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
a.jobClaimMu.Lock()
|
||||
job, ok, err := a.store.TryStartNextQueuedJob(ctx)
|
||||
a.jobClaimMu.Unlock()
|
||||
if err != nil {
|
||||
a.log.Warn("worker failed to start job", zap.Int("worker_id", workerID), zap.Error(err))
|
||||
continue
|
||||
|
|
@ -58,54 +63,116 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
|
|||
}
|
||||
|
||||
func (a *app) executeJob(ctx context.Context, job store.Job) {
|
||||
jobCtx, cancel := context.WithCancel(ctx)
|
||||
a.registerJobCancel(job.ID, cancel)
|
||||
defer func() {
|
||||
a.clearJobCancel(job.ID)
|
||||
cancel()
|
||||
}()
|
||||
|
||||
a.log.Info("job start", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.String("job_type", job.Type))
|
||||
site, err := a.store.SiteByID(ctx, job.SiteID)
|
||||
site, err := a.store.SiteByID(jobCtx, job.SiteID)
|
||||
if err != nil {
|
||||
_ = a.store.CompleteJob(ctx, job.ID, "failed", "failed to load site")
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, "failed", "failed to load site")
|
||||
a.log.Error("job failed to load site", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
_ = a.store.UpdateSiteRunResult(ctx, site.ID, "running", fmt.Sprintf("Job #%d running (%s)", job.ID, job.Type), time.Now())
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job started"})
|
||||
_ = a.store.UpdateSiteRunResult(jobCtx, site.ID, "running", fmt.Sprintf("Job #%d running (%s)", job.ID, job.Type), time.Now())
|
||||
_ = a.store.AddJobEvent(jobCtx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Job started"})
|
||||
|
||||
switch job.Type {
|
||||
case jobTypePreflight:
|
||||
status, summary := a.runPreflightJob(ctx, job, site)
|
||||
_ = a.store.CompleteJob(ctx, job.ID, status, summary)
|
||||
_ = a.store.UpdateSiteRunResult(ctx, site.ID, status, summary, time.Now())
|
||||
status, summary := a.runPreflightJob(jobCtx, job, site)
|
||||
if jobCtx.Err() == context.Canceled {
|
||||
status = "failed"
|
||||
summary = "job canceled by user"
|
||||
}
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, status, summary)
|
||||
_ = a.store.UpdateSiteRunResult(jobCtx, site.ID, status, summary, time.Now())
|
||||
a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
|
||||
case jobTypeBackup:
|
||||
status, summary := a.runBackupJob(jobCtx, job, site)
|
||||
if jobCtx.Err() == context.Canceled {
|
||||
status = "failed"
|
||||
summary = "job canceled by user"
|
||||
}
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, status, summary)
|
||||
_ = a.store.UpdateSiteRunResult(jobCtx, site.ID, status, summary, time.Now())
|
||||
a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
|
||||
case jobTypeRetention:
|
||||
status, summary := a.runRetentionJob(jobCtx, job, site)
|
||||
if jobCtx.Err() == context.Canceled {
|
||||
status = "failed"
|
||||
summary = "job canceled by user"
|
||||
}
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, status, summary)
|
||||
a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
|
||||
case jobTypeResticSync:
|
||||
status, summary := a.runResticSyncJob(jobCtx, job, site)
|
||||
if jobCtx.Err() == context.Canceled {
|
||||
status = "failed"
|
||||
summary = "job canceled by user"
|
||||
}
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, status, summary)
|
||||
a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
|
||||
default:
|
||||
summary := "unknown job type"
|
||||
_ = a.store.CompleteJob(ctx, job.ID, "failed", summary)
|
||||
_ = a.store.UpdateSiteRunResult(ctx, site.ID, "failed", summary, time.Now())
|
||||
_ = a.store.CompleteJob(jobCtx, job.ID, "failed", summary)
|
||||
_ = a.store.UpdateSiteRunResult(jobCtx, site.ID, "failed", summary, time.Now())
|
||||
a.log.Warn("job unknown type", zap.Int64("job_id", job.ID), zap.String("job_type", job.Type))
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) registerJobCancel(jobID int64, cancel context.CancelFunc) {
|
||||
a.jobCancelMu.Lock()
|
||||
defer a.jobCancelMu.Unlock()
|
||||
a.jobCancels[jobID] = cancel
|
||||
}
|
||||
|
||||
func (a *app) clearJobCancel(jobID int64) {
|
||||
a.jobCancelMu.Lock()
|
||||
defer a.jobCancelMu.Unlock()
|
||||
delete(a.jobCancels, jobID)
|
||||
}
|
||||
|
||||
func (a *app) cancelRunningJob(jobID int64) bool {
|
||||
a.jobCancelMu.Lock()
|
||||
defer a.jobCancelMu.Unlock()
|
||||
cancel, ok := a.jobCancels[jobID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
cancel()
|
||||
return true
|
||||
}
|
||||
|
||||
func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Site) (string, string) {
|
||||
a.log.Debug("preflight begin", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Int("targets", len(site.Targets)))
|
||||
failures := 0
|
||||
warnings := 0
|
||||
failedChecks := make([]string, 0)
|
||||
warnChecks := make([]string, 0)
|
||||
|
||||
requiredLocal := []string{"ssh", "rsync", "restic", "gzip"}
|
||||
for _, tool := range requiredLocal {
|
||||
a.log.Debug("preflight local tool check", zap.Int64("job_id", job.ID), zap.String("tool", tool))
|
||||
if _, err := exec.LookPath(tool); err != nil {
|
||||
failures++
|
||||
failedChecks = append(failedChecks, "local tool missing: "+tool)
|
||||
a.log.Debug("preflight local tool missing", zap.Int64("job_id", job.ID), zap.String("tool", tool), zap.Error(err))
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: fmt.Sprintf("local tool missing: %s", tool)})
|
||||
}
|
||||
}
|
||||
if failures > 0 {
|
||||
return "failed", fmt.Sprintf("preflight failed: %d local tool checks failed", failures)
|
||||
return "failed", "preflight failed: " + strings.Join(failedChecks, "; ")
|
||||
}
|
||||
|
||||
a.log.Debug("preflight ssh connectivity check", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID))
|
||||
if err := sshCheck(ctx, site, "echo preflight-ok"); err != nil {
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "error", Message: "ssh connectivity failed: " + err.Error()})
|
||||
a.log.Debug("preflight ssh connectivity failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.Error(err))
|
||||
return "failed", "preflight failed: ssh connectivity"
|
||||
return "failed", "preflight failed: ssh connectivity: " + err.Error()
|
||||
}
|
||||
a.log.Debug("preflight ssh connectivity ok", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID))
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "ssh connectivity ok"})
|
||||
|
|
@ -115,9 +182,11 @@ func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit
|
|||
var err error
|
||||
switch t.Mode {
|
||||
case "directory":
|
||||
err = sshCheck(ctx, site, fmt.Sprintf("test -d -- %s", shellQuote(t.Path)))
|
||||
err = sshCheck(ctx, site, fmt.Sprintf("[ -d %s ] || [ -f %s ]", shellQuote(t.Path), shellQuote(t.Path)))
|
||||
case "sqlite_dump":
|
||||
err = sqlitePreflightCheck(ctx, site, t.Path)
|
||||
case "mysql_dump":
|
||||
err = mysqlPreflightCheck(ctx, site, t)
|
||||
default:
|
||||
err = fmt.Errorf("unknown target mode: %s", t.Mode)
|
||||
}
|
||||
|
|
@ -125,6 +194,7 @@ func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit
|
|||
if err != nil {
|
||||
warnings++
|
||||
msg := fmt.Sprintf("target %s (%s): %s", t.Path, t.Mode, err.Error())
|
||||
warnChecks = append(warnChecks, msg)
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: msg})
|
||||
a.log.Debug("preflight target check failed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("target_path", t.Path), zap.String("target_mode", t.Mode), zap.Error(err))
|
||||
} else {
|
||||
|
|
@ -137,9 +207,9 @@ func (a *app) runPreflightJob(ctx context.Context, job store.Job, site store.Sit
|
|||
case warnings == 0:
|
||||
return "success", fmt.Sprintf("preflight passed (%d targets)", len(site.Targets))
|
||||
case warnings == len(site.Targets):
|
||||
return "failed", fmt.Sprintf("preflight failed (%d/%d target checks failed)", warnings, len(site.Targets))
|
||||
return "failed", fmt.Sprintf("preflight failed (%d/%d target checks failed): %s", warnings, len(site.Targets), strings.Join(warnChecks, "; "))
|
||||
default:
|
||||
return "warning", fmt.Sprintf("preflight warning (%d/%d target checks failed)", warnings, len(site.Targets))
|
||||
return "warning", fmt.Sprintf("preflight warning (%d/%d target checks failed): %s", warnings, len(site.Targets), strings.Join(warnChecks, "; "))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -164,14 +234,25 @@ func sqlitePreflightCheck(ctx context.Context, site store.Site, dbPath string) e
|
|||
quoted := shellQuote(dbPath)
|
||||
cmd := strings.Join([]string{
|
||||
"sqlite3 --version >/dev/null",
|
||||
"test -r -- " + quoted,
|
||||
"[ -r " + quoted + " ]",
|
||||
"tmp=$(mktemp /tmp/satoru-preflight.XXXXXX)",
|
||||
"sqlite3 " + quoted + " \".backup $tmp\"",
|
||||
sqliteBackupCommand(quoted, "$tmp"),
|
||||
"rm -f -- \"$tmp\"",
|
||||
}, " && ")
|
||||
return sshCheck(ctx, site, cmd)
|
||||
}
|
||||
|
||||
func mysqlPreflightCheck(ctx context.Context, site store.Site, target store.SiteTarget) error {
|
||||
if !target.MySQLHost.Valid || !target.MySQLUser.Valid || !target.MySQLDB.Valid || !target.MySQLPassword.Valid {
|
||||
return errors.New("mysql target missing db host/db user/db name/db password")
|
||||
}
|
||||
cmd := strings.Join([]string{
|
||||
"mysqldump --version >/dev/null",
|
||||
mysqlDumpCommand(target, true, ""),
|
||||
}, " && ")
|
||||
return sshCheck(ctx, site, cmd)
|
||||
}
|
||||
|
||||
func (a *app) enqueuePreflightJob(ctx context.Context, siteID int64) (store.Job, error) {
|
||||
job, err := a.store.CreateJob(ctx, siteID, jobTypePreflight)
|
||||
if err != nil {
|
||||
|
|
@ -182,6 +263,16 @@ func (a *app) enqueuePreflightJob(ctx context.Context, siteID int64) (store.Job,
|
|||
return job, nil
|
||||
}
|
||||
|
||||
func (a *app) enqueueBackupJob(ctx context.Context, siteID int64) (store.Job, error) {
|
||||
job, err := a.store.CreateJob(ctx, siteID, jobTypeBackup)
|
||||
if err != nil {
|
||||
return store.Job{}, err
|
||||
}
|
||||
_ = a.store.UpdateSiteRunResult(ctx, siteID, "queued", fmt.Sprintf("Job #%d queued (backup)", job.ID), time.Now())
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "info", Message: "Backup job queued"})
|
||||
return job, nil
|
||||
}
|
||||
|
||||
func (a *app) latestJobForSite(ctx context.Context, siteID int64) (store.Job, error) {
|
||||
jobs, err := a.store.ListRecentJobs(ctx, 200)
|
||||
if err != nil {
|
||||
|
|
@ -194,3 +285,112 @@ func (a *app) latestJobForSite(ctx context.Context, siteID int64) (store.Job, er
|
|||
}
|
||||
return store.Job{}, sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (a *app) latestActiveJobForSite(ctx context.Context, siteID int64) (store.Job, error) {
|
||||
jobs, err := a.store.ListRecentJobs(ctx, 500)
|
||||
if err != nil {
|
||||
return store.Job{}, err
|
||||
}
|
||||
for _, j := range jobs {
|
||||
if j.SiteID != siteID {
|
||||
continue
|
||||
}
|
||||
if j.Status == "queued" || j.Status == "running" {
|
||||
return j, nil
|
||||
}
|
||||
}
|
||||
return store.Job{}, sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (a *app) cancelLatestActiveJobForSite(ctx context.Context, siteID int64) (bool, error) {
|
||||
job, err := a.latestActiveJobForSite(ctx, siteID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return false, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
switch job.Status {
|
||||
case "queued":
|
||||
changed, err := a.store.CancelQueuedJob(ctx, job.ID, "job canceled by user")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !changed {
|
||||
return false, nil
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job canceled by user before start"})
|
||||
_ = a.store.UpdateSiteRunResult(ctx, siteID, "failed", "job canceled by user", time.Now())
|
||||
return true, nil
|
||||
case "running":
|
||||
if !a.cancelRunningJob(job.ID) {
|
||||
return false, nil
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job cancel requested by user"})
|
||||
_ = a.store.UpdateSiteRunResult(ctx, siteID, "running", "cancel requested by user", time.Now())
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) cancelJobByID(ctx context.Context, jobID int64) (bool, error) {
|
||||
job, err := a.store.JobByID(ctx, jobID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
switch job.Status {
|
||||
case "queued":
|
||||
changed, err := a.store.CancelQueuedJob(ctx, job.ID, "job canceled by user")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if !changed {
|
||||
return false, nil
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job canceled by user before start"})
|
||||
_ = a.store.UpdateSiteRunResult(ctx, job.SiteID, "failed", "job canceled by user", time.Now())
|
||||
return true, nil
|
||||
case "running":
|
||||
if !a.cancelRunningJob(job.ID) {
|
||||
return false, nil
|
||||
}
|
||||
_ = a.store.AddJobEvent(ctx, store.JobEvent{JobID: job.ID, Level: "warn", Message: "job cancel requested by user"})
|
||||
_ = a.store.UpdateSiteRunResult(ctx, job.SiteID, "running", "cancel requested by user", time.Now())
|
||||
return true, nil
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) latestJobForSiteType(ctx context.Context, siteID int64, jobType string) (store.Job, error) {
|
||||
jobs, err := a.store.ListRecentJobs(ctx, 500)
|
||||
if err != nil {
|
||||
return store.Job{}, err
|
||||
}
|
||||
for _, j := range jobs {
|
||||
if j.SiteID == siteID && j.Type == jobType {
|
||||
return j, nil
|
||||
}
|
||||
}
|
||||
return store.Job{}, sql.ErrNoRows
|
||||
}
|
||||
|
||||
func (a *app) jobTypeDueForSite(ctx context.Context, siteID int64, jobType string, interval time.Duration, now time.Time) (bool, error) {
|
||||
j, err := a.latestJobForSiteType(ctx, siteID, jobType)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return true, nil
|
||||
}
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if j.Status == "queued" || j.Status == "running" {
|
||||
return false, nil
|
||||
}
|
||||
if !j.FinishedAt.Valid {
|
||||
return true, nil
|
||||
}
|
||||
return j.FinishedAt.Time.Add(interval).Before(now) || j.FinishedAt.Time.Add(interval).Equal(now), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/a-h/templ"
|
||||
|
|
@ -28,8 +29,11 @@ const (
|
|||
)
|
||||
|
||||
type app struct {
|
||||
store *store.Store
|
||||
log *zap.Logger
|
||||
store *store.Store
|
||||
log *zap.Logger
|
||||
jobClaimMu sync.Mutex
|
||||
jobCancelMu sync.Mutex
|
||||
jobCancels map[int64]context.CancelFunc
|
||||
}
|
||||
|
||||
func main() {
|
||||
|
|
@ -50,12 +54,20 @@ func main() {
|
|||
defer st.Close()
|
||||
st.SetLogger(logger)
|
||||
|
||||
a := &app{store: st, log: logger}
|
||||
a := &app{
|
||||
store: st,
|
||||
log: logger,
|
||||
jobCancels: map[int64]context.CancelFunc{},
|
||||
}
|
||||
if isUsingDefaultResticPassword() {
|
||||
logger.Warn("using built-in default restic password; set SATORU_RESTIC_PASSWORD or RESTIC_PASSWORD for production")
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
go a.startSiteScanLoop(ctx)
|
||||
go a.startJobWorkers(ctx)
|
||||
go a.startMaintenanceScheduler(ctx)
|
||||
|
||||
r := chi.NewRouter()
|
||||
|
||||
|
|
@ -63,10 +75,16 @@ func main() {
|
|||
r.Handle("/static/*", http.StripPrefix("/static/", fileServer))
|
||||
|
||||
r.Get("/", a.handleHome)
|
||||
r.Get("/ws/progress", a.handleProgressWebSocket)
|
||||
r.Get("/account/password", a.handlePasswordPage)
|
||||
r.Post("/account/password", a.handlePasswordSubmit)
|
||||
r.Post("/sites", a.handleSiteCreate)
|
||||
r.Post("/sites/{id}/run", a.handleSiteRun)
|
||||
r.Post("/sites/{id}/mysql-dumps", a.handleSiteAddMySQLDump)
|
||||
r.Post("/sites/{id}/cancel", a.handleSiteCancel)
|
||||
r.Post("/sites/{id}/restart", a.handleSiteRestart)
|
||||
r.Post("/jobs/{id}/cancel", a.handleJobCancel)
|
||||
r.Post("/sites/{id}/targets/{targetID}/delete", a.handleSiteTargetDelete)
|
||||
r.Post("/sites/{id}/update", a.handleSiteUpdate)
|
||||
r.Post("/sites/{id}/delete", a.handleSiteDelete)
|
||||
r.Get("/signup", a.handleSignupPage)
|
||||
|
|
@ -287,6 +305,7 @@ func (a *app) handleSiteCreate(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
directoryPaths := parsePathList(r.FormValue("directory_paths"))
|
||||
sqlitePaths := parsePathList(r.FormValue("sqlite_paths"))
|
||||
filters := parseLineList(r.FormValue("filters"))
|
||||
targets := buildTargets(directoryPaths, sqlitePaths)
|
||||
if sshUser == "" || host == "" || len(targets) == 0 {
|
||||
http.Redirect(w, r, "/?msg=site-invalid", http.StatusSeeOther)
|
||||
|
|
@ -297,7 +316,7 @@ func (a *app) handleSiteCreate(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
site, err := a.store.CreateSite(r.Context(), sshUser, host, port, targets)
|
||||
site, err := a.store.CreateSite(r.Context(), sshUser, host, port, targets, filters)
|
||||
if err != nil {
|
||||
http.Error(w, "failed to add site", http.StatusInternalServerError)
|
||||
return
|
||||
|
|
@ -329,13 +348,163 @@ func (a *app) handleSiteRun(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if _, err := a.enqueuePreflightJob(r.Context(), id); err != nil {
|
||||
if _, err := a.enqueueBackupJob(r.Context(), id); err != nil {
|
||||
http.Error(w, "failed to queue job", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
http.Redirect(w, r, "/?msg=job-queued", http.StatusSeeOther)
|
||||
}
|
||||
|
||||
func (a *app) handleSiteAddMySQLDump(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Redirect(w, r, "/signin", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid site id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if _, err := a.store.SiteByID(r.Context(), siteID); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
http.Error(w, "failed to load site", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if err := r.ParseForm(); err != nil {
|
||||
http.Error(w, "invalid form", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
dbHost := strings.TrimSpace(r.FormValue("db_host"))
|
||||
dbUser := strings.TrimSpace(r.FormValue("db_user"))
|
||||
dbName := strings.TrimSpace(r.FormValue("db_name"))
|
||||
dbPassword := r.FormValue("db_password")
|
||||
if dbHost == "" || dbUser == "" || dbName == "" || dbPassword == "" {
|
||||
http.Redirect(w, r, "/?msg=mysql-invalid", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
|
||||
if err := a.store.AddMySQLDumpTarget(r.Context(), siteID, dbHost, dbUser, dbName, dbPassword); err != nil {
|
||||
http.Error(w, "failed to add mysql dump target", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
http.Redirect(w, r, "/?msg=mysql-added", http.StatusSeeOther)
|
||||
}
|
||||
|
||||
func (a *app) handleSiteCancel(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Redirect(w, r, "/signin", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid site id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if _, err := a.store.SiteByID(r.Context(), siteID); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
http.Error(w, "failed to load site", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
changed, err := a.cancelLatestActiveJobForSite(r.Context(), siteID)
|
||||
if err != nil {
|
||||
http.Error(w, "failed to cancel job", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
http.Redirect(w, r, "/?msg=no-active-job", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
http.Redirect(w, r, "/?msg=job-cancel-requested", http.StatusSeeOther)
|
||||
}
|
||||
|
||||
func (a *app) handleSiteRestart(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Redirect(w, r, "/signin", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid site id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if _, err := a.store.SiteByID(r.Context(), siteID); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
http.Error(w, "failed to load site", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if _, err := a.enqueueBackupJob(r.Context(), siteID); err != nil {
|
||||
http.Error(w, "failed to queue job", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
http.Redirect(w, r, "/?msg=job-restarted", http.StatusSeeOther)
|
||||
}
|
||||
|
||||
func (a *app) handleJobCancel(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
jobID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid job id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
changed, err := a.cancelJobByID(r.Context(), jobID)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, "failed to cancel job", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
if !changed {
|
||||
w.WriteHeader(http.StatusConflict)
|
||||
_, _ = w.Write([]byte("job is not active"))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (a *app) handleSiteTargetDelete(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Redirect(w, r, "/signin", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
siteID, err := strconv.ParseInt(chi.URLParam(r, "id"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid site id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
targetID, err := strconv.ParseInt(chi.URLParam(r, "targetID"), 10, 64)
|
||||
if err != nil {
|
||||
http.Error(w, "invalid target id", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
if err := a.store.DeleteSiteTarget(r.Context(), siteID, targetID); err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.Redirect(w, r, "/?msg=target-not-found", http.StatusSeeOther)
|
||||
return
|
||||
}
|
||||
http.Error(w, "failed to delete target", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
http.Redirect(w, r, "/?msg=target-deleted", http.StatusSeeOther)
|
||||
}
|
||||
|
||||
func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Redirect(w, r, "/signin", http.StatusSeeOther)
|
||||
|
|
@ -360,6 +529,7 @@ func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
directoryPaths := parsePathList(r.FormValue("directory_paths"))
|
||||
sqlitePaths := parsePathList(r.FormValue("sqlite_paths"))
|
||||
filters := parseLineList(r.FormValue("filters"))
|
||||
targets := buildTargets(directoryPaths, sqlitePaths)
|
||||
if sshUser == "" || host == "" || len(targets) == 0 {
|
||||
http.Redirect(w, r, "/?msg=site-invalid", http.StatusSeeOther)
|
||||
|
|
@ -370,7 +540,7 @@ func (a *app) handleSiteUpdate(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
site, err := a.store.UpdateSite(r.Context(), id, sshUser, host, port, targets)
|
||||
site, err := a.store.UpdateSite(r.Context(), id, sshUser, host, port, targets, filters)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
http.NotFound(w, r)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,70 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"satoru/internal/store"
|
||||
)
|
||||
|
||||
const (
|
||||
maintenanceTick = time.Minute
|
||||
)
|
||||
|
||||
func (a *app) startMaintenanceScheduler(ctx context.Context) {
|
||||
ticker := time.NewTicker(maintenanceTick)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Run once at boot.
|
||||
a.enqueueDueMaintenanceJobs(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
a.enqueueDueMaintenanceJobs(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) enqueueDueMaintenanceJobs(ctx context.Context) {
|
||||
sites, err := a.store.ListSites(ctx)
|
||||
if err != nil {
|
||||
a.log.Warn("maintenance list sites failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
retentionInterval := maintenanceIntervalFromEnv("SATORU_RETENTION_INTERVAL_HOURS", 24)
|
||||
syncInterval := maintenanceIntervalFromEnv("SATORU_RESTIC_SYNC_INTERVAL_HOURS", 24)
|
||||
now := time.Now()
|
||||
|
||||
for _, site := range sites {
|
||||
a.enqueueIfDue(ctx, site, jobTypeRetention, retentionInterval, now)
|
||||
a.enqueueIfDue(ctx, site, jobTypeResticSync, syncInterval, now)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) enqueueIfDue(ctx context.Context, site store.Site, jobType string, interval time.Duration, now time.Time) {
|
||||
due, err := a.jobTypeDueForSite(ctx, site.ID, jobType, interval, now)
|
||||
if err != nil {
|
||||
a.log.Warn("maintenance due-check failed", zap.Int64("site_id", site.ID), zap.String("job_type", jobType), zap.Error(err))
|
||||
return
|
||||
}
|
||||
if !due {
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := a.store.CreateJob(ctx, site.ID, jobType); err != nil {
|
||||
a.log.Warn("maintenance enqueue failed", zap.Int64("site_id", site.ID), zap.String("job_type", jobType), zap.Error(err))
|
||||
return
|
||||
}
|
||||
a.log.Debug("maintenance job queued", zap.Int64("site_id", site.ID), zap.String("job_type", jobType))
|
||||
}
|
||||
|
||||
func maintenanceIntervalFromEnv(name string, fallbackHours int) time.Duration {
|
||||
hours := parsePositiveIntEnv(name, fallbackHours)
|
||||
return time.Duration(hours) * time.Hour
|
||||
}
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"satoru/internal/store"
|
||||
)
|
||||
|
||||
func mysqlDumpCommand(target store.SiteTarget, noData bool, outputPath string) string {
|
||||
base := fmt.Sprintf(
|
||||
"mysqldump -h %s -u %s -p%s --single-transaction --quick --routines --events %s",
|
||||
shellQuote(target.MySQLHost.String),
|
||||
shellQuote(target.MySQLUser.String),
|
||||
shellQuote(target.MySQLPassword.String),
|
||||
shellQuote(target.MySQLDB.String),
|
||||
)
|
||||
if noData {
|
||||
return base + " --no-data >/dev/null"
|
||||
}
|
||||
return base + " > " + shellQuote(outputPath)
|
||||
}
|
||||
|
||||
func mysqlStatusCommand(target store.SiteTarget) string {
|
||||
return fmt.Sprintf(
|
||||
"mysql --version >/dev/null && mysql -h %s -u %s -p%s --connect-timeout=5 -e %s %s >/dev/null",
|
||||
shellQuote(target.MySQLHost.String),
|
||||
shellQuote(target.MySQLUser.String),
|
||||
shellQuote(target.MySQLPassword.String),
|
||||
shellQuote("SELECT 1"),
|
||||
shellQuote(target.MySQLDB.String),
|
||||
)
|
||||
}
|
||||
|
|
@ -0,0 +1,160 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var progressUpgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
origin := strings.TrimSpace(r.Header.Get("Origin"))
|
||||
if origin == "" {
|
||||
return true
|
||||
}
|
||||
return strings.Contains(origin, r.Host)
|
||||
},
|
||||
}
|
||||
|
||||
type liveSiteStatus struct {
|
||||
SiteID int64 `json:"site_id"`
|
||||
LastRunStatus string `json:"last_run_status"`
|
||||
LastRunOutput string `json:"last_run_output"`
|
||||
LastRunAt time.Time `json:"last_run_at,omitempty"`
|
||||
}
|
||||
|
||||
type liveJobStatus struct {
|
||||
JobID int64 `json:"job_id"`
|
||||
SiteID int64 `json:"site_id"`
|
||||
Type string `json:"type"`
|
||||
Status string `json:"status"`
|
||||
Summary string `json:"summary,omitempty"`
|
||||
StartedAt time.Time `json:"started_at,omitempty"`
|
||||
FinishedAt time.Time `json:"finished_at,omitempty"`
|
||||
}
|
||||
|
||||
type liveJobEvent struct {
|
||||
JobID int64 `json:"job_id"`
|
||||
SiteID int64 `json:"site_id"`
|
||||
JobType string `json:"job_type"`
|
||||
Level string `json:"level"`
|
||||
Message string `json:"message"`
|
||||
OccurredAt time.Time `json:"occurred_at"`
|
||||
}
|
||||
|
||||
type liveProgressPayload struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Sites []liveSiteStatus `json:"sites"`
|
||||
Jobs []liveJobStatus `json:"jobs"`
|
||||
Events []liveJobEvent `json:"events"`
|
||||
}
|
||||
|
||||
func (a *app) handleProgressWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
if _, err := a.currentUserWithRollingSession(w, r); err != nil {
|
||||
http.Error(w, "unauthorized", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := progressUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
a.log.Debug("progress ws upgrade failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
send := func() error {
|
||||
payload, err := a.buildLiveProgressPayload(r.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_ = conn.SetWriteDeadline(time.Now().Add(8 * time.Second))
|
||||
return conn.WriteJSON(payload)
|
||||
}
|
||||
|
||||
if err := send(); err != nil {
|
||||
a.log.Debug("progress ws initial send failed", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := send(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (a *app) buildLiveProgressPayload(ctx context.Context) (liveProgressPayload, error) {
|
||||
sites, err := a.store.ListSites(ctx)
|
||||
if err != nil {
|
||||
return liveProgressPayload{}, err
|
||||
}
|
||||
jobs, err := a.store.ListRecentJobs(ctx, 30)
|
||||
if err != nil {
|
||||
return liveProgressPayload{}, err
|
||||
}
|
||||
events, err := a.store.ListRecentJobEvents(ctx, 60)
|
||||
if err != nil {
|
||||
return liveProgressPayload{}, err
|
||||
}
|
||||
|
||||
payload := liveProgressPayload{
|
||||
Timestamp: time.Now().UTC(),
|
||||
Sites: make([]liveSiteStatus, 0, len(sites)),
|
||||
Jobs: make([]liveJobStatus, 0, len(jobs)),
|
||||
Events: make([]liveJobEvent, 0, len(events)),
|
||||
}
|
||||
for _, site := range sites {
|
||||
row := liveSiteStatus{
|
||||
SiteID: site.ID,
|
||||
LastRunStatus: strings.TrimSpace(site.LastRunStatus.String),
|
||||
LastRunOutput: strings.TrimSpace(site.LastRunOutput.String),
|
||||
}
|
||||
if row.LastRunStatus == "" {
|
||||
row.LastRunStatus = "pending"
|
||||
}
|
||||
if site.LastRunAt.Valid {
|
||||
row.LastRunAt = site.LastRunAt.Time
|
||||
}
|
||||
payload.Sites = append(payload.Sites, row)
|
||||
}
|
||||
for _, job := range jobs {
|
||||
row := liveJobStatus{
|
||||
JobID: job.ID,
|
||||
SiteID: job.SiteID,
|
||||
Type: job.Type,
|
||||
Status: job.Status,
|
||||
Summary: strings.TrimSpace(job.Summary.String),
|
||||
}
|
||||
if job.StartedAt.Valid {
|
||||
row.StartedAt = job.StartedAt.Time
|
||||
}
|
||||
if job.FinishedAt.Valid {
|
||||
row.FinishedAt = job.FinishedAt.Time
|
||||
}
|
||||
payload.Jobs = append(payload.Jobs, row)
|
||||
}
|
||||
for _, ev := range events {
|
||||
payload.Events = append(payload.Events, liveJobEvent{
|
||||
JobID: ev.JobID,
|
||||
SiteID: ev.SiteID,
|
||||
JobType: ev.JobType,
|
||||
Level: ev.Level,
|
||||
Message: ev.Message,
|
||||
OccurredAt: ev.OccurredAt,
|
||||
})
|
||||
}
|
||||
return payload, nil
|
||||
}
|
||||
|
|
@ -136,6 +136,10 @@ func (a *app) scanSiteNow(ctx context.Context, siteID int64) {
|
|||
}
|
||||
|
||||
func queryTargetSize(ctx context.Context, site store.Site, target store.SiteTarget) (int64, error) {
|
||||
if target.Mode == "mysql_dump" {
|
||||
return queryMySQLTargetStatus(ctx, site, target)
|
||||
}
|
||||
|
||||
targetAddr := fmt.Sprintf("%s@%s", site.SSHUser, site.Host)
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
|
||||
defer cancel()
|
||||
|
|
@ -157,11 +161,34 @@ func queryTargetSize(ctx context.Context, site store.Site, target store.SiteTarg
|
|||
return size, nil
|
||||
}
|
||||
|
||||
func queryMySQLTargetStatus(ctx context.Context, site store.Site, target store.SiteTarget) (int64, error) {
|
||||
if !target.MySQLHost.Valid || !target.MySQLUser.Valid || !target.MySQLDB.Valid || !target.MySQLPassword.Valid {
|
||||
return 0, errors.New("mysql target missing db host/db user/db name/db password")
|
||||
}
|
||||
|
||||
targetAddr := fmt.Sprintf("%s@%s", site.SSHUser, site.Host)
|
||||
cmdCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
|
||||
defer cancel()
|
||||
cmd := exec.CommandContext(cmdCtx, "ssh", "-p", strconv.Itoa(site.Port), targetAddr, mysqlStatusCommand(target))
|
||||
out, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
msg := strings.TrimSpace(string(out))
|
||||
if msg == "" {
|
||||
msg = err.Error()
|
||||
}
|
||||
return 0, errors.New(msg)
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
func remoteSizeCommand(target store.SiteTarget) string {
|
||||
path := shellQuote(target.Path)
|
||||
if target.Mode == "sqlite_dump" {
|
||||
return fmt.Sprintf("stat -c%%s -- %s", path)
|
||||
}
|
||||
if target.Mode == "mysql_dump" {
|
||||
return ""
|
||||
}
|
||||
return fmt.Sprintf("du -sb -- %s | awk '{print $1}'", path)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,18 @@
|
|||
package main
|
||||
|
||||
import "fmt"
|
||||
|
||||
const (
|
||||
sqliteBackupRetryCount = 8
|
||||
sqliteBackupTimeoutMS = 5000
|
||||
)
|
||||
|
||||
func sqliteBackupCommand(quotedDBPath, quotedBackupPath string) string {
|
||||
return fmt.Sprintf(
|
||||
"ok=0; for i in $(seq 1 %d); do sqlite3 -cmd \".timeout %d\" %s \".backup %s\" && ok=1 && break; sleep 1; done; [ \"$ok\" -eq 1 ]",
|
||||
sqliteBackupRetryCount,
|
||||
sqliteBackupTimeoutMS,
|
||||
quotedDBPath,
|
||||
quotedBackupPath,
|
||||
)
|
||||
}
|
||||
1
go.mod
1
go.mod
|
|
@ -5,6 +5,7 @@ go 1.25.7
|
|||
require (
|
||||
github.com/a-h/templ v0.3.977
|
||||
github.com/go-chi/chi/v5 v5.2.5
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
go.uber.org/zap v1.27.1
|
||||
golang.org/x/crypto v0.47.0
|
||||
modernc.org/sqlite v1.44.3
|
||||
|
|
|
|||
2
go.sum
2
go.sum
|
|
@ -12,6 +12,8 @@ github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17k
|
|||
github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
|
||||
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,354 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type migration struct {
|
||||
version int
|
||||
name string
|
||||
up func(context.Context, *sql.Tx) error
|
||||
}
|
||||
|
||||
func (s *Store) runMigrations(ctx context.Context) error {
|
||||
if _, err := s.db.ExecContext(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
version INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
migrations := []migration{
|
||||
{
|
||||
version: 1,
|
||||
name: "initial_schema",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
stmts := []string{
|
||||
`CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT NOT NULL UNIQUE,
|
||||
password_hash TEXT NOT NULL,
|
||||
is_admin INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS sessions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
token_hash TEXT NOT NULL UNIQUE,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
expires_at DATETIME NOT NULL
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS sites (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_uuid TEXT NOT NULL UNIQUE,
|
||||
ssh_user TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER NOT NULL DEFAULT 22,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
last_run_status TEXT,
|
||||
last_run_output TEXT,
|
||||
last_run_at DATETIME,
|
||||
last_scan_at DATETIME,
|
||||
last_scan_state TEXT,
|
||||
last_scan_notes TEXT
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS site_targets (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
path TEXT NOT NULL,
|
||||
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump')),
|
||||
mysql_host TEXT,
|
||||
mysql_user TEXT,
|
||||
mysql_db TEXT,
|
||||
mysql_password TEXT,
|
||||
last_size_bytes INTEGER,
|
||||
last_scan_at DATETIME,
|
||||
last_error TEXT,
|
||||
UNIQUE(site_id, path, mode)
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
type TEXT NOT NULL,
|
||||
status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')),
|
||||
summary TEXT,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
started_at DATETIME,
|
||||
finished_at DATETIME
|
||||
)`,
|
||||
`CREATE TABLE IF NOT EXISTS job_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
|
||||
level TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
)`,
|
||||
}
|
||||
for _, stmt := range stmts {
|
||||
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 2,
|
||||
name: "ensure_sites_site_uuid",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
exists, err := tableExists(ctx, tx, "sites")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
hasUUID, err := columnExists(ctx, tx, "sites", "site_uuid")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !hasUUID {
|
||||
if _, err := tx.ExecContext(ctx, `ALTER TABLE sites ADD COLUMN site_uuid TEXT`); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
rows, err := tx.QueryContext(ctx, `SELECT id, site_uuid FROM sites`)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var idsNeedingUUID []int64
|
||||
for rows.Next() {
|
||||
var id int64
|
||||
var uuid sql.NullString
|
||||
if err := rows.Scan(&id, &uuid); err != nil {
|
||||
return err
|
||||
}
|
||||
if !uuid.Valid || strings.TrimSpace(uuid.String) == "" {
|
||||
idsNeedingUUID = append(idsNeedingUUID, id)
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range idsNeedingUUID {
|
||||
uuid, err := newSiteUUID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `UPDATE sites SET site_uuid = ? WHERE id = ?`, uuid, id); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_sites_site_uuid ON sites(site_uuid)`); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 3,
|
||||
name: "add_runtime_query_indexes",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
stmts := []string{
|
||||
`CREATE INDEX IF NOT EXISTS idx_jobs_status_id ON jobs(status, id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_jobs_site_type_id ON jobs(site_id, type, id DESC)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_job_events_job_id_id ON job_events(job_id, id DESC)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at)`,
|
||||
}
|
||||
for _, stmt := range stmts {
|
||||
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 4,
|
||||
name: "site_filters",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
stmts := []string{
|
||||
`CREATE TABLE IF NOT EXISTS site_filters (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
pattern TEXT NOT NULL,
|
||||
UNIQUE(site_id, pattern)
|
||||
)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_site_filters_site_id ON site_filters(site_id, id)`,
|
||||
}
|
||||
for _, stmt := range stmts {
|
||||
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 5,
|
||||
name: "site_targets_mysql_dump",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
exists, err := tableExists(ctx, tx, "site_targets")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := tx.ExecContext(ctx, `CREATE TABLE site_targets_new (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
path TEXT NOT NULL,
|
||||
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump')),
|
||||
mysql_host TEXT,
|
||||
mysql_user TEXT,
|
||||
mysql_db TEXT,
|
||||
mysql_password TEXT,
|
||||
last_size_bytes INTEGER,
|
||||
last_scan_at DATETIME,
|
||||
last_error TEXT,
|
||||
UNIQUE(site_id, path, mode)
|
||||
)`); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
hasMySQLHost, err := columnExists(ctx, tx, "site_targets", "mysql_host")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasMySQLHost {
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
|
||||
SELECT id, site_id, path, mode, mysql_host, NULL, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error
|
||||
FROM site_targets`); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if _, err := tx.ExecContext(ctx, `
|
||||
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
|
||||
SELECT id, site_id, path, mode, NULL, NULL, NULL, NULL, last_size_bytes, last_scan_at, last_error
|
||||
FROM site_targets`); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if _, err := tx.ExecContext(ctx, `DROP TABLE site_targets`); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `ALTER TABLE site_targets_new RENAME TO site_targets`); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
version: 6,
|
||||
name: "site_targets_mysql_user",
|
||||
up: func(ctx context.Context, tx *sql.Tx) error {
|
||||
exists, err := tableExists(ctx, tx, "site_targets")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
hasUser, err := columnExists(ctx, tx, "site_targets", "mysql_user")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasUser {
|
||||
return nil
|
||||
}
|
||||
_, err = tx.ExecContext(ctx, `ALTER TABLE site_targets ADD COLUMN mysql_user TEXT`)
|
||||
return err
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, m := range migrations {
|
||||
applied, err := migrationApplied(ctx, s.db, m.version)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if applied {
|
||||
continue
|
||||
}
|
||||
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.up(ctx, tx); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return fmt.Errorf("migration %d (%s) failed: %w", m.version, m.name, err)
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations (version, name) VALUES (?, ?)`, m.version, m.name); err != nil {
|
||||
_ = tx.Rollback()
|
||||
return fmt.Errorf("migration %d (%s) record failed: %w", m.version, m.name, err)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return fmt.Errorf("migration %d (%s) commit failed: %w", m.version, m.name, err)
|
||||
}
|
||||
s.debugDB("migration applied", zap.Int("version", m.version), zap.String("name", m.name))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func migrationApplied(ctx context.Context, db *sql.DB, version int) (bool, error) {
|
||||
var count int
|
||||
if err := db.QueryRowContext(ctx, `SELECT COUNT(1) FROM schema_migrations WHERE version = ?`, version).Scan(&count); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func tableExists(ctx context.Context, tx *sql.Tx, table string) (bool, error) {
|
||||
var count int
|
||||
if err := tx.QueryRowContext(ctx, `SELECT COUNT(1) FROM sqlite_master WHERE type='table' AND name = ?`, table).Scan(&count); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
func columnExists(ctx context.Context, tx *sql.Tx, table, col string) (bool, error) {
|
||||
rows, err := tx.QueryContext(ctx, "PRAGMA table_info("+table+")")
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var cid int
|
||||
var name string
|
||||
var ctype string
|
||||
var notNull int
|
||||
var dflt sql.NullString
|
||||
var pk int
|
||||
if err := rows.Scan(&cid, &name, &ctype, ¬Null, &dflt, &pk); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if strings.EqualFold(name, col) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
|
@ -2,7 +2,9 @@ package store
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
|
@ -28,6 +30,7 @@ type User struct {
|
|||
|
||||
type Site struct {
|
||||
ID int64
|
||||
SiteUUID string
|
||||
SSHUser string
|
||||
Host string
|
||||
Port int
|
||||
|
|
@ -39,14 +42,20 @@ type Site struct {
|
|||
LastScanState sql.NullString
|
||||
LastScanNotes sql.NullString
|
||||
Targets []SiteTarget
|
||||
Filters []string
|
||||
}
|
||||
|
||||
type SiteTarget struct {
|
||||
Path string
|
||||
Mode string
|
||||
LastSizeByte sql.NullInt64
|
||||
LastScanAt sql.NullTime
|
||||
LastError sql.NullString
|
||||
ID int64
|
||||
Path string
|
||||
Mode string
|
||||
MySQLHost sql.NullString
|
||||
MySQLUser sql.NullString
|
||||
MySQLDB sql.NullString
|
||||
MySQLPassword sql.NullString
|
||||
LastSizeByte sql.NullInt64
|
||||
LastScanAt sql.NullTime
|
||||
LastError sql.NullString
|
||||
}
|
||||
|
||||
type Job struct {
|
||||
|
|
@ -67,12 +76,23 @@ type JobEvent struct {
|
|||
OccurredAt time.Time
|
||||
}
|
||||
|
||||
type JobEventRecord struct {
|
||||
JobID int64
|
||||
SiteID int64
|
||||
JobType string
|
||||
Level string
|
||||
Message string
|
||||
OccurredAt time.Time
|
||||
}
|
||||
|
||||
func Open(path string) (*Store, error) {
|
||||
dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)", path)
|
||||
dsn := fmt.Sprintf("file:%s?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)", path)
|
||||
db, err := sql.Open("sqlite", dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db.SetMaxOpenConns(8)
|
||||
db.SetMaxIdleConns(8)
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -94,92 +114,7 @@ func (s *Store) SetLogger(logger *zap.Logger) {
|
|||
}
|
||||
|
||||
func (s *Store) migrate(ctx context.Context) error {
|
||||
const usersSQL = `
|
||||
CREATE TABLE IF NOT EXISTS users (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
username TEXT NOT NULL UNIQUE,
|
||||
password_hash TEXT NOT NULL,
|
||||
is_admin INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);`
|
||||
|
||||
const sessionsSQL = `
|
||||
CREATE TABLE IF NOT EXISTS sessions (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
|
||||
token_hash TEXT NOT NULL UNIQUE,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
expires_at DATETIME NOT NULL
|
||||
);`
|
||||
|
||||
const sitesSQL = `
|
||||
CREATE TABLE IF NOT EXISTS sites (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ssh_user TEXT NOT NULL,
|
||||
host TEXT NOT NULL,
|
||||
port INTEGER NOT NULL DEFAULT 22,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
last_run_status TEXT,
|
||||
last_run_output TEXT,
|
||||
last_run_at DATETIME,
|
||||
last_scan_at DATETIME,
|
||||
last_scan_state TEXT,
|
||||
last_scan_notes TEXT
|
||||
);`
|
||||
|
||||
const siteTargetsSQL = `
|
||||
CREATE TABLE IF NOT EXISTS site_targets (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
path TEXT NOT NULL,
|
||||
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump')),
|
||||
last_size_bytes INTEGER,
|
||||
last_scan_at DATETIME,
|
||||
last_error TEXT,
|
||||
UNIQUE(site_id, path, mode)
|
||||
);`
|
||||
|
||||
const jobsSQL = `
|
||||
CREATE TABLE IF NOT EXISTS jobs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
|
||||
type TEXT NOT NULL,
|
||||
status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')),
|
||||
summary TEXT,
|
||||
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
started_at DATETIME,
|
||||
finished_at DATETIME
|
||||
);`
|
||||
|
||||
const jobEventsSQL = `
|
||||
CREATE TABLE IF NOT EXISTS job_events (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
|
||||
level TEXT NOT NULL,
|
||||
message TEXT NOT NULL,
|
||||
occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
|
||||
);`
|
||||
|
||||
if _, err := s.db.ExecContext(ctx, usersSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, sessionsSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, sitesSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, siteTargetsSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, jobsSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := s.db.ExecContext(ctx, jobEventsSQL); err != nil {
|
||||
return err
|
||||
}
|
||||
s.debugDB("schema migrated")
|
||||
return nil
|
||||
return s.runMigrations(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) CreateUser(ctx context.Context, username, passwordHash string) (User, error) {
|
||||
|
|
@ -294,14 +229,18 @@ func (s *Store) UpdateUserPasswordHash(ctx context.Context, userID int64, passwo
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, targets []SiteTarget) (Site, error) {
|
||||
func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int, targets []SiteTarget, filters []string) (Site, error) {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
res, err := tx.ExecContext(ctx, `INSERT INTO sites (ssh_user, host, port) VALUES (?, ?, ?)`, sshUser, host, port)
|
||||
siteUUID, err := newSiteUUID()
|
||||
if err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
res, err := tx.ExecContext(ctx, `INSERT INTO sites (site_uuid, ssh_user, host, port) VALUES (?, ?, ?, ?)`, siteUUID, sshUser, host, port)
|
||||
if err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
|
|
@ -313,23 +252,32 @@ func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int,
|
|||
for _, t := range targets {
|
||||
if _, err := tx.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO site_targets (site_id, path, mode) VALUES (?, ?, ?)`,
|
||||
`INSERT INTO site_targets (site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
id,
|
||||
t.Path,
|
||||
t.Mode,
|
||||
nullStringArg(t.MySQLHost),
|
||||
nullStringArg(t.MySQLUser),
|
||||
nullStringArg(t.MySQLDB),
|
||||
nullStringArg(t.MySQLPassword),
|
||||
); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
}
|
||||
for _, f := range filters {
|
||||
if _, err := tx.ExecContext(ctx, `INSERT INTO site_filters (site_id, pattern) VALUES (?, ?)`, id, f); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
s.debugDB("site created", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)))
|
||||
s.debugDB("site created", zap.Int64("site_id", id), zap.String("site_uuid", siteUUID), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)), zap.Int("filters", len(filters)))
|
||||
return s.SiteByID(ctx, id)
|
||||
}
|
||||
|
||||
func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, port int, targets []SiteTarget) (Site, error) {
|
||||
func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string, port int, targets []SiteTarget, filters []string) (Site, error) {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
return Site{}, err
|
||||
|
|
@ -354,19 +302,31 @@ func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string,
|
|||
for _, t := range targets {
|
||||
if _, err := tx.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO site_targets (site_id, path, mode) VALUES (?, ?, ?)`,
|
||||
`INSERT INTO site_targets (site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password) VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
id,
|
||||
t.Path,
|
||||
t.Mode,
|
||||
nullStringArg(t.MySQLHost),
|
||||
nullStringArg(t.MySQLUser),
|
||||
nullStringArg(t.MySQLDB),
|
||||
nullStringArg(t.MySQLPassword),
|
||||
); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
}
|
||||
if _, err := tx.ExecContext(ctx, `DELETE FROM site_filters WHERE site_id = ?`, id); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
for _, f := range filters {
|
||||
if _, err := tx.ExecContext(ctx, `INSERT INTO site_filters (site_id, pattern) VALUES (?, ?)`, id, f); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := tx.Commit(); err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
s.debugDB("site updated", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)))
|
||||
s.debugDB("site updated", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)), zap.Int("filters", len(filters)))
|
||||
return s.SiteByID(ctx, id)
|
||||
}
|
||||
|
||||
|
|
@ -386,9 +346,42 @@ func (s *Store) DeleteSite(ctx context.Context, id int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) DeleteSiteTarget(ctx context.Context, siteID, targetID int64) error {
|
||||
res, err := s.db.ExecContext(ctx, `DELETE FROM site_targets WHERE id = ? AND site_id = ?`, targetID, siteID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
affected, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if affected == 0 {
|
||||
return sql.ErrNoRows
|
||||
}
|
||||
s.debugDB("site target deleted", zap.Int64("site_id", siteID), zap.Int64("target_id", targetID))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) AddMySQLDumpTarget(ctx context.Context, siteID int64, dbHost, dbUser, dbName, dbPassword string) error {
|
||||
_, err := s.db.ExecContext(
|
||||
ctx,
|
||||
`INSERT INTO site_targets (site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password) VALUES (?, ?, 'mysql_dump', ?, ?, ?, ?)`,
|
||||
siteID,
|
||||
dbName,
|
||||
dbHost,
|
||||
dbUser,
|
||||
dbName,
|
||||
dbPassword,
|
||||
)
|
||||
if err == nil {
|
||||
s.debugDB("mysql dump target added", zap.Int64("site_id", siteID), zap.String("db_host", dbHost), zap.String("db_user", dbUser), zap.String("db_name", dbName))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) ListSites(ctx context.Context) ([]Site, error) {
|
||||
const q = `
|
||||
SELECT id, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes
|
||||
SELECT id, site_uuid, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes
|
||||
FROM sites
|
||||
ORDER BY id DESC`
|
||||
rows, err := s.db.QueryContext(ctx, q)
|
||||
|
|
@ -411,12 +404,15 @@ ORDER BY id DESC`
|
|||
if err := s.populateTargets(ctx, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.populateFilters(ctx, out); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) SiteByID(ctx context.Context, id int64) (Site, error) {
|
||||
const q = `
|
||||
SELECT id, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes
|
||||
SELECT id, site_uuid, ssh_user, host, port, created_at, last_run_status, last_run_output, last_run_at, last_scan_at, last_scan_state, last_scan_notes
|
||||
FROM sites
|
||||
WHERE id = ?`
|
||||
site, err := scanSite(s.db.QueryRowContext(ctx, q, id))
|
||||
|
|
@ -428,6 +424,11 @@ WHERE id = ?`
|
|||
return Site{}, err
|
||||
}
|
||||
site.Targets = targets
|
||||
filters, err := s.filtersBySiteID(ctx, id)
|
||||
if err != nil {
|
||||
return Site{}, err
|
||||
}
|
||||
site.Filters = filters
|
||||
return site, nil
|
||||
}
|
||||
|
||||
|
|
@ -529,6 +530,30 @@ func (s *Store) CompleteJob(ctx context.Context, jobID int64, status, summary st
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *Store) CancelQueuedJob(ctx context.Context, jobID int64, summary string) (bool, error) {
|
||||
res, err := s.db.ExecContext(
|
||||
ctx,
|
||||
`UPDATE jobs
|
||||
SET status = 'failed', summary = ?, finished_at = ?
|
||||
WHERE id = ? AND status = 'queued'`,
|
||||
summary,
|
||||
time.Now().UTC().Format(time.RFC3339),
|
||||
jobID,
|
||||
)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
rows, err := res.RowsAffected()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if rows > 0 {
|
||||
s.debugDB("job canceled from queue", zap.Int64("job_id", jobID), zap.String("summary", summary))
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (s *Store) AddJobEvent(ctx context.Context, event JobEvent) error {
|
||||
_, err := s.db.ExecContext(
|
||||
ctx,
|
||||
|
|
@ -571,6 +596,35 @@ LIMIT ?`, limit)
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListRecentJobEvents(ctx context.Context, limit int) ([]JobEventRecord, error) {
|
||||
if limit <= 0 {
|
||||
limit = 50
|
||||
}
|
||||
rows, err := s.db.QueryContext(ctx, `
|
||||
SELECT je.job_id, j.site_id, j.type, je.level, je.message, je.occurred_at
|
||||
FROM job_events je
|
||||
JOIN jobs j ON j.id = je.job_id
|
||||
ORDER BY je.id DESC
|
||||
LIMIT ?`, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var out []JobEventRecord
|
||||
for rows.Next() {
|
||||
var ev JobEventRecord
|
||||
if err := rows.Scan(&ev.JobID, &ev.SiteID, &ev.JobType, &ev.Level, &ev.Message, &ev.OccurredAt); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, ev)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) UpdateSiteScanResult(ctx context.Context, siteID int64, state, notes string, scannedAt time.Time, targets []SiteTarget) error {
|
||||
tx, err := s.db.BeginTx(ctx, nil)
|
||||
if err != nil {
|
||||
|
|
@ -651,6 +705,7 @@ func scanSite(row scanner) (Site, error) {
|
|||
var site Site
|
||||
if err := row.Scan(
|
||||
&site.ID,
|
||||
&site.SiteUUID,
|
||||
&site.SSHUser,
|
||||
&site.Host,
|
||||
&site.Port,
|
||||
|
|
@ -681,8 +736,22 @@ func (s *Store) populateTargets(ctx context.Context, sites []Site) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) populateFilters(ctx context.Context, sites []Site) error {
|
||||
if len(sites) == 0 {
|
||||
return nil
|
||||
}
|
||||
filtersBySite, err := s.allFiltersBySiteID(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for i := range sites {
|
||||
sites[i].Filters = filtersBySite[sites[i].ID]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) allTargetsBySiteID(ctx context.Context) (map[int64][]SiteTarget, error) {
|
||||
const q = `SELECT site_id, path, mode, last_size_bytes, last_scan_at, last_error FROM site_targets ORDER BY id ASC`
|
||||
const q = `SELECT site_id, id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error FROM site_targets ORDER BY id ASC`
|
||||
rows, err := s.db.QueryContext(ctx, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -693,7 +762,7 @@ func (s *Store) allTargetsBySiteID(ctx context.Context) (map[int64][]SiteTarget,
|
|||
for rows.Next() {
|
||||
var siteID int64
|
||||
var target SiteTarget
|
||||
if err := rows.Scan(&siteID, &target.Path, &target.Mode, &target.LastSizeByte, &target.LastScanAt, &target.LastError); err != nil {
|
||||
if err := rows.Scan(&siteID, &target.ID, &target.Path, &target.Mode, &target.MySQLHost, &target.MySQLUser, &target.MySQLDB, &target.MySQLPassword, &target.LastSizeByte, &target.LastScanAt, &target.LastError); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[siteID] = append(out[siteID], target)
|
||||
|
|
@ -705,7 +774,7 @@ func (s *Store) allTargetsBySiteID(ctx context.Context) (map[int64][]SiteTarget,
|
|||
}
|
||||
|
||||
func (s *Store) targetsBySiteID(ctx context.Context, siteID int64) ([]SiteTarget, error) {
|
||||
const q = `SELECT path, mode, last_size_bytes, last_scan_at, last_error FROM site_targets WHERE site_id = ? ORDER BY id ASC`
|
||||
const q = `SELECT id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error FROM site_targets WHERE site_id = ? ORDER BY id ASC`
|
||||
rows, err := s.db.QueryContext(ctx, q, siteID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -715,7 +784,7 @@ func (s *Store) targetsBySiteID(ctx context.Context, siteID int64) ([]SiteTarget
|
|||
var out []SiteTarget
|
||||
for rows.Next() {
|
||||
var target SiteTarget
|
||||
if err := rows.Scan(&target.Path, &target.Mode, &target.LastSizeByte, &target.LastScanAt, &target.LastError); err != nil {
|
||||
if err := rows.Scan(&target.ID, &target.Path, &target.Mode, &target.MySQLHost, &target.MySQLUser, &target.MySQLDB, &target.MySQLPassword, &target.LastSizeByte, &target.LastScanAt, &target.LastError); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, target)
|
||||
|
|
@ -726,6 +795,49 @@ func (s *Store) targetsBySiteID(ctx context.Context, siteID int64) ([]SiteTarget
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) allFiltersBySiteID(ctx context.Context) (map[int64][]string, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `SELECT site_id, pattern FROM site_filters ORDER BY id ASC`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := map[int64][]string{}
|
||||
for rows.Next() {
|
||||
var siteID int64
|
||||
var pattern string
|
||||
if err := rows.Scan(&siteID, &pattern); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[siteID] = append(out[siteID], pattern)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) filtersBySiteID(ctx context.Context, siteID int64) ([]string, error) {
|
||||
rows, err := s.db.QueryContext(ctx, `SELECT pattern FROM site_filters WHERE site_id = ? ORDER BY id ASC`, siteID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var out []string
|
||||
for rows.Next() {
|
||||
var pattern string
|
||||
if err := rows.Scan(&pattern); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, pattern)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func boolToInt(v bool) int {
|
||||
if v {
|
||||
return 1
|
||||
|
|
@ -766,3 +878,11 @@ func (s *Store) debugDB(msg string, fields ...zap.Field) {
|
|||
}
|
||||
s.log.Debug(msg, fields...)
|
||||
}
|
||||
|
||||
func newSiteUUID() (string, error) {
|
||||
buf := make([]byte, 16)
|
||||
if _, err := rand.Read(buf); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hex.EncodeToString(buf), nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -99,23 +99,41 @@ func Dashboard(data DashboardData) templ.Component {
|
|||
sizeOrErr := "size pending"
|
||||
if t.LastError.Valid && t.LastError.String != "" {
|
||||
sizeOrErr = "error: " + t.LastError.String
|
||||
} else if t.Mode == "mysql_dump" && t.LastScanAt.Valid {
|
||||
sizeOrErr = "connection established"
|
||||
} else if t.LastSizeByte.Valid {
|
||||
sizeOrErr = formatBytes(t.LastSizeByte.Int64)
|
||||
}
|
||||
targets.WriteString(fmt.Sprintf(`<li><span class="pill %s">%s</span> <code>%s</code><span class="muted inline">%s</span></li>`,
|
||||
label := targetModeLabel(t.Mode)
|
||||
pathText := t.Path
|
||||
if t.Mode == "mysql_dump" {
|
||||
label = "mysql dump"
|
||||
if t.MySQLHost.Valid && t.MySQLUser.Valid && t.MySQLDB.Valid {
|
||||
pathText = fmt.Sprintf("db=%s user=%s host=%s", t.MySQLDB.String, t.MySQLUser.String, t.MySQLHost.String)
|
||||
}
|
||||
}
|
||||
targets.WriteString(fmt.Sprintf(`<li><span class="pill %s">%s</span> <code>%s</code><span class="muted inline">%s</span><form class="inline-form" method="post" action="/sites/%d/targets/%d/delete"><button class="button ghost button-sm" type="submit">Remove</button></form></li>`,
|
||||
targetModeClass(t.Mode),
|
||||
html.EscapeString(targetModeLabel(t.Mode)),
|
||||
html.EscapeString(t.Path),
|
||||
html.EscapeString(sizeOrErr)))
|
||||
html.EscapeString(label),
|
||||
html.EscapeString(pathText),
|
||||
html.EscapeString(sizeOrErr),
|
||||
site.ID,
|
||||
t.ID))
|
||||
}
|
||||
|
||||
sites.WriteString(fmt.Sprintf(`
|
||||
<article class="site-card">
|
||||
<article class="site-card" data-site-id="%d">
|
||||
<div class="site-head">
|
||||
<h3>%s@%s:%d</h3>
|
||||
<div class="row">
|
||||
<form method="post" action="/sites/%d/run">
|
||||
<button class="button" type="submit">Run preflight</button>
|
||||
<button class="button" type="submit">Run backup</button>
|
||||
</form>
|
||||
<form method="post" action="/sites/%d/cancel">
|
||||
<button class="button ghost" type="submit">Cancel active job</button>
|
||||
</form>
|
||||
<form method="post" action="/sites/%d/restart">
|
||||
<button class="button ghost" type="submit">Restart backup</button>
|
||||
</form>
|
||||
<form method="post" action="/sites/%d/delete" onsubmit="return confirm('Delete this site?');">
|
||||
<button class="button ghost" type="submit">Delete</button>
|
||||
|
|
@ -130,29 +148,51 @@ func Dashboard(data DashboardData) templ.Component {
|
|||
<label class="stack"><span>Port</span><input type="number" name="port" min="1" max="65535" value="%d" required /></label>
|
||||
<label class="stack"><span>Directory paths (one per line)</span><textarea name="directory_paths" rows="4">%s</textarea></label>
|
||||
<label class="stack"><span>SQLite DB paths (one per line)</span><textarea name="sqlite_paths" rows="4">%s</textarea></label>
|
||||
<label class="stack"><span>Exclude filters (one per line)</span><textarea name="filters" rows="4">%s</textarea></label>
|
||||
<button class="button" type="submit">Save changes</button>
|
||||
</form>
|
||||
</details>
|
||||
<details class="edit-panel">
|
||||
<summary>Add MySQL dump operation</summary>
|
||||
<form class="grid-2" method="post" action="/sites/%d/mysql-dumps">
|
||||
<label class="stack"><span>DB Host</span><input name="db_host" placeholder="127.0.0.1" required /></label>
|
||||
<label class="stack"><span>DB User</span><input name="db_user" placeholder="backup_user" required /></label>
|
||||
<label class="stack"><span>Database</span><input name="db_name" placeholder="appdb" required /></label>
|
||||
<label class="stack"><span>DB Password</span><input type="password" name="db_password" required /></label>
|
||||
<button class="button" type="submit">Add MySQL dump</button>
|
||||
</form>
|
||||
</details>
|
||||
<p class="muted">Backup targets:</p>
|
||||
<ul class="target-list">%s</ul>
|
||||
<p class="muted">Filters: %s</p>
|
||||
<p class="muted">Scan: <span class="pill %s">%s</span> · Last: %s · Next: %s</p>
|
||||
<p class="muted">%s</p>
|
||||
<p class="muted">Last run: %s</p>
|
||||
<p class="muted">Status: <span class="pill %s">%s</span></p>
|
||||
<pre class="output">%s</pre>
|
||||
<p class="muted">Status: <span class="pill %s" data-site-run-status="%d">%s</span></p>
|
||||
<p class="muted">Current step: <span data-site-current-step="%d">idle</span><span class="throbber hidden" data-site-throbber="%d" aria-hidden="true"></span></p>
|
||||
<div class="row">
|
||||
<button class="button ghost button-sm copy-btn" type="button" data-copy-target="site-output-%d">Copy output</button>
|
||||
</div>
|
||||
<pre class="output" id="site-output-%d" data-site-run-output="%d">%s</pre>
|
||||
</article>`,
|
||||
site.ID,
|
||||
html.EscapeString(site.SSHUser),
|
||||
html.EscapeString(site.Host),
|
||||
site.Port,
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
html.EscapeString(site.SSHUser),
|
||||
html.EscapeString(site.Host),
|
||||
site.Port,
|
||||
html.EscapeString(joinTargetPaths(site.Targets, "directory")),
|
||||
html.EscapeString(joinTargetPaths(site.Targets, "sqlite_dump")),
|
||||
html.EscapeString(strings.Join(site.Filters, "\n")),
|
||||
site.ID,
|
||||
targets.String(),
|
||||
html.EscapeString(formatFilters(site.Filters)),
|
||||
html.EscapeString(scanState),
|
||||
html.EscapeString(scanState),
|
||||
html.EscapeString(lastScan),
|
||||
|
|
@ -160,7 +200,13 @@ func Dashboard(data DashboardData) templ.Component {
|
|||
html.EscapeString(scanNotes),
|
||||
html.EscapeString(last),
|
||||
html.EscapeString(runStatus),
|
||||
site.ID,
|
||||
html.EscapeString(runStatus),
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
site.ID,
|
||||
html.EscapeString(runOutput),
|
||||
))
|
||||
}
|
||||
|
|
@ -208,14 +254,196 @@ func Dashboard(data DashboardData) templ.Component {
|
|||
<label class="stack"><span>Port</span><input type="number" name="port" min="1" max="65535" value="22" required /></label>
|
||||
<label class="stack"><span>Directory paths (one per line)</span><textarea name="directory_paths" placeholder="/var/www /etc/nginx" rows="4"></textarea></label>
|
||||
<label class="stack"><span>SQLite DB paths (will dump, one per line)</span><textarea name="sqlite_paths" placeholder="/srv/app/db/app.sqlite3" rows="4"></textarea></label>
|
||||
<label class="stack"><span>Exclude filters (one per line)</span><textarea name="filters" placeholder="*.tmp node_modules" rows="4"></textarea></label>
|
||||
<button class="button" type="submit">Add site</button>
|
||||
</form>
|
||||
</section>
|
||||
<section>
|
||||
<h2>Live Backup Progress</h2>
|
||||
<p class="muted" id="live-connection">Connecting...</p>
|
||||
<ul class="target-list" id="live-active-jobs">
|
||||
<li class="muted">Waiting for job updates.</li>
|
||||
</ul>
|
||||
<div class="row">
|
||||
<button class="button ghost button-sm copy-btn" type="button" data-copy-target="live-events">Copy live events</button>
|
||||
</div>
|
||||
<pre class="output" id="live-events">No live events yet.</pre>
|
||||
</section>
|
||||
<section>
|
||||
<h2>Managed Sites</h2>
|
||||
%s
|
||||
</section>
|
||||
</main>
|
||||
<script>
|
||||
(function () {
|
||||
const connLabel = document.getElementById("live-connection");
|
||||
const activeJobs = document.getElementById("live-active-jobs");
|
||||
const liveEvents = document.getElementById("live-events");
|
||||
|
||||
function safeText(v, fallback) {
|
||||
const s = (v || "").toString().trim();
|
||||
return s === "" ? fallback : s;
|
||||
}
|
||||
|
||||
function renderJobs(jobs) {
|
||||
const active = jobs.filter((j) => j.status === "queued" || j.status === "running");
|
||||
if (active.length === 0) {
|
||||
activeJobs.innerHTML = '<li class="muted">No queued or running jobs.</li>';
|
||||
return;
|
||||
}
|
||||
activeJobs.innerHTML = active.map((j) => {
|
||||
const summary = safeText(j.summary, "in progress");
|
||||
return "<li><span class=\"pill " + j.status + "\">" + j.status + "</span> Job #" + j.job_id + " site " + j.site_id + " (" + j.type + ") <span class=\"muted inline\">" + summary + "</span> <button class=\"button ghost button-sm cancel-job-btn\" type=\"button\" data-job-id=\"" + j.job_id + "\">Cancel</button></li>";
|
||||
}).join("");
|
||||
}
|
||||
|
||||
function renderEvents(events) {
|
||||
if (!events || events.length === 0) {
|
||||
liveEvents.textContent = "No live events yet.";
|
||||
return;
|
||||
}
|
||||
const lines = events.slice(0, 12).map((e) => {
|
||||
const ts = new Date(e.occurred_at).toLocaleString();
|
||||
return "[" + ts + "] site=" + e.site_id + " job=" + e.job_id + " (" + e.job_type + ") " + e.level + ": " + e.message;
|
||||
});
|
||||
liveEvents.textContent = lines.join("\n");
|
||||
}
|
||||
|
||||
function latestEventByJob(events) {
|
||||
const out = {};
|
||||
(events || []).forEach((e) => {
|
||||
if (out[e.job_id]) {
|
||||
return;
|
||||
}
|
||||
out[e.job_id] = e;
|
||||
});
|
||||
return out;
|
||||
}
|
||||
|
||||
function updateSites(sites, jobs, events) {
|
||||
const activeJobBySite = {};
|
||||
(jobs || []).forEach((j) => {
|
||||
if (activeJobBySite[j.site_id]) {
|
||||
return;
|
||||
}
|
||||
if (j.status === "queued" || j.status === "running") {
|
||||
activeJobBySite[j.site_id] = j;
|
||||
}
|
||||
});
|
||||
const eventByJob = latestEventByJob(events);
|
||||
|
||||
sites.forEach((s) => {
|
||||
const statusEl = document.querySelector("[data-site-run-status=\"" + s.site_id + "\"]");
|
||||
if (statusEl) {
|
||||
const status = safeText(s.last_run_status, "pending");
|
||||
statusEl.textContent = status;
|
||||
statusEl.className = "pill " + status;
|
||||
}
|
||||
const outputEl = document.querySelector("[data-site-run-output=\"" + s.site_id + "\"]");
|
||||
if (outputEl) {
|
||||
outputEl.textContent = safeText(s.last_run_output, "(no output yet)");
|
||||
}
|
||||
const stepEl = document.querySelector("[data-site-current-step=\"" + s.site_id + "\"]");
|
||||
const throbberEl = document.querySelector("[data-site-throbber=\"" + s.site_id + "\"]");
|
||||
const activeJob = activeJobBySite[s.site_id];
|
||||
if (!stepEl || !throbberEl) {
|
||||
return;
|
||||
}
|
||||
if (!activeJob) {
|
||||
stepEl.textContent = "idle";
|
||||
throbberEl.classList.add("hidden");
|
||||
return;
|
||||
}
|
||||
const ev = eventByJob[activeJob.job_id];
|
||||
stepEl.textContent = ev ? ev.message : ("job " + activeJob.status + " (" + activeJob.type + ")");
|
||||
throbberEl.classList.remove("hidden");
|
||||
});
|
||||
}
|
||||
|
||||
function connect() {
|
||||
const proto = window.location.protocol === "https:" ? "wss" : "ws";
|
||||
const ws = new WebSocket(proto + "://" + window.location.host + "/ws/progress");
|
||||
|
||||
ws.onopen = function () {
|
||||
connLabel.textContent = "Live updates connected.";
|
||||
};
|
||||
ws.onclose = function () {
|
||||
connLabel.textContent = "Disconnected. Reconnecting...";
|
||||
setTimeout(connect, 1500);
|
||||
};
|
||||
ws.onerror = function () {
|
||||
connLabel.textContent = "Live updates error.";
|
||||
};
|
||||
ws.onmessage = function (evt) {
|
||||
let payload;
|
||||
try {
|
||||
payload = JSON.parse(evt.data);
|
||||
} catch (_) {
|
||||
return;
|
||||
}
|
||||
const sites = payload.sites || [];
|
||||
const jobs = payload.jobs || [];
|
||||
const events = payload.events || [];
|
||||
updateSites(sites, jobs, events);
|
||||
renderJobs(jobs);
|
||||
renderEvents(events);
|
||||
};
|
||||
}
|
||||
|
||||
function setupCopyButtons() {
|
||||
document.querySelectorAll(".copy-btn").forEach((btn) => {
|
||||
btn.addEventListener("click", function () {
|
||||
const targetID = btn.getAttribute("data-copy-target");
|
||||
if (!targetID || !navigator.clipboard) {
|
||||
return;
|
||||
}
|
||||
const el = document.getElementById(targetID);
|
||||
if (!el) {
|
||||
return;
|
||||
}
|
||||
const text = (el.textContent || "").trim();
|
||||
navigator.clipboard.writeText(text).then(function () {
|
||||
const original = btn.textContent;
|
||||
btn.textContent = "Copied";
|
||||
setTimeout(function () {
|
||||
btn.textContent = original;
|
||||
}, 900);
|
||||
});
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function setupActiveJobActions() {
|
||||
activeJobs.addEventListener("click", function (evt) {
|
||||
const target = evt.target;
|
||||
if (!target || !target.classList || !target.classList.contains("cancel-job-btn")) {
|
||||
return;
|
||||
}
|
||||
const jobID = target.getAttribute("data-job-id");
|
||||
if (!jobID) {
|
||||
return;
|
||||
}
|
||||
target.disabled = true;
|
||||
target.textContent = "Canceling...";
|
||||
fetch("/jobs/" + jobID + "/cancel", { method: "POST", credentials: "same-origin" })
|
||||
.then(function (res) {
|
||||
if (!res.ok) {
|
||||
target.textContent = "Cancel failed";
|
||||
return;
|
||||
}
|
||||
target.textContent = "Cancel sent";
|
||||
})
|
||||
.catch(function () {
|
||||
target.textContent = "Cancel failed";
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
setupCopyButtons();
|
||||
setupActiveJobActions();
|
||||
connect();
|
||||
})();
|
||||
</script>
|
||||
</body>
|
||||
</html>`,
|
||||
html.EscapeString(data.User.Username),
|
||||
|
|
@ -234,7 +462,13 @@ func formatFlash(code string) string {
|
|||
case "site-added":
|
||||
return "Site added."
|
||||
case "job-queued":
|
||||
return "Preflight job queued."
|
||||
return "Backup job queued."
|
||||
case "job-cancel-requested":
|
||||
return "Cancel requested for active job."
|
||||
case "job-restarted":
|
||||
return "Backup restart queued."
|
||||
case "no-active-job":
|
||||
return "No active job to cancel."
|
||||
case "site-updated":
|
||||
return "Site updated."
|
||||
case "site-deleted":
|
||||
|
|
@ -247,6 +481,14 @@ func formatFlash(code string) string {
|
|||
return "Port must be an integer between 1 and 65535."
|
||||
case "password-updated":
|
||||
return "Password updated."
|
||||
case "mysql-added":
|
||||
return "MySQL dump operation added."
|
||||
case "mysql-invalid":
|
||||
return "MySQL host, user, database, and password are required."
|
||||
case "target-deleted":
|
||||
return "Target removed."
|
||||
case "target-not-found":
|
||||
return "Target not found."
|
||||
default:
|
||||
return code
|
||||
}
|
||||
|
|
@ -256,6 +498,9 @@ func targetModeLabel(mode string) string {
|
|||
if mode == "sqlite_dump" {
|
||||
return "sqlite dump"
|
||||
}
|
||||
if mode == "mysql_dump" {
|
||||
return "mysql dump"
|
||||
}
|
||||
return "directory"
|
||||
}
|
||||
|
||||
|
|
@ -269,8 +514,15 @@ func joinTargetPaths(targets []store.SiteTarget, mode string) string {
|
|||
return strings.Join(out, "\n")
|
||||
}
|
||||
|
||||
func formatFilters(filters []string) string {
|
||||
if len(filters) == 0 {
|
||||
return "(none)"
|
||||
}
|
||||
return strings.Join(filters, ", ")
|
||||
}
|
||||
|
||||
func targetModeClass(mode string) string {
|
||||
if mode == "sqlite_dump" {
|
||||
if mode == "sqlite_dump" || mode == "mysql_dump" {
|
||||
return "sqlite"
|
||||
}
|
||||
return "ok"
|
||||
|
|
|
|||
|
|
@ -92,6 +92,11 @@ h1 {
|
|||
border-color: var(--border);
|
||||
}
|
||||
|
||||
.button.button-sm {
|
||||
padding: 0.35rem 0.55rem;
|
||||
font-size: 0.82rem;
|
||||
}
|
||||
|
||||
input {
|
||||
width: 100%;
|
||||
border: 1px solid var(--border);
|
||||
|
|
@ -226,6 +231,26 @@ h2 {
|
|||
color: #fecaca;
|
||||
}
|
||||
|
||||
.pill.warning {
|
||||
border-color: #f59e0b;
|
||||
color: #fde68a;
|
||||
}
|
||||
|
||||
.pill.success {
|
||||
border-color: #22c55e;
|
||||
color: #86efac;
|
||||
}
|
||||
|
||||
.pill.running {
|
||||
border-color: #38bdf8;
|
||||
color: #bae6fd;
|
||||
}
|
||||
|
||||
.pill.queued {
|
||||
border-color: #a78bfa;
|
||||
color: #ddd6fe;
|
||||
}
|
||||
|
||||
.pill.partial {
|
||||
border-color: #f59e0b;
|
||||
color: #fde68a;
|
||||
|
|
@ -277,6 +302,32 @@ textarea {
|
|||
margin-left: 0.5rem;
|
||||
}
|
||||
|
||||
.inline-form {
|
||||
display: inline-block;
|
||||
margin-left: 0.5rem;
|
||||
}
|
||||
|
||||
.throbber {
|
||||
display: inline-block;
|
||||
width: 0.9rem;
|
||||
height: 0.9rem;
|
||||
margin-left: 0.45rem;
|
||||
border: 2px solid color-mix(in srgb, #38bdf8 35%, transparent);
|
||||
border-top-color: #38bdf8;
|
||||
border-radius: 50%;
|
||||
animation: spin 0.8s linear infinite;
|
||||
vertical-align: -1px;
|
||||
}
|
||||
|
||||
.hidden {
|
||||
display: none;
|
||||
}
|
||||
|
||||
@keyframes spin {
|
||||
from { transform: rotate(0deg); }
|
||||
to { transform: rotate(360deg); }
|
||||
}
|
||||
|
||||
a {
|
||||
color: #7dd3fc;
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue