This commit is contained in:
Peter Li 2026-02-07 19:51:17 -08:00
parent a3b9681938
commit 556935a59e
10 changed files with 253 additions and 14 deletions

51
AGENTS.md Normal file
View File

@ -0,0 +1,51 @@
# Satoru Agent Notes
## Project Intent
Satoru is a restic-based backup control plane for Linux hosts over SSH.
Core model:
1. Satoru pulls from edge hosts (no direct edge->B2 writes).
2. Stages data locally.
3. Runs restic locally for deduplicated snapshots.
4. Syncs restic data to B2 (scheduled).
## Current Direction
1. Background job execution for site operations.
2. Preflight-first workflow before full backup runs.
3. Strong operational visibility through structured logs and job events.
## Run The Server
```bash
cd /Users/peterli/git/satoru
go run ./cmd/satoru
```
Default URL:
- `http://localhost:8080`
## Logging
Satoru uses structured logging and supports:
1. `LOG_FORMAT=console|json` (default `json`)
2. `LOG_LEVEL=debug|info|warn|error` (default `info`)
3. `LOG_FILE=/path/to/file.log` (optional; logs still go to stdout)
Examples:
```bash
LOG_FORMAT=console LOG_LEVEL=debug go run ./cmd/satoru
```
```bash
LOG_FORMAT=json LOG_LEVEL=debug LOG_FILE=./logs/satoru.log go run ./cmd/satoru
tail -f ./logs/satoru.log | jq
```
## Debug Logging Expectation
Be proactive with debug-level logging for:
1. DB state changes (job/site/session mutations).
2. Job lifecycle transitions and step boundaries.
3. Scan and backup target-level decisions/results.
4. External command start/finish, duration, and failures.
Logs should include useful identifiers where possible:
- `job_id`, `site_id`, `job_type`, `target_path`, `target_mode`, `status`, `error`.
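A minimal sketch of the expected shape for a target-level debug line, using the identifiers above; `target` and `logTargetResult` are illustrative names, not existing code:
```go
package worker

import "go.uber.org/zap"

// target is a hypothetical struct used only to illustrate the field set.
type target struct {
	Path string // target_path
	Mode string // target_mode: "directory" or "sqlite_dump"
}

// logTargetResult emits one debug event per target-level result,
// carrying the identifiers listed above.
func logTargetResult(log *zap.Logger, jobID, siteID int64, jobType string, t target, status string, err error) {
	log.Debug("backup target finished",
		zap.Int64("job_id", jobID),
		zap.Int64("site_id", siteID),
		zap.String("job_type", jobType),
		zap.String("target_path", t.Path),
		zap.String("target_mode", t.Mode),
		zap.String("status", status),
		zap.Error(err), // adds an "error" field only when err != nil
	)
}
```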

View File

@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"log"
"os/exec"
"strconv"
"strings"
@ -13,6 +12,8 @@ import (
"time"
"satoru/internal/store"
"go.uber.org/zap"
)
const (
@ -45,7 +46,7 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
case <-ticker.C:
job, ok, err := a.store.TryStartNextQueuedJob(ctx)
if err != nil {
log.Printf("worker %d: failed to start job: %v", workerID, err)
a.log.Warn("worker failed to start job", zap.Int("worker_id", workerID), zap.Error(err))
continue
}
if !ok {
@ -57,9 +58,11 @@ func (a *app) runWorkerLoop(ctx context.Context, workerID int) {
}
func (a *app) executeJob(ctx context.Context, job store.Job) {
a.log.Info("job start", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.String("job_type", job.Type))
site, err := a.store.SiteByID(ctx, job.SiteID)
if err != nil {
_ = a.store.CompleteJob(ctx, job.ID, "failed", "failed to load site")
a.log.Error("job failed to load site", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.Error(err))
return
}
@ -71,10 +74,12 @@ func (a *app) executeJob(ctx context.Context, job store.Job) {
status, summary := a.runPreflightJob(ctx, job, site)
_ = a.store.CompleteJob(ctx, job.ID, status, summary)
_ = a.store.UpdateSiteRunResult(ctx, site.ID, status, summary, time.Now())
a.log.Info("job completed", zap.Int64("job_id", job.ID), zap.Int64("site_id", site.ID), zap.String("job_type", job.Type), zap.String("status", status), zap.String("summary", summary))
default:
summary := "unknown job type"
_ = a.store.CompleteJob(ctx, job.ID, "failed", summary)
_ = a.store.UpdateSiteRunResult(ctx, site.ID, "failed", summary, time.Now())
a.log.Warn("job unknown type", zap.Int64("job_id", job.ID), zap.String("job_type", job.Type))
}
}

52
cmd/satoru/logging.go Normal file
View File

@ -0,0 +1,52 @@
package main
import (
"os"
"path/filepath"
"strings"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
func buildLogger() (*zap.Logger, func(), error) {
cfg := zap.NewProductionConfig()
cfg.Encoding = "json"
cfg.Level = zap.NewAtomicLevelAt(parseLogLevel(os.Getenv("LOG_LEVEL")))
cfg.OutputPaths = []string{"stdout"}
cfg.ErrorOutputPaths = []string{"stderr"}
if strings.EqualFold(strings.TrimSpace(os.Getenv("LOG_FORMAT")), "console") {
cfg = zap.NewDevelopmentConfig()
cfg.Level = zap.NewAtomicLevelAt(parseLogLevel(os.Getenv("LOG_LEVEL")))
cfg.OutputPaths = []string{"stdout"}
cfg.ErrorOutputPaths = []string{"stderr"}
}
if filePath := strings.TrimSpace(os.Getenv("LOG_FILE")); filePath != "" {
if err := os.MkdirAll(filepath.Dir(filePath), 0o755); err != nil {
return nil, nil, err
}
cfg.OutputPaths = append(cfg.OutputPaths, filePath)
}
logger, err := cfg.Build()
if err != nil {
return nil, nil, err
}
cleanup := func() { _ = logger.Sync() }
return logger, cleanup, nil
}
func parseLogLevel(v string) zapcore.Level {
switch strings.ToLower(strings.TrimSpace(v)) {
case "debug":
return zap.DebugLevel
case "warn":
return zap.WarnLevel
case "error":
return zap.ErrorLevel
default:
return zap.InfoLevel
}
}

View File

@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"errors"
"log"
"net/http"
"os"
"path/filepath"
@ -14,6 +13,7 @@ import (
"github.com/a-h/templ"
"github.com/go-chi/chi/v5"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
"satoru/internal/store"
@ -29,20 +29,27 @@ const (
type app struct {
store *store.Store
log *zap.Logger
}
func main() {
logger, cleanup, err := buildLogger()
if err != nil {
panic(err)
}
defer cleanup()
if err := os.MkdirAll("data", 0o755); err != nil {
log.Fatal(err)
logger.Fatal("failed to create data directory", zap.Error(err))
}
dbPath := filepath.Join("data", "satoru.db")
st, err := store.Open(dbPath)
if err != nil {
log.Fatal(err)
logger.Fatal("failed to open store", zap.Error(err), zap.String("db_path", dbPath))
}
defer st.Close()
a := &app{store: st}
a := &app{store: st, log: logger}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -68,9 +75,9 @@ func main() {
r.Post("/signout", a.handleSignoutSubmit)
addr := ":8080"
log.Printf("satoru listening on http://localhost%s", addr)
logger.Info("satoru listening", zap.String("addr", addr))
if err := http.ListenAndServe(addr, r); err != nil {
log.Fatal(err)
logger.Fatal("http server exited", zap.Error(err))
}
}

View File

@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"log"
"os/exec"
"strconv"
"strings"
@ -13,6 +12,8 @@ import (
"satoru/internal/store"
"satoru/internal/webui"
"go.uber.org/zap"
)
func runtimeChecks() []webui.RuntimeCheck {
@ -64,7 +65,7 @@ func (a *app) startSiteScanLoop(ctx context.Context) {
func (a *app) scanAllSites(ctx context.Context) {
sites, err := a.store.ListSites(ctx)
if err != nil {
log.Printf("scan loop: failed to list sites: %v", err)
a.log.Warn("scan loop failed to list sites", zap.Error(err))
return
}
for _, site := range sites {
@ -75,7 +76,7 @@ func (a *app) scanAllSites(ctx context.Context) {
func (a *app) scanDueSites(ctx context.Context) {
sites, err := a.store.ListSites(ctx)
if err != nil {
log.Printf("scan loop: failed to list sites: %v", err)
a.log.Warn("scan due failed to list sites", zap.Error(err))
return
}
@ -91,7 +92,7 @@ func (a *app) scanDueSites(ctx context.Context) {
func (a *app) scanSiteNow(ctx context.Context, siteID int64) {
site, err := a.store.SiteByID(ctx, siteID)
if err != nil {
log.Printf("scan site %d: load failed: %v", siteID, err)
a.log.Warn("scan site load failed", zap.Int64("site_id", siteID), zap.Error(err))
return
}
@ -125,7 +126,7 @@ func (a *app) scanSiteNow(ctx context.Context, siteID int64) {
}
notes := fmt.Sprintf("%d/%d targets scanned", success, len(site.Targets))
if err := a.store.UpdateSiteScanResult(ctx, site.ID, state, notes, scannedAt, updated); err != nil {
log.Printf("scan site %d: update failed: %v", siteID, err)
a.log.Warn("scan site update failed", zap.Int64("site_id", siteID), zap.Error(err))
}
}

2
go.mod
View File

@ -5,6 +5,7 @@ go 1.25.7
require (
github.com/a-h/templ v0.3.977
github.com/go-chi/chi/v5 v5.2.5
go.uber.org/zap v1.27.1
golang.org/x/crypto v0.47.0
modernc.org/sqlite v1.44.3
)
@ -15,6 +16,7 @@ require (
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/ncruces/go-strftime v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect
golang.org/x/sys v0.40.0 // indirect
modernc.org/libc v1.67.6 // indirect

14
go.sum
View File

@ -1,5 +1,7 @@
github.com/a-h/templ v0.3.977 h1:kiKAPXTZE2Iaf8JbtM21r54A8bCNsncrfnokZZSrSDg=
github.com/a-h/templ v0.3.977/go.mod h1:oCZcnKRf5jjsGpf2yELzQfodLphd2mwecwG4Crk5HBo=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug=
@ -16,8 +18,18 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w=
github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc=
go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.47.0 h1:V6e3FRj+n4dbpw86FJ8Fv7XVOql7TEwpHapKoMJ/GO8=
golang.org/x/crypto v0.47.0/go.mod h1:ff3Y9VzzKbwSSEzWqJsJVBnWmRwRSHt/6Op5n9bQc4A=
golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY=
@ -31,6 +43,8 @@ golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ=
golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis=
modernc.org/cc/v4 v4.27.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0=
modernc.org/ccgo/v4 v4.30.1 h1:4r4U1J6Fhj98NKfSjnPUN7Ze2c6MnAdL0hWw6+LrJpc=

View File

@ -8,11 +8,14 @@ import (
"strings"
"time"
"go.uber.org/zap"
_ "modernc.org/sqlite"
)
type Store struct {
db *sql.DB
log *zap.Logger
}
type User struct {
@ -86,6 +89,10 @@ func (s *Store) Close() error {
return s.db.Close()
}
func (s *Store) SetLogger(logger *zap.Logger) {
s.log = logger
}
func (s *Store) migrate(ctx context.Context) error {
const usersSQL = `
CREATE TABLE IF NOT EXISTS users (
@ -171,6 +178,7 @@ CREATE TABLE IF NOT EXISTS job_events (
if _, err := s.db.ExecContext(ctx, jobEventsSQL); err != nil {
return err
}
s.debugDB("schema migrated")
return nil
}
@ -214,6 +222,7 @@ func (s *Store) CreateUser(ctx context.Context, username, passwordHash string) (
if err := tx.Commit(); err != nil {
return User{}, err
}
s.debugDB("user created", zap.Int64("user_id", user.ID), zap.String("username", user.Username), zap.Bool("is_admin", user.IsAdmin))
return user, nil
}
@ -242,11 +251,17 @@ func (s *Store) CreateSession(ctx context.Context, userID int64, tokenHash strin
tokenHash,
expiresAt.UTC().Format(time.RFC3339),
)
if err == nil {
s.debugDB("session created", zap.Int64("user_id", userID), zap.Time("expires_at", expiresAt.UTC()))
}
return err
}
func (s *Store) DeleteSessionByTokenHash(ctx context.Context, tokenHash string) error {
_, err := s.db.ExecContext(ctx, `DELETE FROM sessions WHERE token_hash = ?`, tokenHash)
if err == nil {
s.debugDB("session deleted")
}
return err
}
@ -267,11 +282,15 @@ func (s *Store) TouchSessionByTokenHash(ctx context.Context, tokenHash string, e
if rows == 0 {
return sql.ErrNoRows
}
s.debugDB("session touched", zap.Time("expires_at", expiresAt.UTC()))
return nil
}
func (s *Store) UpdateUserPasswordHash(ctx context.Context, userID int64, passwordHash string) error {
_, err := s.db.ExecContext(ctx, `UPDATE users SET password_hash = ? WHERE id = ?`, passwordHash, userID)
if err == nil {
s.debugDB("user password updated", zap.Int64("user_id", userID))
}
return err
}
@ -306,6 +325,7 @@ func (s *Store) CreateSite(ctx context.Context, sshUser, host string, port int,
if err := tx.Commit(); err != nil {
return Site{}, err
}
s.debugDB("site created", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)))
return s.SiteByID(ctx, id)
}
@ -346,6 +366,7 @@ func (s *Store) UpdateSite(ctx context.Context, id int64, sshUser, host string,
if err := tx.Commit(); err != nil {
return Site{}, err
}
s.debugDB("site updated", zap.Int64("site_id", id), zap.String("ssh_user", sshUser), zap.String("host", host), zap.Int("port", port), zap.Int("targets", len(targets)))
return s.SiteByID(ctx, id)
}
@ -361,6 +382,7 @@ func (s *Store) DeleteSite(ctx context.Context, id int64) error {
if affected == 0 {
return sql.ErrNoRows
}
s.debugDB("site deleted", zap.Int64("site_id", id))
return nil
}
@ -418,6 +440,9 @@ func (s *Store) UpdateSiteRunResult(ctx context.Context, id int64, status, outpu
at.UTC().Format(time.RFC3339),
id,
)
if err == nil {
s.debugDB("site run updated", zap.Int64("site_id", id), zap.String("status", status), zap.Time("at", at.UTC()))
}
return err
}
@ -435,6 +460,7 @@ func (s *Store) CreateJob(ctx context.Context, siteID int64, jobType string) (Jo
if err != nil {
return Job{}, err
}
s.debugDB("job created", zap.Int64("job_id", id), zap.Int64("site_id", siteID), zap.String("job_type", jobType))
return s.JobByID(ctx, id)
}
@ -484,6 +510,7 @@ WHERE id = ?`, id))
if err := tx.Commit(); err != nil {
return Job{}, false, err
}
s.debugDB("job started", zap.Int64("job_id", job.ID), zap.Int64("site_id", job.SiteID), zap.String("job_type", job.Type))
return job, true, nil
}
@ -496,6 +523,9 @@ func (s *Store) CompleteJob(ctx context.Context, jobID int64, status, summary st
time.Now().UTC().Format(time.RFC3339),
jobID,
)
if err == nil {
s.debugDB("job completed", zap.Int64("job_id", jobID), zap.String("status", status), zap.String("summary", summary))
}
return err
}
@ -507,6 +537,9 @@ func (s *Store) AddJobEvent(ctx context.Context, event JobEvent) error {
event.Level,
event.Message,
)
if err == nil {
s.debugDB("job event added", zap.Int64("job_id", event.JobID), zap.String("level", event.Level), zap.String("message", event.Message))
}
return err
}
@ -570,6 +603,7 @@ func (s *Store) UpdateSiteScanResult(ctx context.Context, siteID int64, state, n
); err != nil {
return err
}
s.debugDB("site scan updated", zap.Int64("site_id", siteID), zap.String("state", state), zap.Int("targets", len(targets)), zap.Time("scanned_at", scannedAt.UTC()))
return tx.Commit()
}
@ -725,3 +759,10 @@ func timeOrNil(v sql.NullTime) any {
}
return nil
}
func (s *Store) debugDB(msg string, fields ...zap.Field) {
if s.log == nil {
return
}
s.log.Debug(msg, fields...)
}

1
launchdev.sh Executable file
View File

@ -0,0 +1 @@
LOG_FORMAT=console LOG_LEVEL=debug go run ./cmd/satoru

65
plan/backup-service.md Normal file
View File

@ -0,0 +1,65 @@
# Satoru Backup Service Plan
## Scope
Build a Linux-over-SSH backup system where Satoru pulls edge data locally, snapshots it into a local restic repo, and syncs that repo to B2.
## Locked Decisions
1. Pull model only: edge hosts never push to B2 directly.
2. Directory targets use `rsync`.
3. SQLite targets run a remote `.backup`, compress and pull the artifact, then clean up.
4. Staging path: `./backups/<site_uuid>/<target_hash>/` (single persistent path per target; see the sketch after this list).
5. Site runs are background jobs; jobs for a single site run serially, while different sites can run concurrently.
6. Partial target failure does not stop the whole site job; site health becomes `warning`.
7. Retention is restic-only (`forget --prune`), no tar archive layer.
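A possible shape for decision 4. The `<target_hash>` derivation (SHA-256 over mode plus path) is an assumption for illustration only; the plan fixes the directory layout, not the hashing scheme:
```go
package staging

import (
	"crypto/sha256"
	"encoding/hex"
	"path/filepath"
)

// stagingDir builds the single persistent staging path for one target:
// <staging_root>/<site_uuid>/<target_hash>/
func stagingDir(stagingRoot, siteUUID, targetMode, targetPath string) string {
	sum := sha256.Sum256([]byte(targetMode + "\x00" + targetPath))
	targetHash := hex.EncodeToString(sum[:8]) // short, stable directory name
	return filepath.Join(stagingRoot, siteUUID, targetHash)
}
```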
## Pipeline
1. Preflight job:
- SSH connectivity/auth.
- Remote tool/path checks (rsync/sqlite3 as needed).
- Local tool checks (`ssh`, `rsync`, `restic`, `gzip`).
- SQLite preflight validates access/temp write capability only.
2. Backup job:
- Pull sqlite artifacts.
- Pull directory targets with rsync.
- `restic backup` against local staging (see the sketch after this list).
- Update health and job status (`success|warning|failed`).
3. Retention job:
- `restic forget --prune` per policy.
4. Sync job:
- restic-native sync/copy to B2 repo on schedule.
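A hedged sketch of how steps 2-3 might shell out to restic against the local repo; the keep-policy values are placeholders, and repo/password wiring would come from the config below:
```go
package jobs

import (
	"context"
	"os/exec"
)

// runRestic runs the local restic binary against the local repo.
// Real jobs would stream output into job_events and enforce timeouts.
func runRestic(ctx context.Context, repoPath, passwordFile string, args ...string) ([]byte, error) {
	cmd := exec.CommandContext(ctx, "restic", args...)
	cmd.Env = append(cmd.Environ(),
		"RESTIC_REPOSITORY="+repoPath,
		"RESTIC_PASSWORD_FILE="+passwordFile,
	)
	return cmd.CombinedOutput()
}

// Backup step (2): snapshot the staged site directory.
//   runRestic(ctx, repo, pw, "backup", stagingDirForSite)
// Retention step (3): apply the retention policy and prune.
//   runRestic(ctx, repo, pw, "forget", "--prune", "--keep-daily", "7", "--keep-weekly", "4")
```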
## Minimal Data Model
1. `sites`: `site_uuid`, health fields, last preflight/scan.
2. `site_targets`: mode (`directory|sqlite_dump`), path/hash, last scan metadata.
3. `jobs`: type (`preflight|backup|restic_sync`), status, timing, attempts.
4. `job_events`: structured logs per step.
5. `sync_state`: last sync status/timestamp/error.
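`jobs` and `job_events` already exist in the store migration; `sync_state` is the planned addition. One hypothetical shape for it, in the same embedded-SQL style `internal/store` already uses (columns are assumptions derived from item 5 above):
```go
package store

// Illustrative only; the real migration may differ.
// Timestamps as RFC3339 strings match the existing schema.
const syncStateSQL = `
CREATE TABLE IF NOT EXISTS sync_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    last_status TEXT NOT NULL DEFAULT '',
    last_error TEXT NOT NULL DEFAULT '',
    last_synced_at TEXT
);`
```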
## Runtime Paths
1. Staging: `./backups/<site_uuid>/<target_hash>/`
2. Local restic repo: `./repos/restic`
## Security Defaults
Recommended: `0700` directories, `0600` files, dedicated `satoru` system user.
## Required Config
1. `staging_root`
2. `restic_repo_path`
3. `restic_password_file` or secret source
4. `restic_retention_policy`
5. `restic_sync_interval_hours`
6. `restic_b2_repository`
7. `restic_b2_account_id` / `restic_b2_account_key` secret source
8. `job_worker_concurrency`
9. `site_scan_interval_hours` (default 24)
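The keys above might map to a Go struct like the following sketch; only the key names are fixed by the plan, the field names and types (e.g. `time.Duration` for the hour-based keys) are assumptions:
```go
package config

import "time"

// Config is one possible Go shape for the required keys above.
type Config struct {
	StagingRoot           string        // staging_root
	ResticRepoPath        string        // restic_repo_path
	ResticPasswordFile    string        // restic_password_file (or other secret source)
	ResticRetentionPolicy string        // restic_retention_policy
	ResticSyncInterval    time.Duration // restic_sync_interval_hours
	ResticB2Repository    string        // restic_b2_repository
	ResticB2AccountID     string        // restic_b2_account_id (secret source)
	ResticB2AccountKey    string        // restic_b2_account_key (secret source)
	JobWorkerConcurrency  int           // job_worker_concurrency
	SiteScanInterval      time.Duration // site_scan_interval_hours (default 24h)
}
```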
## Build Order
1. Phase 1: queue tables + workers + Run->background + preflight-only.
2. Phase 2: sqlite pull + rsync pull + local restic backup.
3. Phase 3: restic retention + scheduled B2 sync + sync health UI.
4. Phase 4: restore UX + retries/backoff + alerts/observability.
## Operational Risks
1. Disk pressure from staging + restic repo -> enforce headroom checks.
2. SSH/command variability -> clear per-target errors and preflight gating.
3. Long-running jobs -> heartbeat, timeout, retry state.
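A minimal sketch of the timeout-plus-heartbeat half of risk 3; retry/backoff state would live a layer above, and the 30s heartbeat interval is an arbitrary placeholder:
```go
package jobs

import (
	"context"
	"time"
)

// runStep runs one external step under a bounded context so a hung
// rsync/restic call cannot stall the worker, and invokes a coarse
// heartbeat callback the worker can use to update job_events.
func runStep(parent context.Context, timeout time.Duration, heartbeat func(), step func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	done := make(chan error, 1)
	go func() { done <- step(ctx) }()

	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()
	for {
		select {
		case err := <-done:
			return err
		case <-tick.C:
			heartbeat() // e.g. touch the job row or append a job_event
		}
	}
}
```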