satoru/internal/store/migrations.go

445 lines
13 KiB
Go

package store
import (
"context"
"database/sql"
"fmt"
"strings"
"go.uber.org/zap"
)
type migration struct {
version int
name string
up func(context.Context, *sql.Tx) error
}
func (s *Store) runMigrations(ctx context.Context) error {
if _, err := s.db.ExecContext(ctx, `
CREATE TABLE IF NOT EXISTS schema_migrations (
version INTEGER PRIMARY KEY,
name TEXT NOT NULL,
applied_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)`); err != nil {
return err
}
migrations := []migration{
{
version: 1,
name: "initial_schema",
up: func(ctx context.Context, tx *sql.Tx) error {
stmts := []string{
`CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
is_admin INTEGER NOT NULL DEFAULT 0,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
`CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token_hash TEXT NOT NULL UNIQUE,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
expires_at DATETIME NOT NULL
)`,
`CREATE TABLE IF NOT EXISTS sites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_uuid TEXT NOT NULL UNIQUE,
ssh_user TEXT NOT NULL,
host TEXT NOT NULL,
port INTEGER NOT NULL DEFAULT 22,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_run_status TEXT,
last_run_output TEXT,
last_run_at DATETIME,
last_scan_at DATETIME,
last_scan_state TEXT,
last_scan_notes TEXT
)`,
`CREATE TABLE IF NOT EXISTS site_targets (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
path TEXT NOT NULL,
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump', 'podman_save', 'podman_export')),
mysql_host TEXT,
mysql_user TEXT,
mysql_db TEXT,
mysql_password TEXT,
last_size_bytes INTEGER,
last_scan_at DATETIME,
last_error TEXT,
UNIQUE(site_id, path, mode)
)`,
`CREATE TABLE IF NOT EXISTS jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
type TEXT NOT NULL,
status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'success', 'warning', 'failed')),
summary TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
started_at DATETIME,
finished_at DATETIME
)`,
`CREATE TABLE IF NOT EXISTS job_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
level TEXT NOT NULL,
message TEXT NOT NULL,
occurred_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
)`,
}
for _, stmt := range stmts {
if _, err := tx.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
},
},
{
version: 2,
name: "ensure_sites_site_uuid",
up: func(ctx context.Context, tx *sql.Tx) error {
exists, err := tableExists(ctx, tx, "sites")
if err != nil {
return err
}
if !exists {
return nil
}
hasUUID, err := columnExists(ctx, tx, "sites", "site_uuid")
if err != nil {
return err
}
if !hasUUID {
if _, err := tx.ExecContext(ctx, `ALTER TABLE sites ADD COLUMN site_uuid TEXT`); err != nil {
return err
}
}
rows, err := tx.QueryContext(ctx, `SELECT id, site_uuid FROM sites`)
if err != nil {
return err
}
defer rows.Close()
var idsNeedingUUID []int64
for rows.Next() {
var id int64
var uuid sql.NullString
if err := rows.Scan(&id, &uuid); err != nil {
return err
}
if !uuid.Valid || strings.TrimSpace(uuid.String) == "" {
idsNeedingUUID = append(idsNeedingUUID, id)
}
}
if err := rows.Err(); err != nil {
return err
}
for _, id := range idsNeedingUUID {
uuid, err := newSiteUUID()
if err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `UPDATE sites SET site_uuid = ? WHERE id = ?`, uuid, id); err != nil {
return err
}
}
if _, err := tx.ExecContext(ctx, `CREATE UNIQUE INDEX IF NOT EXISTS idx_sites_site_uuid ON sites(site_uuid)`); err != nil {
return err
}
return nil
},
},
{
version: 3,
name: "add_runtime_query_indexes",
up: func(ctx context.Context, tx *sql.Tx) error {
stmts := []string{
`CREATE INDEX IF NOT EXISTS idx_jobs_status_id ON jobs(status, id)`,
`CREATE INDEX IF NOT EXISTS idx_jobs_site_type_id ON jobs(site_id, type, id DESC)`,
`CREATE INDEX IF NOT EXISTS idx_job_events_job_id_id ON job_events(job_id, id DESC)`,
`CREATE INDEX IF NOT EXISTS idx_sessions_expires_at ON sessions(expires_at)`,
}
for _, stmt := range stmts {
if _, err := tx.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
},
},
{
version: 4,
name: "site_filters",
up: func(ctx context.Context, tx *sql.Tx) error {
stmts := []string{
`CREATE TABLE IF NOT EXISTS site_filters (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
pattern TEXT NOT NULL,
UNIQUE(site_id, pattern)
)`,
`CREATE INDEX IF NOT EXISTS idx_site_filters_site_id ON site_filters(site_id, id)`,
}
for _, stmt := range stmts {
if _, err := tx.ExecContext(ctx, stmt); err != nil {
return err
}
}
return nil
},
},
{
version: 5,
name: "site_targets_mysql_dump",
up: func(ctx context.Context, tx *sql.Tx) error {
exists, err := tableExists(ctx, tx, "site_targets")
if err != nil {
return err
}
if !exists {
return nil
}
if _, err := tx.ExecContext(ctx, `CREATE TABLE site_targets_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
path TEXT NOT NULL,
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump', 'podman_save', 'podman_export')),
mysql_host TEXT,
mysql_user TEXT,
mysql_db TEXT,
mysql_password TEXT,
last_size_bytes INTEGER,
last_scan_at DATETIME,
last_error TEXT,
UNIQUE(site_id, path, mode)
)`); err != nil {
return err
}
hasMySQLHost, err := columnExists(ctx, tx, "site_targets", "mysql_host")
if err != nil {
return err
}
if hasMySQLHost {
if _, err := tx.ExecContext(ctx, `
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
SELECT id, site_id, path, mode, mysql_host, NULL, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error
FROM site_targets`); err != nil {
return err
}
} else {
if _, err := tx.ExecContext(ctx, `
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
SELECT id, site_id, path, mode, NULL, NULL, NULL, NULL, last_size_bytes, last_scan_at, last_error
FROM site_targets`); err != nil {
return err
}
}
if _, err := tx.ExecContext(ctx, `DROP TABLE site_targets`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `ALTER TABLE site_targets_new RENAME TO site_targets`); err != nil {
return err
}
return nil
},
},
{
version: 6,
name: "site_targets_mysql_user",
up: func(ctx context.Context, tx *sql.Tx) error {
exists, err := tableExists(ctx, tx, "site_targets")
if err != nil {
return err
}
if !exists {
return nil
}
hasUser, err := columnExists(ctx, tx, "site_targets", "mysql_user")
if err != nil {
return err
}
if hasUser {
return nil
}
_, err = tx.ExecContext(ctx, `ALTER TABLE site_targets ADD COLUMN mysql_user TEXT`)
return err
},
},
{
version: 7,
name: "site_targets_podman_save_mode",
up: func(ctx context.Context, tx *sql.Tx) error {
exists, err := tableExists(ctx, tx, "site_targets")
if err != nil {
return err
}
if !exists {
return nil
}
if _, err := tx.ExecContext(ctx, `CREATE TABLE site_targets_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
path TEXT NOT NULL,
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump', 'podman_save', 'podman_export')),
mysql_host TEXT,
mysql_user TEXT,
mysql_db TEXT,
mysql_password TEXT,
last_size_bytes INTEGER,
last_scan_at DATETIME,
last_error TEXT,
UNIQUE(site_id, path, mode)
)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
SELECT id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error
FROM site_targets`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `DROP TABLE site_targets`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `ALTER TABLE site_targets_new RENAME TO site_targets`); err != nil {
return err
}
return nil
},
},
{
version: 8,
name: "site_targets_podman_export_mode",
up: func(ctx context.Context, tx *sql.Tx) error {
exists, err := tableExists(ctx, tx, "site_targets")
if err != nil {
return err
}
if !exists {
return nil
}
if _, err := tx.ExecContext(ctx, `CREATE TABLE site_targets_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
site_id INTEGER NOT NULL REFERENCES sites(id) ON DELETE CASCADE,
path TEXT NOT NULL,
mode TEXT NOT NULL CHECK(mode IN ('directory', 'sqlite_dump', 'mysql_dump', 'podman_save', 'podman_export')),
mysql_host TEXT,
mysql_user TEXT,
mysql_db TEXT,
mysql_password TEXT,
last_size_bytes INTEGER,
last_scan_at DATETIME,
last_error TEXT,
UNIQUE(site_id, path, mode)
)`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO site_targets_new (id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error)
SELECT id, site_id, path, mode, mysql_host, mysql_user, mysql_db, mysql_password, last_size_bytes, last_scan_at, last_error
FROM site_targets`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `DROP TABLE site_targets`); err != nil {
return err
}
if _, err := tx.ExecContext(ctx, `ALTER TABLE site_targets_new RENAME TO site_targets`); err != nil {
return err
}
return nil
},
},
}
for _, m := range migrations {
applied, err := migrationApplied(ctx, s.db, m.version)
if err != nil {
return err
}
if applied {
continue
}
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return err
}
if err := m.up(ctx, tx); err != nil {
_ = tx.Rollback()
return fmt.Errorf("migration %d (%s) failed: %w", m.version, m.name, err)
}
if _, err := tx.ExecContext(ctx, `INSERT INTO schema_migrations (version, name) VALUES (?, ?)`, m.version, m.name); err != nil {
_ = tx.Rollback()
return fmt.Errorf("migration %d (%s) record failed: %w", m.version, m.name, err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("migration %d (%s) commit failed: %w", m.version, m.name, err)
}
s.debugDB("migration applied", zap.Int("version", m.version), zap.String("name", m.name))
}
return nil
}
func migrationApplied(ctx context.Context, db *sql.DB, version int) (bool, error) {
var count int
if err := db.QueryRowContext(ctx, `SELECT COUNT(1) FROM schema_migrations WHERE version = ?`, version).Scan(&count); err != nil {
return false, err
}
return count > 0, nil
}
func tableExists(ctx context.Context, tx *sql.Tx, table string) (bool, error) {
var count int
if err := tx.QueryRowContext(ctx, `SELECT COUNT(1) FROM sqlite_master WHERE type='table' AND name = ?`, table).Scan(&count); err != nil {
return false, err
}
return count > 0, nil
}
func columnExists(ctx context.Context, tx *sql.Tx, table, col string) (bool, error) {
rows, err := tx.QueryContext(ctx, "PRAGMA table_info("+table+")")
if err != nil {
return false, err
}
defer rows.Close()
for rows.Next() {
var cid int
var name string
var ctype string
var notNull int
var dflt sql.NullString
var pk int
if err := rows.Scan(&cid, &name, &ctype, &notNull, &dflt, &pk); err != nil {
return false, err
}
if strings.EqualFold(name, col) {
return true, nil
}
}
if err := rows.Err(); err != nil {
return false, err
}
return false, nil
}