feat(store): task schedule columns, run trigger, scheduling queries
This commit is contained in:
@@ -174,7 +174,7 @@ func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
|
|||||||
_ = o.store.SetTaskStatus(ctx, taskID, "error")
|
_ = o.store.SetTaskStatus(ctx, taskID, "error")
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
runID, err := o.store.CreateRun(ctx, taskID)
|
runID, err := o.store.CreateRun(ctx, taskID, "manual")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = o.store.SetTaskStatus(ctx, taskID, "error")
|
_ = o.store.SetTaskStatus(ctx, taskID, "error")
|
||||||
return 0, err
|
return 0, err
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ func TestDeleteTaskCascades(t *testing.T) {
|
|||||||
ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"})
|
ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"})
|
||||||
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2})
|
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2})
|
||||||
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"})
|
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"})
|
||||||
_, _ = s.CreateRun(ctx, taskID)
|
_, _ = s.CreateRun(ctx, taskID, "manual")
|
||||||
|
|
||||||
if err := s.DeleteTask(ctx, taskID); err != nil {
|
if err := s.DeleteTask(ctx, taskID); err != nil {
|
||||||
t.Fatalf("delete task: %v", err)
|
t.Fatalf("delete task: %v", err)
|
||||||
|
|||||||
+46
-9
@@ -1,20 +1,26 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type Run struct {
|
type Run struct {
|
||||||
ID int64
|
ID int64 `json:"id"`
|
||||||
TaskID int64
|
TaskID int64 `json:"task_id"`
|
||||||
Status string
|
Status string `json:"status"`
|
||||||
TotalCopied int64
|
StartedAt time.Time `json:"started_at"`
|
||||||
TotalSkipped int64
|
FinishedAt *time.Time `json:"finished_at"`
|
||||||
TotalErrors int64
|
TotalCopied int64 `json:"total_copied"`
|
||||||
|
TotalSkipped int64 `json:"total_skipped"`
|
||||||
|
TotalErrors int64 `json:"total_errors"`
|
||||||
|
Trigger string `json:"trigger"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) CreateRun(ctx context.Context, taskID int64) (int64, error) {
|
func (s *Store) CreateRun(ctx context.Context, taskID int64, trigger string) (int64, error) {
|
||||||
var id int64
|
var id int64
|
||||||
err := s.Pool.QueryRow(ctx,
|
err := s.Pool.QueryRow(ctx,
|
||||||
`INSERT INTO runs (task_id) VALUES ($1) RETURNING id`, taskID).Scan(&id)
|
`INSERT INTO runs (task_id, trigger) VALUES ($1,$2) RETURNING id`, taskID, trigger).Scan(&id)
|
||||||
return id, err
|
return id, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -25,3 +31,34 @@ func (s *Store) FinishRun(ctx context.Context, id int64, status string, copied,
|
|||||||
id, status, copied, skipped, errs)
|
id, status, copied, skipped, errs)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ListRunsByTask returns a task's runs, newest first, for the run-log modal.
|
||||||
|
func (s *Store) ListRunsByTask(ctx context.Context, taskID int64) ([]Run, error) {
|
||||||
|
rows, err := s.Pool.Query(ctx,
|
||||||
|
`SELECT id, task_id, started_at, finished_at, status,
|
||||||
|
total_copied, total_skipped, total_errors, trigger
|
||||||
|
FROM runs WHERE task_id=$1 ORDER BY id DESC`, taskID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
out := []Run{}
|
||||||
|
for rows.Next() {
|
||||||
|
var r Run
|
||||||
|
if err := rows.Scan(&r.ID, &r.TaskID, &r.StartedAt, &r.FinishedAt, &r.Status,
|
||||||
|
&r.TotalCopied, &r.TotalSkipped, &r.TotalErrors, &r.Trigger); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, r)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// LastFinishedRunAt returns the most recent finished run's timestamp, or nil if
|
||||||
|
// the task has never completed a run — the baseline for the next scheduled run.
|
||||||
|
func (s *Store) LastFinishedRunAt(ctx context.Context, taskID int64) (*time.Time, error) {
|
||||||
|
var t *time.Time
|
||||||
|
err := s.Pool.QueryRow(ctx,
|
||||||
|
`SELECT max(finished_at) FROM runs WHERE task_id=$1 AND finished_at IS NOT NULL`, taskID).Scan(&t)
|
||||||
|
return t, err
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,81 @@
|
|||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestTaskScheduleAndBroken(t *testing.T) {
|
||||||
|
s := testStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
|
||||||
|
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
|
||||||
|
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
|
||||||
|
|
||||||
|
// Defaults: no schedule, not broken.
|
||||||
|
tk, _ := s.GetTask(ctx, taskID)
|
||||||
|
if tk.ScheduleIntervalSeconds != 0 || tk.Broken || tk.ScheduleAnchor != nil {
|
||||||
|
t.Fatalf("defaults: interval=%d broken=%v anchor=%v", tk.ScheduleIntervalSeconds, tk.Broken, tk.ScheduleAnchor)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enable → interval set, anchor stamped, appears in schedulable list.
|
||||||
|
if err := s.SetTaskSchedule(ctx, taskID, 3600); err != nil {
|
||||||
|
t.Fatalf("set schedule: %v", err)
|
||||||
|
}
|
||||||
|
tk, _ = s.GetTask(ctx, taskID)
|
||||||
|
if tk.ScheduleIntervalSeconds != 3600 || tk.ScheduleAnchor == nil {
|
||||||
|
t.Fatalf("after enable: interval=%d anchor=%v", tk.ScheduleIntervalSeconds, tk.ScheduleAnchor)
|
||||||
|
}
|
||||||
|
sch, _ := s.ListSchedulableTasks(ctx)
|
||||||
|
if len(sch) != 1 || sch[0].ID != taskID || sch[0].IntervalSeconds != 3600 || sch[0].LastFinished != nil {
|
||||||
|
t.Fatalf("schedulable: %+v", sch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Breaker → broken true, interval 0, drops out of schedulable list.
|
||||||
|
if err := s.SetTaskBroken(ctx, taskID); err != nil {
|
||||||
|
t.Fatalf("set broken: %v", err)
|
||||||
|
}
|
||||||
|
tk, _ = s.GetTask(ctx, taskID)
|
||||||
|
if !tk.Broken || tk.ScheduleIntervalSeconds != 0 {
|
||||||
|
t.Fatalf("after break: broken=%v interval=%d", tk.Broken, tk.ScheduleIntervalSeconds)
|
||||||
|
}
|
||||||
|
if sch, _ := s.ListSchedulableTasks(ctx); len(sch) != 0 {
|
||||||
|
t.Fatalf("broken task still schedulable: %+v", sch)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Re-enable clears broken.
|
||||||
|
_ = s.SetTaskSchedule(ctx, taskID, 21600)
|
||||||
|
if tk, _ = s.GetTask(ctx, taskID); tk.Broken {
|
||||||
|
t.Fatalf("re-enable did not clear broken")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListRunsByTaskWithTrigger(t *testing.T) {
|
||||||
|
s := testStore(t)
|
||||||
|
ctx := context.Background()
|
||||||
|
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
|
||||||
|
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
|
||||||
|
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
|
||||||
|
|
||||||
|
r1, _ := s.CreateRun(ctx, taskID, "manual")
|
||||||
|
_ = s.FinishRun(ctx, r1, "done", 5, 1, 0)
|
||||||
|
_, _ = s.CreateRun(ctx, taskID, "scheduled") // still running (no finish)
|
||||||
|
|
||||||
|
runs, err := s.ListRunsByTask(ctx, taskID)
|
||||||
|
if err != nil || len(runs) != 2 {
|
||||||
|
t.Fatalf("list runs: %v len=%d", err, len(runs))
|
||||||
|
}
|
||||||
|
// newest first
|
||||||
|
if runs[0].Trigger != "scheduled" || runs[0].FinishedAt != nil {
|
||||||
|
t.Fatalf("run[0]: %+v", runs[0])
|
||||||
|
}
|
||||||
|
if runs[1].Trigger != "manual" || runs[1].FinishedAt == nil || runs[1].TotalCopied != 5 {
|
||||||
|
t.Fatalf("run[1]: %+v", runs[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Last finished-at reflects the finished manual run.
|
||||||
|
lf, _ := s.LastFinishedRunAt(ctx, taskID)
|
||||||
|
if lf == nil {
|
||||||
|
t.Fatalf("LastFinishedRunAt nil, want the finished run's time")
|
||||||
|
}
|
||||||
|
}
|
||||||
+72
-11
@@ -1,14 +1,20 @@
|
|||||||
package store
|
package store
|
||||||
|
|
||||||
import "context"
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
type Task struct {
|
type Task struct {
|
||||||
ID int64 `json:"id"`
|
ID int64 `json:"id"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
SrcEndpointID int64 `json:"src_endpoint_id"`
|
SrcEndpointID int64 `json:"src_endpoint_id"`
|
||||||
DstEndpointID int64 `json:"dst_endpoint_id"`
|
DstEndpointID int64 `json:"dst_endpoint_id"`
|
||||||
Status string `json:"status"`
|
Status string `json:"status"`
|
||||||
FolderMapping map[string]string `json:"folder_mapping"`
|
FolderMapping map[string]string `json:"folder_mapping"`
|
||||||
|
ScheduleIntervalSeconds int64 `json:"schedule_interval_seconds"`
|
||||||
|
ScheduleAnchor *time.Time `json:"-"`
|
||||||
|
Broken bool `json:"broken"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
|
func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
|
||||||
@@ -26,9 +32,11 @@ func (s *Store) CreateTask(ctx context.Context, t Task) (int64, error) {
|
|||||||
func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
|
func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
|
||||||
var t Task
|
var t Task
|
||||||
err := s.Pool.QueryRow(ctx,
|
err := s.Pool.QueryRow(ctx,
|
||||||
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
|
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
|
||||||
|
schedule_interval_seconds, schedule_anchor, broken
|
||||||
FROM tasks WHERE id=$1`, id).
|
FROM tasks WHERE id=$1`, id).
|
||||||
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping)
|
Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
|
||||||
|
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken)
|
||||||
return t, err
|
return t, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -40,7 +48,8 @@ func (s *Store) DeleteTask(ctx context.Context, id int64) error {
|
|||||||
|
|
||||||
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
|
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
|
||||||
rows, err := s.Pool.Query(ctx,
|
rows, err := s.Pool.Query(ctx,
|
||||||
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping
|
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping,
|
||||||
|
schedule_interval_seconds, schedule_anchor, broken
|
||||||
FROM tasks ORDER BY id DESC`)
|
FROM tasks ORDER BY id DESC`)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -49,7 +58,8 @@ func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
|
|||||||
out := []Task{}
|
out := []Task{}
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var t Task
|
var t Task
|
||||||
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping); err != nil {
|
if err := rows.Scan(&t.ID, &t.Name, &t.SrcEndpointID, &t.DstEndpointID, &t.Status, &t.FolderMapping,
|
||||||
|
&t.ScheduleIntervalSeconds, &t.ScheduleAnchor, &t.Broken); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out = append(out, t)
|
out = append(out, t)
|
||||||
@@ -80,3 +90,54 @@ func (s *Store) TryMarkTaskRunning(ctx context.Context, id int64) (bool, error)
|
|||||||
}
|
}
|
||||||
return ct.RowsAffected() == 1, nil
|
return ct.RowsAffected() == 1, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetTaskSchedule sets the recurrence interval (0 = off), stamps the anchor when
|
||||||
|
// enabling (the first run is one interval after this), and clears broken — any
|
||||||
|
// explicit schedule change un-breaks the task.
|
||||||
|
func (s *Store) SetTaskSchedule(ctx context.Context, id, intervalSeconds int64) error {
|
||||||
|
_, err := s.Pool.Exec(ctx,
|
||||||
|
`UPDATE tasks SET schedule_interval_seconds=$2,
|
||||||
|
schedule_anchor = CASE WHEN $2 > 0 THEN now() ELSE schedule_anchor END,
|
||||||
|
broken = false
|
||||||
|
WHERE id=$1`, id, intervalSeconds)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTaskBroken trips the schedule breaker: disables the schedule and flags the
|
||||||
|
// task for operator attention.
|
||||||
|
func (s *Store) SetTaskBroken(ctx context.Context, id int64) error {
|
||||||
|
_, err := s.Pool.Exec(ctx,
|
||||||
|
`UPDATE tasks SET broken=true, schedule_interval_seconds=0 WHERE id=$1`, id)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SchedulableTask is the per-tick decision input for the scheduler.
|
||||||
|
type SchedulableTask struct {
|
||||||
|
ID int64
|
||||||
|
IntervalSeconds int64
|
||||||
|
Anchor time.Time
|
||||||
|
LastFinished *time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSchedulableTasks returns tasks eligible to auto-run: schedule on, not
|
||||||
|
// broken, not currently running — each joined with its last finished run time.
|
||||||
|
func (s *Store) ListSchedulableTasks(ctx context.Context) ([]SchedulableTask, error) {
|
||||||
|
rows, err := s.Pool.Query(ctx,
|
||||||
|
`SELECT t.id, t.schedule_interval_seconds, t.schedule_anchor,
|
||||||
|
(SELECT max(finished_at) FROM runs r WHERE r.task_id=t.id AND r.finished_at IS NOT NULL)
|
||||||
|
FROM tasks t
|
||||||
|
WHERE t.schedule_interval_seconds > 0 AND NOT t.broken AND t.status <> 'running'`)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
out := []SchedulableTask{}
|
||||||
|
for rows.Next() {
|
||||||
|
var st SchedulableTask
|
||||||
|
if err := rows.Scan(&st.ID, &st.IntervalSeconds, &st.Anchor, &st.LastFinished); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
out = append(out, st)
|
||||||
|
}
|
||||||
|
return out, rows.Err()
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,4 @@
|
|||||||
|
ALTER TABLE runs DROP COLUMN trigger;
|
||||||
|
ALTER TABLE tasks DROP COLUMN broken;
|
||||||
|
ALTER TABLE tasks DROP COLUMN schedule_anchor;
|
||||||
|
ALTER TABLE tasks DROP COLUMN schedule_interval_seconds;
|
||||||
@@ -0,0 +1,4 @@
|
|||||||
|
ALTER TABLE tasks ADD COLUMN schedule_interval_seconds INT NOT NULL DEFAULT 0;
|
||||||
|
ALTER TABLE tasks ADD COLUMN schedule_anchor TIMESTAMPTZ;
|
||||||
|
ALTER TABLE tasks ADD COLUMN broken BOOLEAN NOT NULL DEFAULT false;
|
||||||
|
ALTER TABLE runs ADD COLUMN trigger TEXT NOT NULL DEFAULT 'manual';
|
||||||
Reference in New Issue
Block a user