Files
imap-copier/internal/orchestrator/orchestrator.go
T
vasyansk 477077d9e3 fix: consistent error reporting + reset mapping modal per account
Error consistency: folder-level failures were counted only toward the task's
done_with_errors status, not the account's error_count, so a task showed
DONE_WITH_ERRORS while its only account showed 0 errors / DONE. Now folder
errors increment the account counter and the account status becomes
done_with_errors when errs>0.

Visibility: persist accounts.last_error (migration 0002) so the failing folder
/ login error survives a page reload (shown red under the source login);
cleared at the start of each run.

Modal reset: the folder-mapping modal kept its selections across opens, so
adding a second account showed the first account's mapping. It now mounts
fresh per add (conditional render + key), reflecting the newly-probed folders.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
2026-07-02 15:14:11 +07:00

412 lines
14 KiB
Go

package orchestrator
import (
"context"
"errors"
"log/slog"
"sync"
"time"
"github.com/vasyansk/imap-copier/internal/crypto"
"github.com/vasyansk/imap-copier/internal/imapx"
"github.com/vasyansk/imap-copier/internal/store"
"github.com/vasyansk/imap-copier/internal/wshub"
)
// Sentinel errors returned by Run.
var (
	// ErrNotTested is returned when the task has no accounts, or at least one
	// account has a source or destination test status other than "ok".
	ErrNotTested = errors.New("accounts not fully tested")

	// ErrAlreadyRunning is returned when the task could not be marked as
	// running because the store reported it was not acquired.
	ErrAlreadyRunning = errors.New("task already running")
)
// Orchestrator drives account login tests and copy runs for tasks. It reads
// and writes state through the store and reports progress through the hub.
type Orchestrator struct {
	// Collaborators and configuration supplied to New.
	store       *store.Store
	hub         *wshub.Hub
	encKey      []byte // key handed to crypto.Decrypt for stored account passwords
	concurrency int    // capacity of the per-run account semaphore

	// mu guards cancels.
	mu      sync.Mutex
	cancels map[int64]context.CancelFunc // account_id -> cancel of its in-flight copy
}
// New builds an Orchestrator that persists through s, publishes events on hub,
// decrypts account passwords with encKey and copies at most concurrency
// accounts at the same time.
//
// concurrency sizes the buffered channel that runAll uses as a semaphore. A
// value of zero would make that channel unbuffered, so the coordinator would
// block forever on its first send, and a negative value would panic when the
// channel is created. Values below 1 are therefore raised to 1.
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
	if concurrency < 1 {
		concurrency = 1
	}
	return &Orchestrator{
		store:       s,
		hub:         hub,
		encKey:      encKey,
		concurrency: concurrency,
		cancels:     map[int64]context.CancelFunc{},
	}
}
// CancelAccount signals the in-flight copy of accountID to stop.
// It reports whether a registered copy was found; when none is registered it
// does nothing and returns false.
func (o *Orchestrator) CancelAccount(accountID int64) bool {
	// Look the cancel func up under the lock, but invoke it after releasing it.
	o.mu.Lock()
	stop, running := o.cancels[accountID]
	o.mu.Unlock()

	if !running {
		return false
	}
	stop()
	return true
}
// registerCancel records cancel as the way to abort accountID's in-flight
// copy, replacing any entry already stored for that account.
func (o *Orchestrator) registerCancel(accountID int64, cancel context.CancelFunc) {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.cancels[accountID] = cancel
}
// unregisterCancel forgets the cancel func stored for accountID, if any.
func (o *Orchestrator) unregisterCancel(accountID int64) {
	o.mu.Lock()
	defer o.mu.Unlock()
	delete(o.cancels, accountID)
}
// gateOK reports whether a run may start: there must be at least one account,
// and every account must have both its source and destination test status
// equal to "ok".
func gateOK(accs []store.Account) bool {
	for i := range accs {
		bothOK := accs[i].TestSrcStatus == "ok" && accs[i].TestDstStatus == "ok"
		if !bothOK {
			return false
		}
	}
	// An empty account list never passes the gate.
	return len(accs) > 0
}
// endpoints loads the task's source and destination endpoint records from the
// store and converts them to imapx endpoints (host, port and TLS mode).
// It returns source first, destination second. On any store error both
// endpoints are zero values and the error is returned unchanged.
func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) {
	var none imapx.Endpoint

	srcRow, err := o.store.GetEndpoint(ctx, task.SrcEndpointID)
	if err != nil {
		return none, none, err
	}
	dstRow, err := o.store.GetEndpoint(ctx, task.DstEndpointID)
	if err != nil {
		return none, none, err
	}

	srcEP := imapx.Endpoint{Host: srcRow.Host, Port: srcRow.Port, TLSMode: srcRow.TLSMode}
	dstEP := imapx.Endpoint{Host: dstRow.Host, Port: dstRow.Port, TLSMode: dstRow.TLSMode}
	return srcEP, dstEP, nil
}
// TestAccounts runs a login test against the source and the destination
// endpoint for every account of taskID, in order. Each individual result is
// persisted and published by testSide; a failed login is not an error of
// TestAccounts itself.
//
// It returns an error when the task, its endpoints or its accounts cannot be
// loaded, or ctx.Err() when ctx is done before all accounts were tested. In
// the latter case the remaining accounts are left untouched instead of being
// probed with a dead context and reported as failed.
func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error {
	task, err := o.store.GetTask(ctx, taskID)
	if err != nil {
		return err
	}
	srcEP, dstEP, err := o.endpoints(ctx, task)
	if err != nil {
		return err
	}
	accs, err := o.store.ListAccountsByTask(ctx, taskID)
	if err != nil {
		return err
	}
	for _, a := range accs {
		// Check before each side so a cancelled caller does not turn into a
		// stream of spurious "fail" results for the untested accounts.
		if err := ctx.Err(); err != nil {
			return err
		}
		o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID)
		if err := ctx.Err(); err != nil {
			return err
		}
		o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID)
	}
	return nil
}
// testSide tests one side ("src" or "dst") of an account: it decrypts passEnc
// with the orchestrator key and, if that succeeds, attempts a login for login
// on ep. The outcome ("ok" or "fail") is stored as the account's test status
// for that side and published as an "account_test" event that also carries
// the error text (empty on success).
//
// A failure to persist the status is logged rather than dropped: Run gates on
// the stored statuses, so a lost write would otherwise leave the published
// result and the stored one silently out of sync.
func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) {
	status := "ok"
	errMsg := ""
	pass, err := crypto.Decrypt(o.encKey, passEnc)
	if err == nil {
		_, err = imapx.TestLogin(ctx, ep, login, string(pass))
	}
	if err != nil {
		// Either the decryption or the login failed; both count as a failed test.
		status = "fail"
		errMsg = err.Error()
		slog.Warn("account test failed", "account", accID, "side", side,
			"login", login, "host", ep.Host, "port", ep.Port, "err", err)
	}
	if serr := o.store.SetAccountTestStatus(ctx, accID, side, status); serr != nil {
		slog.Error("persist account test status failed", "account", accID, "side", side,
			"status", status, "err", serr)
	}
	o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID,
		Data: map[string]any{
			"account_id": accID, "side": side, "status": status,
			"login": login, "host": ep.Host, "port": ep.Port, "error": errMsg,
		}})
}
// Run starts a copy run for taskID and returns the new run's ID. The copy
// itself happens in a background goroutine (runAll) whose context is detached
// from ctx's cancellation, so it keeps going after the caller's context ends.
//
// Errors:
//   - ErrNotTested when the task has no accounts or any account has a test
//     status other than "ok" on either side.
//   - ErrAlreadyRunning when the store refuses to mark the task as running.
//   - Any store error met while loading the task, accounts or endpoints, or
//     while creating the run record.
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
	task, err := o.store.GetTask(ctx, taskID)
	if err != nil {
		return 0, err
	}
	accs, err := o.store.ListAccountsByTask(ctx, taskID)
	if err != nil {
		return 0, err
	}
	if !gateOK(accs) {
		return 0, ErrNotTested
	}
	acquired, err := o.store.TryMarkTaskRunning(ctx, taskID)
	if err != nil {
		return 0, err
	}
	if !acquired {
		return 0, ErrAlreadyRunning
	}

	// From here on the task is marked as running. Rolling that back must not
	// depend on the caller's context: if ctx is already cancelled, a write made
	// with it would fail and leave the task stuck in the running state.
	detached := context.WithoutCancel(ctx)
	abort := func(cause error) (int64, error) {
		if serr := o.store.SetTaskStatus(detached, taskID, "error"); serr != nil {
			slog.Error("reset task status failed", "task", taskID, "err", serr)
		}
		return 0, cause
	}

	srcEP, dstEP, err := o.endpoints(ctx, task)
	if err != nil {
		return abort(err)
	}
	runID, err := o.store.CreateRun(ctx, taskID)
	if err != nil {
		return abort(err)
	}
	o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}})
	go o.runAll(detached, task, runID, accs, srcEP, dstEP)
	return runID, nil
}
// runAll is the coordinator of one run. It copies every account in accs with
// at most o.concurrency accounts in flight, sums the per-account counters,
// then finishes the run record, sets the task status ("done", or
// "done_with_errors" when any error was counted) and publishes "run_done".
//
// A panic in the coordinator marks both run and task as "error". A panic in
// an account worker is contained to that account: it is marked "error", an
// "error" event is published and one error is added to the run total.
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
		_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
		_ = o.store.SetTaskStatus(ctx, task.ID, "error")
	}()

	var (
		totalsMu                      sync.Mutex // guards the three sums below
		copiedSum, skippedSum, errSum int64
		workers                       sync.WaitGroup
	)
	slots := make(chan struct{}, o.concurrency)

	for i := range accs {
		workers.Add(1)
		// Take a slot before spawning, so no more than cap(slots) workers exist.
		slots <- struct{}{}
		go func(acc store.Account) {
			// Deferred calls run last-in first-out: panic recovery first, then
			// the slot is released, then the WaitGroup is signalled.
			defer workers.Done()
			defer func() { <-slots }()
			defer func() {
				r := recover()
				if r == nil {
					return
				}
				slog.Error("account worker panicked", "task", task.ID, "account", acc.ID, "panic", r)
				_ = o.store.SetAccountStatus(ctx, acc.ID, "error")
				o.hub.Publish(wshub.Event{
					Type:   "error",
					TaskID: task.ID,
					Data:   map[string]any{"account_id": acc.ID, "error": "internal panic"},
				})
				totalsMu.Lock()
				errSum++
				totalsMu.Unlock()
			}()

			nCopied, nSkipped, nErr := o.runAccount(ctx, task, runID, acc, srcEP, dstEP)

			totalsMu.Lock()
			copiedSum += nCopied
			skippedSum += nSkipped
			errSum += nErr
			totalsMu.Unlock()
		}(accs[i])
	}
	workers.Wait()

	final := "done_with_errors"
	if errSum <= 0 {
		final = "done"
	}
	_ = o.store.FinishRun(ctx, runID, final, copiedSum, skippedSum, errSum)
	_ = o.store.SetTaskStatus(ctx, task.ID, final)
	o.hub.Publish(wshub.Event{
		Type:   "run_done",
		TaskID: task.ID,
		Data: map[string]any{
			"run_id": runID, "copied": copiedSum, "skipped": skippedSum, "errors": errSum,
		},
	})
}
// runAccount copies one account from the source to the destination endpoint
// and returns the number of messages copied, skipped and errored.
//
// Steps: mark the account running and clear its last error; register a
// per-account cancel func; decrypt both passwords; connect and log in to both
// servers; list the source folders; count the messages of every folder (the
// "plan"); then copy folder by folder, publishing progress events and
// persisting counters after each folder.
//
// Outcomes:
//   - A setup failure (decrypt, connect, login, list) is handled by
//     accountFailed and returned as its result.
//   - If the account is cancelled, the status becomes "cancelled" and the
//     counters collected so far are returned.
//   - Otherwise the status is "done", or "done_with_errors" when any
//     message-level or folder-level error was counted.
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
	o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{
		"account_id": a.ID,
		"src_login":  a.SrcLogin, "src_host": srcEP.Host, "src_port": srcEP.Port,
		"dst_login": a.DstLogin, "dst_host": dstEP.Host, "dst_port": dstEP.Port,
	}})
	_ = o.store.SetAccountStatus(ctx, a.ID, "running")
	_ = o.store.SetAccountError(ctx, a.ID, "") // clear any error from a previous run

	// Per-account cancellable context: IMAP work uses actx (so CancelAccount
	// stops it); DB writes keep the parent ctx so status/counters persist even
	// after cancellation. ctx is context.WithoutCancel from runAll.
	actx, cancel := context.WithCancel(ctx)
	o.registerCancel(a.ID, cancel)
	defer func() {
		o.unregisterCancel(a.ID)
		cancel()
	}()

	// imapFailed reports a failure of an IMAP operation that is affected by
	// cancellation. Once actx is cancelled the connections are closed
	// underneath the operation, so the error it returns is typically a plain
	// network error rather than context.Canceled. Normalising it here lets
	// accountFailed record the account as "cancelled" instead of "error".
	imapFailed := func(side string, err error) (int64, int64, int64) {
		if actx.Err() != nil {
			err = context.Canceled
		}
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, side, err)
	}

	srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
	if err != nil {
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
	}
	dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc)
	if err != nil {
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
	}

	src, err := imapx.Connect(actx, srcEP)
	if err != nil {
		return imapFailed("src", err)
	}
	defer func() { _ = src.Logout().Wait() }()
	if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
	}
	dst, err := imapx.Connect(actx, dstEP)
	if err != nil {
		return imapFailed("dst", err)
	}
	defer func() { _ = dst.Logout().Wait() }()
	if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil {
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
	}

	// On cancel, close the connections so any in-flight network read (a slow
	// FETCH/Collect that ctx.Err() checks can't interrupt) unblocks immediately.
	// The deferred cancel above also ends this goroutine on a normal return.
	go func() {
		<-actx.Done()
		_ = src.Close()
		_ = dst.Close()
	}()

	folders, err := imapx.ListFolders(src)
	if err != nil {
		return imapFailed("src", err)
	}

	// Planning pass: EXAMINE every folder up front to learn the total message
	// count, so the UI can show an accurate overall bar / ETA before copying.
	type folderPlan struct {
		src, dst string
		total    int64
	}
	plan := make([]folderPlan, 0, len(folders))
	var grandTotal int64
	for _, folder := range folders {
		if actx.Err() != nil {
			break
		}
		// The destination name defaults to the source name unless the task maps it.
		df := folder
		if m, ok := task.FolderMapping[folder]; ok {
			df = m
		}
		n, cerr := imapx.FolderMessageCount(src, folder)
		if cerr != nil && actx.Err() == nil {
			slog.Warn("count folder failed", "account", a.ID, "folder", folder, "err", cerr)
		}
		// A folder whose count failed stays in the plan and is still copied.
		plan = append(plan, folderPlan{src: folder, dst: df, total: n})
		grandTotal += n
	}
	o.hub.Publish(wshub.Event{Type: "plan", TaskID: task.ID, Data: map[string]any{
		"account_id": a.ID, "src_login": a.SrcLogin, "folders": len(plan), "total": grandTotal,
	}})

	var copied, skipped, errs int64

	// Account-level live progress state (all callbacks run on this goroutine,
	// so plain vars are race-free). base* = totals from completed folders;
	// c/s inside OnProgress are cumulative within the current folder.
	var baseCopied, baseSkipped int64
	var curFolder string
	var curTotal int64
	var lastEmit, lastScanEmit time.Time

	deps := imapx.CopyDeps{
		IsMigrated:   func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
		MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
		OnProgress: func(c, s int) {
			now := time.Now()
			done := c + s
			// throttle to ~3/sec per account, but always emit folder completion
			if now.Sub(lastEmit) < 350*time.Millisecond && int64(done) < curTotal {
				return
			}
			lastEmit = now
			o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{
				"account_id":    a.ID,
				"copied":        baseCopied + int64(c),
				"skipped":       baseSkipped + int64(s),
				"folder":        curFolder,
				"folder_done":   done,
				"folder_total":  curTotal,
				"account_total": grandTotal,
			}})
		},
		// Fires after EXAMINE (before the long fetch) with the folder's message count.
		OnFolder: func(srcFolder, dstFolder string, total int64) {
			curFolder, curTotal = srcFolder, total
			o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{
				"account_id": a.ID, "src_login": a.SrcLogin,
				"folder": srcFolder, "dst_folder": dstFolder, "messages": total,
			}})
		},
		// Fires while streaming metadata (dedup scan) so the UI shows movement
		// before bodies start copying. Throttled to ~4/sec, always emit the last.
		OnScan: func(scanned, total int64) {
			now := time.Now()
			if now.Sub(lastScanEmit) < 250*time.Millisecond && scanned < total {
				return
			}
			lastScanEmit = now
			o.hub.Publish(wshub.Event{Type: "scan", TaskID: task.ID, Data: map[string]any{
				"account_id": a.ID, "folder": curFolder, "scanned": scanned, "folder_total": total,
			}})
		},
	}

	for _, fp := range plan {
		if actx.Err() != nil {
			break // cancelled — stop scheduling more folders
		}
		res, err := imapx.CopyFolder(actx, src, dst, fp.src, fp.dst, deps)
		folderErr := int64(0)
		// An error seen after cancellation is a side effect of the cancel, not a
		// folder failure, so it is neither counted nor reported.
		if err != nil && actx.Err() == nil {
			slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", fp.src, "err", err)
			folderErr = 1
			_ = o.store.SetAccountError(ctx, a.ID, "folder \""+fp.src+"\": "+err.Error())
			o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
				"account_id": a.ID, "src_login": a.SrcLogin, "folder": fp.src, "error": err.Error(),
			}})
		}
		copied += int64(res.Copied)
		skipped += int64(res.Skipped)
		errs += int64(res.Errors) + folderErr
		baseCopied += int64(res.Copied)
		baseSkipped += int64(res.Skipped)
		// Persist message-level AND folder-level errors so the account row's
		// error count matches the task status (done_with_errors).
		_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)+folderErr)
	}

	if actx.Err() != nil {
		_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
		o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: task.ID,
			Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin,
				"copied": copied, "skipped": skipped, "errors": errs}})
		slog.Info("account cancelled", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped)
		return copied, skipped, errs
	}

	acctStatus := "done"
	if errs > 0 {
		acctStatus = "done_with_errors"
	}
	_ = o.store.SetAccountStatus(ctx, a.ID, acctStatus)
	o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
		Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin,
			"copied": copied, "skipped": skipped, "errors": errs}})
	slog.Info("account copied", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped, "errors", errs)
	return copied, skipped, errs
}
// accountFailed records that the copy of account a could not proceed and
// returns the (copied, skipped, errors) triple for the caller to pass on.
//
// An err that matches context.Canceled is treated as a cancellation: the
// account is marked "cancelled", a "cancelled" event is published and no error
// is counted. Any other err marks the account "error", stores a last-error
// text naming the failing side, login and host, publishes an "error" event and
// counts one error. side selects which login/endpoint is reported: "dst" uses
// the destination, anything else the source.
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
	// A cancellation surfacing as an error is a cancel, not a failure.
	if errors.Is(err, context.Canceled) {
		_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
		o.hub.Publish(wshub.Event{
			Type:   "cancelled",
			TaskID: taskID,
			Data:   map[string]any{"account_id": a.ID, "src_login": a.SrcLogin},
		})
		return 0, 0, 0
	}

	// Pick the login and endpoint of the side that failed.
	login, ep := a.DstLogin, dstEP
	if side != "dst" {
		login, ep = a.SrcLogin, srcEP
	}
	host, port := ep.Host, ep.Port
	reason := err.Error()

	slog.Error("account failed", "account", a.ID, "side", side, "login", login, "host", host, "port", port, "err", err)
	_ = o.store.SetAccountStatus(ctx, a.ID, "error")
	_ = o.store.SetAccountError(ctx, a.ID, side+" "+login+"@"+host+": "+reason)
	o.hub.Publish(wshub.Event{
		Type:   "error",
		TaskID: taskID,
		Data: map[string]any{
			"account_id": a.ID, "side": side, "login": login,
			"host": host, "port": port, "error": reason,
		},
	})
	return 0, 0, 1
}