diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index b6b0be0..2836b85 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -174,7 +174,7 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err } - runID, err := o.store.CreateRun(ctx, taskID) + runID, err := o.store.CreateRun(ctx, taskID, "manual") if err != nil { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err diff --git a/internal/store/crud_test.go b/internal/store/crud_test.go index 1b865dc..fdf3392 100644 --- a/internal/store/crud_test.go +++ b/internal/store/crud_test.go @@ -49,7 +49,7 @@ func TestDeleteTaskCascades(t *testing.T) { ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"}) taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2}) accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"}) - _, _ = s.CreateRun(ctx, taskID) + _, _ = s.CreateRun(ctx, taskID, "manual") if err := s.DeleteTask(ctx, taskID); err != nil { t.Fatalf("delete task: %v", err) diff --git a/internal/store/runs.go b/internal/store/runs.go index ac0075a..43ba984 100644 --- a/internal/store/runs.go +++ b/internal/store/runs.go @@ -1,20 +1,26 @@ package store -import "context" +import ( + "context" + "time" +) type Run struct { - ID int64 - TaskID int64 - Status string - TotalCopied int64 - TotalSkipped int64 - TotalErrors int64 + ID int64 `json:"id"` + TaskID int64 `json:"task_id"` + Status string `json:"status"` + StartedAt time.Time `json:"started_at"` + FinishedAt *time.Time `json:"finished_at"` + TotalCopied int64 `json:"total_copied"` + TotalSkipped int64 `json:"total_skipped"` + TotalErrors int64 `json:"total_errors"` + Trigger string `json:"trigger"` } -func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) { +func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) { var id int64 err := s.Pool.QueryRow(ctx, - `INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id) + `INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id) return id, err } @@ -25,3 +31,34 @@ func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied, id, status, copied, skipped, errs) return err } + +// ListRunsByTask returns a task's runs, newest first, for the run-log modal. +func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) { + rows, err := s.Pool.Query(ctx, + `SELECT id, task_id, started_at, finished_at, status, + total_copied, total_skipped, total_errors, trigger + FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID) + if err != nil { + return nil, err + } + defer rows.Close() + out := []Run{} + for rows.Next() { + var r Run + if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status, + &r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil { + return nil, err + } + out = append(out, r) + } + return out, rows.Err() +} + +// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if +// the task has never completed a run — the baseline for the next scheduled run. +func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) { + var t *time.Time + err := s.Pool.QueryRow(ctx, + `SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t) + return t, err +} diff --git a/internal/store/scheduling_test.go b/internal/store/scheduling_test.go new file mode 100644 index 0000000..71f98f7 --- /dev/null +++ b/internal/store/scheduling_test.go @@ -0,0 +1,81 @@ +package store + +import ( + "context" + "testing" +) + +func TestTaskScheduleAndBroken(t *testing.T) { + s := testStore(t) + ctx := context.Background() + epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"}) + epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst}) + + // Defaults: no schedule, not broken. + tk, _ := s.GetTask(ctx, taskID) + if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil { + t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor) + } + + // Enable → interval set, anchor stamped, appears in schedulable list. + if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil { + t.Fatalf("set schedule: %v", err) + } + tk, _ = s.GetTask(ctx, taskID) + if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil { + t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor) + } + sch, _ := s.ListSchedulableTasks(ctx) + if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil { + t.Fatalf("schedulable: %+v", sch) + } + + // Breaker → broken true, interval 0, drops out of schedulable list. + if err := s.SetTaskBroken(ctx, taskID); err != nil { + t.Fatalf("set broken: %v", err) + } + tk, _ = s.GetTask(ctx, taskID) + if !tk.Broken || tk.ScheduleIntervalSeconds != 0 { + t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds) + } + if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 { + t.Fatalf("broken task still schedulable: %+v", sch) + } + + // Re-enable clears broken. + _ = s.SetTaskSchedule(ctx, taskID, 21600) + if tk, _ = s.GetTask(ctx, taskID); tk.Broken { + t.Fatalf("re-enable did not clear broken") + } +} + +func TestListRunsByTaskWithTrigger(t *testing.T) { + s := testStore(t) + ctx := context.Background() + epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"}) + epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"}) + taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst}) + + r1, _ := s.CreateRun(ctx, taskID, "manual") + _ = s.FinishRun(ctx, r1, "done", 5, 1, 0) + _, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish) + + runs, err := s.ListRunsByTask(ctx, taskID) + if err != nil || len(runs) != 2 { + t.Fatalf("list runs: %v len=%d", err, len(runs)) + } + // newest first + if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil { + t.Fatalf("run[0]: %+v", runs[0]) + } + if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 { + t.Fatalf("run[1]: %+v", runs[1]) + } + + // Last finished-at reflects the finished manual run. + lf, _ := s.LastFinishedRunAt(ctx, taskID) + if lf == nil { + t.Fatalf("LastFinishedRunAt nil, want the finished run's time") + } +} diff --git a/internal/store/tasks.go b/internal/store/tasks.go index b0b8b86..4646aa6 100644 --- a/internal/store/tasks.go +++ b/internal/store/tasks.go @@ -1,14 +1,20 @@ package store -import "context" +import ( + "context" + "time" +) type Task struct { - ID int64 `json:"id"` - Name string `json:"name"` - SrcEndpointID int64 `json:"src_endpoint_id"` - DstEndpointID int64 `json:"dst_endpoint_id"` - Status string `json:"status"` - FolderMapping map[string]string `json:"folder_mapping"` + ID int64 `json:"id"` + Name string `json:"name"` + SrcEndpointID int64 `json:"src_endpoint_id"` + DstEndpointID int64 `json:"dst_endpoint_id"` + Status string `json:"status"` + FolderMapping map[string]string `json:"folder_mapping"` + ScheduleIntervalSeconds int64 `json:"schedule_interval_seconds"` + ScheduleAnchor *time.Time `json:"-"` + Broken bool `json:"broken"` } func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) { @@ -26,9 +32,11 @@ func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) { func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) { var t Task err := s.Pool.QueryRow(ctx, - `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping, + schedule_interval_seconds, schedule_anchor, broken FROM tasks WHERE id=$1`, id). - Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping) + Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping, + &t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken) return t, err } @@ -40,7 +48,8 @@ func (s *Store) DeleteTask(ctx context.Context, id int64) error { func (s *Store) ListTasks(ctx context.Context) ([]Task, error) { rows, err := s.Pool.Query(ctx, - `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping + `SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping, + schedule_interval_seconds, schedule_anchor, broken FROM tasks ORDER BY id DESC`) if err != nil { return nil, err @@ -49,7 +58,8 @@ func (s *Store) ListTasks(ctx context.Context) ([]Task, error) { out := []Task{} for rows.Next() { var t Task - if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil { + if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping, + &t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil { return nil, err } out = append(out, t) @@ -80,3 +90,54 @@ func (s *Store) TryMarkTaskRunning(ctx context.Context, id int64) (bool, error) } return ct.RowsAffected() == 1, nil } + +// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when +// enabling (the first run is one interval after this), and clears broken — any +// explicit schedule change un-breaks the task. +func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE tasks SET schedule_interval_seconds=$2, + schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END, + broken = false + WHERE id=$1`, id, intervalSeconds) + return err +} + +// SetTaskBroken trips the schedule breaker: disables the schedule and flags the +// task for operator attention. +func (s *Store) SetTaskBroken(ctx context.Context, id int64) error { + _, err := s.Pool.Exec(ctx, + `UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id) + return err +} + +// SchedulableTask is the per-tick decision input for the scheduler. +type SchedulableTask struct { + ID int64 + IntervalSeconds int64 + Anchor time.Time + LastFinished *time.Time +} + +// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not +// broken, not currently running — each joined with its last finished run time. +func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) { + rows, err := s.Pool.Query(ctx, + `SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor, + (SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL) + FROM tasks t + WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`) + if err != nil { + return nil, err + } + defer rows.Close() + out := []SchedulableTask{} + for rows.Next() { + var st SchedulableTask + if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil { + return nil, err + } + out = append(out, st) + } + return out, rows.Err() +} diff --git a/migrations/0004_task_scheduling.down.sql b/migrations/0004_task_scheduling.down.sql new file mode 100644 index 0000000..6973261 --- /dev/null +++ b/migrations/0004_task_scheduling.down.sql @@ -0,0 +1,4 @@ +ALTER TABLE runs DROP COLUMN trigger; +ALTER TABLE tasks DROP COLUMN broken; +ALTER TABLE tasks DROP COLUMN schedule_anchor; +ALTER TABLE tasks DROP COLUMN schedule_interval_seconds; diff --git a/migrations/0004_task_scheduling.up.sql b/migrations/0004_task_scheduling.up.sql new file mode 100644 index 0000000..526e4f9 --- /dev/null +++ b/migrations/0004_task_scheduling.up.sql @@ -0,0 +1,4 @@ +ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0; +ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ; +ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false; +ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';