many fixes

This commit is contained in:
2026-07-03 11:18:40 +07:00
parent d909618ced
commit 79fd200e57
12 changed files with 348 additions and 18 deletions
+2 -1
View File
@@ -217,7 +217,8 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
"dst_login": a.DstLogin, "dst_host": dstEP.Host, "dst_port": dstEP.Port,
}})
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
_ = o.store.SetAccountError(ctx, a.ID, "") // clear any error from a previous run
_ = o.store.SetAccountError(ctx, a.ID, "") // clear any error from a previous run
_ = o.store.ResetAccountCounters(ctx, a.ID) // start from zero; IncAccountCounters is additive
// Per-account cancellable context: IMAP work uses actx (so CancelAccount
// stops it); DB writes keep the parent ctx so status/counters persist even
+9
View File
@@ -79,6 +79,15 @@ func (s *Store) SetAccountStatus(ctx context.Context, id int64, status string) e
return err
}
// ResetAccountCounters zeroes the per-account copied/skipped/error counts at the
// start of a run so a re-run reflects only the current run's totals instead of
// accumulating on top of the previous run (IncAccountCounters is additive).
func (s *Store) ResetAccountCounters(ctx context.Context, id int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE accounts SET copied_count=0, skipped_count=0, error_count=0 WHERE id=$1`, id)
return err
}
func (s *Store) IncAccountCounters(ctx context.Context, id, copied, skipped, errs int64) error {
_, err := s.Pool.Exec(ctx,
`UPDATE accounts SET copied_count=copied_count+$2,
+36
View File
@@ -28,3 +28,39 @@ func TestMigratedIdempotency(t *testing.T) {
t.Fatal("unknown key must be false")
}
}
// A re-run must start from zero counters, not accumulate on top of the previous
// run's totals — otherwise a run that skips everything still shows the prior
// run's Copied count. ResetAccountCounters is called at the start of runAccount.
func TestResetAccountCounters(t *testing.T) {
s := testStore(t)
ctx := context.Background()
epSrc, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "a", Port: 993, TLSMode: "ssl"})
epDst, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "dst", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: epSrc, DstEndpointID: epDst})
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "u2", DstPassEnc: "y"})
// First run: everything copied.
if err := s.IncAccountCounters(ctx, accID, 100, 0, 1); err != nil {
t.Fatalf("inc: %v", err)
}
// Second run starts: counters reset to zero.
if err := s.ResetAccountCounters(ctx, accID); err != nil {
t.Fatalf("reset: %v", err)
}
// Second run skips everything.
if err := s.IncAccountCounters(ctx, accID, 0, 100, 0); err != nil {
t.Fatalf("inc2: %v", err)
}
accs, err := s.ListAccountsByTask(ctx, taskID)
if err != nil || len(accs) != 1 {
t.Fatalf("list: %v len=%d", err, len(accs))
}
a := accs[0]
if a.Copied != 0 || a.Skipped != 100 || a.Errors != 0 {
t.Fatalf("after reset+rerun: copied=%d skipped=%d errors=%d want 0/100/0",
a.Copied, a.Skipped, a.Errors)
}
}