// Package orchestrator drives IMAP copy tasks: it tests account credentials,
// starts runs, fans account copies out to a bounded set of workers, and
// publishes progress events to the websocket hub.
package orchestrator

import (
	"context"
	"errors"
	"log/slog"
	"sync"
	"time"

	"github.com/vasyansk/imap-copier/internal/crypto"
	"github.com/vasyansk/imap-copier/internal/imapx"
	"github.com/vasyansk/imap-copier/internal/store"
	"github.com/vasyansk/imap-copier/internal/wshub"
)

var (
	// ErrNotTested is returned by Run when the task has no accounts or at
	// least one account does not have both test statuses equal to "ok".
	ErrNotTested = errors.New("accounts not fully tested")

	// ErrAlreadyRunning is returned by Run when the store refuses to mark the
	// task as running.
	ErrAlreadyRunning = errors.New("task already running")
)

// Orchestrator owns the dependencies needed to test and run copy tasks and
// tracks the cancel functions of in-flight account copies.
type Orchestrator struct {
	store       *store.Store
	hub         *wshub.Hub
	encKey      []byte
	concurrency int

	mu      sync.Mutex
	cancels map[int64]context.CancelFunc // account_id -> cancel of its in-flight copy
}

// New builds an Orchestrator. concurrency bounds how many accounts are copied
// in parallel; values below 1 are treated as 1 when a run starts.
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
	return &Orchestrator{
		store:       s,
		hub:         hub,
		encKey:      encKey,
		concurrency: concurrency,
		cancels:     map[int64]context.CancelFunc{},
	}
}

// CancelAccount aborts the in-flight copy for one account, if it is running.
// Returns true if a running copy was found and signalled to stop.
func (o *Orchestrator) CancelAccount(accountID int64) bool {
	o.mu.Lock()
	cancel, ok := o.cancels[accountID]
	o.mu.Unlock()

	// Call cancel outside the lock so the critical section stays minimal.
	if ok {
		cancel()
	}
	return ok
}

// registerCancel records the cancel function of an account's in-flight copy.
func (o *Orchestrator) registerCancel(accountID int64, cancel context.CancelFunc) {
	o.mu.Lock()
	o.cancels[accountID] = cancel
	o.mu.Unlock()
}

// unregisterCancel forgets the cancel function recorded for an account.
func (o *Orchestrator) unregisterCancel(accountID int64) {
	o.mu.Lock()
	delete(o.cancels, accountID)
	o.mu.Unlock()
}

// gateOK reports whether a run may start: there must be at least one account
// and every account must have both its source and destination test status
// equal to "ok".
func gateOK(accs []store.Account) bool {
	if len(accs) == 0 {
		return false
	}
	for _, a := range accs {
		if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
			return false
		}
	}
	return true
}

// endpoints loads the task's source and destination endpoints from the store
// and converts them to imapx endpoints (host, port, TLS mode). On error both
// returned endpoints are zero values.
func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) {
	src, err := o.store.GetEndpoint(ctx, task.SrcEndpointID)
	if err != nil {
		return imapx.Endpoint{}, imapx.Endpoint{}, err
	}
	dst, err := o.store.GetEndpoint(ctx, task.DstEndpointID)
	if err != nil {
		return imapx.Endpoint{}, imapx.Endpoint{}, err
	}
	toEP := func(e store.Endpoint) imapx.Endpoint {
		return imapx.Endpoint{Host: e.Host, Port: e.Port, TLSMode: e.TLSMode}
	}
	return toEP(src), toEP(dst), nil
}

// TestAccounts tests the source and destination login of every account of the
// task, one account at a time. Individual login failures are recorded and
// published by testSide and do not abort the loop. An error is returned only
// when the task, its endpoints or its accounts cannot be loaded, or when ctx
// is done before all accounts were tested.
func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error {
	task, err := o.store.GetTask(ctx, taskID)
	if err != nil {
		return err
	}
	srcEP, dstEP, err := o.endpoints(ctx, task)
	if err != nil {
		return err
	}
	accs, err := o.store.ListAccountsByTask(ctx, taskID)
	if err != nil {
		return err
	}
	for _, a := range accs {
		// Stop once the caller is gone: continuing would only record every
		// remaining account as failed with a context error.
		if err := ctx.Err(); err != nil {
			return err
		}
		o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID)
		o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID)
	}
	return nil
}

// testSide decrypts the password for one side ("src" or "dst") of an account,
// attempts a test login against ep, stores the resulting status ("ok" or
// "fail") and publishes an "account_test" event carrying the outcome.
func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) {
	status := "ok"
	errMsg := ""

	pass, err := crypto.Decrypt(o.encKey, passEnc)
	if err == nil {
		_, err = imapx.TestLogin(ctx, ep, login, string(pass))
	}
	if err != nil {
		status = "fail"
		errMsg = err.Error()
		slog.Warn("account test failed", "account", accID, "side", side, "login", login, "host", ep.Host, "port", ep.Port, "err", err)
	}

	// Persisting is best effort, but a lost write makes the stored gate state
	// disagree with the event published below, so at least log it.
	if serr := o.store.SetAccountTestStatus(ctx, accID, side, status); serr != nil {
		slog.Warn("storing account test status failed", "account", accID, "side", side, "status", status, "err", serr)
	}

	o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID, Data: map[string]any{
		"account_id": accID,
		"side":       side,
		"status":     status,
		"login":      login,
		"host":       ep.Host,
		"port":       ep.Port,
		"error":      errMsg,
	}})
}

// Run starts a copy run for the task and returns the new run ID. The copy
// itself continues in a background goroutine that is detached from ctx's
// cancellation.
//
// It returns ErrNotTested when gateOK rejects the task's accounts and
// ErrAlreadyRunning when the store does not grant the running mark. If loading
// the endpoints or creating the run fails after the task was marked running,
// the task status is set to "error".
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
	task, err := o.store.GetTask(ctx, taskID)
	if err != nil {
		return 0, err
	}
	accs, err := o.store.ListAccountsByTask(ctx, taskID)
	if err != nil {
		return 0, err
	}
	if !gateOK(accs) {
		return 0, ErrNotTested
	}

	acquired, err := o.store.TryMarkTaskRunning(ctx, taskID)
	if err != nil {
		return 0, err
	}
	if !acquired {
		return 0, ErrAlreadyRunning
	}

	srcEP, dstEP, err := o.endpoints(ctx, task)
	if err != nil {
		// Best effort: the original error is what the caller needs to see.
		_ = o.store.SetTaskStatus(ctx, taskID, "error")
		return 0, err
	}
	runID, err := o.store.CreateRun(ctx, taskID)
	if err != nil {
		// Best effort: the original error is what the caller needs to see.
		_ = o.store.SetTaskStatus(ctx, taskID, "error")
		return 0, err
	}

	o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}})

	// The run must outlive the request that started it, so drop cancellation
	// while keeping the context's values.
	go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP)
	return runID, nil
}

// runAll copies all accounts of a run with at most o.concurrency accounts in
// flight (at least one), then records the aggregated totals, sets the final
// task status ("done" or "done_with_errors") and publishes "run_done".
// A panic in the coordinator itself marks the run and the task as "error".
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
	defer func() {
		if r := recover(); r != nil {
			slog.Error("run coordinator panicked", "task", task.ID, "run", runID, "panic", r)
			_ = o.store.FinishRun(ctx, runID, "error", 0, 0, 0)
			_ = o.store.SetTaskStatus(ctx, task.ID, "error")
		}
	}()

	var (
		mu                           sync.Mutex
		totCopied, totSkipped, totErr int64
	)

	// With a limit of 0 the semaphore would be unbuffered and the first send
	// below would block forever (no worker exists yet to receive); a negative
	// limit would make the channel allocation panic. Always allow one worker.
	limit := o.concurrency
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit)

	var wg sync.WaitGroup
	for _, a := range accs {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` workers are busy
		go func(a store.Account) {
			defer wg.Done()
			defer func() { <-sem }()
			defer func() {
				if r := recover(); r != nil {
					slog.Error("account worker panicked", "task", task.ID, "account", a.ID, "panic", r)
					_ = o.store.SetAccountStatus(ctx, a.ID, "error")
					o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "error": "internal panic"}})
					mu.Lock()
					totErr++
					mu.Unlock()
				}
			}()

			c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP)
			mu.Lock()
			totCopied += c
			totSkipped += s
			totErr += e
			mu.Unlock()
		}(a)
	}
	wg.Wait()

	status := "done"
	if totErr > 0 {
		status = "done_with_errors"
	}
	_ = o.store.FinishRun(ctx, runID, status, totCopied, totSkipped, totErr)
	_ = o.store.SetTaskStatus(ctx, task.ID, status)
	o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID, Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}})
}

// runAccount copies every folder of one account from the source to the
// destination server and returns the (copied, skipped, errors) totals.
//
// It publishes "account_started", per-folder "folder" and throttled "progress"
// events, and finishes with "account_done" or "cancelled". A setup failure
// (password decryption, connect, login, folder listing) is delegated to
// accountFailed; if the account was cancelled by then, the failure is reported
// as a cancellation rather than an error.
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
	o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{
		"account_id": a.ID,
		"src_login":  a.SrcLogin,
		"src_host":   srcEP.Host,
		"src_port":   srcEP.Port,
		"dst_login":  a.DstLogin,
		"dst_host":   dstEP.Host,
		"dst_port":   dstEP.Port,
	}})
	_ = o.store.SetAccountStatus(ctx, a.ID, "running")

	// Per-account cancellable context: IMAP work uses actx (so CancelAccount
	// stops it); DB writes keep the parent ctx so status/counters persist even
	// after cancellation. ctx is context.WithoutCancel from runAll.
	actx, cancel := context.WithCancel(ctx)
	o.registerCancel(a.ID, cancel)
	defer func() {
		o.unregisterCancel(a.ID)
		cancel()
	}()

	// fail reports a setup failure. Once actx is cancelled the connections are
	// closed underneath the IMAP calls, so the error they return is a network
	// error rather than context.Canceled; substitute actx.Err() so the account
	// is recorded as cancelled instead of failed.
	fail := func(side string, err error) (int64, int64, int64) {
		if cerr := actx.Err(); cerr != nil {
			err = cerr
		}
		return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, side, err)
	}

	srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
	if err != nil {
		return fail("src", err)
	}
	dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc)
	if err != nil {
		return fail("dst", err)
	}

	src, err := imapx.Connect(actx, srcEP)
	if err != nil {
		return fail("src", err)
	}
	defer func() { _ = src.Logout().Wait() }()
	// On cancel, close the connection so any in-flight network read (a slow
	// login or FETCH/Collect that ctx.Err() checks can't interrupt) unblocks
	// immediately. The watcher always exits: cancel is deferred above.
	go func() {
		<-actx.Done()
		_ = src.Close()
	}()
	if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
		return fail("src", err)
	}

	dst, err := imapx.Connect(actx, dstEP)
	if err != nil {
		return fail("dst", err)
	}
	defer func() { _ = dst.Logout().Wait() }()
	// Same close-on-cancel watcher for the destination connection.
	go func() {
		<-actx.Done()
		_ = dst.Close()
	}()
	if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil {
		return fail("dst", err)
	}

	folders, err := imapx.ListFolders(src)
	if err != nil {
		return fail("src", err)
	}

	var copied, skipped, errs int64

	// Account-level live progress state. base* = totals from completed
	// folders; c/s inside OnProgress are cumulative within the current folder.
	// NOTE(review): these are plain variables, which is only race-free if
	// imapx.CopyFolder invokes the callbacks on the calling goroutine — confirm.
	var baseCopied, baseSkipped int64
	var curFolder string
	var curTotal int64
	var lastEmit time.Time

	deps := imapx.CopyDeps{
		IsMigrated: func(k string) (bool, error) {
			return o.store.IsMigrated(ctx, a.ID, k)
		},
		MarkMigrated: func(folder, k string) error {
			return o.store.MarkMigrated(ctx, a.ID, folder, k)
		},
		OnProgress: func(c, s int) {
			now := time.Now()
			done := c + s
			// throttle to ~3/sec per account, but always emit folder completion
			if now.Sub(lastEmit) < 350*time.Millisecond && int64(done) < curTotal {
				return
			}
			lastEmit = now
			o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{
				"account_id":   a.ID,
				"copied":       baseCopied + int64(c),
				"skipped":      baseSkipped + int64(s),
				"folder":       curFolder,
				"folder_done":  done,
				"folder_total": curTotal,
			}})
		},
		// Fires after EXAMINE (before the long fetch) with the folder's message count.
		OnFolder: func(srcFolder, dstFolder string, total int64) {
			curFolder, curTotal = srcFolder, total
			o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{
				"account_id": a.ID,
				"src_login":  a.SrcLogin,
				"folder":     srcFolder,
				"dst_folder": dstFolder,
				"messages":   total,
			}})
		},
	}

	for _, folder := range folders {
		if actx.Err() != nil {
			break // cancelled — stop scheduling more folders
		}
		dstFolder := folder
		if m, ok := task.FolderMapping[folder]; ok {
			dstFolder = m
		}

		res, err := imapx.CopyFolder(actx, src, dst, folder, dstFolder, deps)
		// An error that surfaces after cancellation is a side effect of the
		// cancel, not a folder failure, so it is neither counted nor published.
		if err != nil && actx.Err() == nil {
			slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
			errs++
			o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
				"account_id": a.ID,
				"src_login":  a.SrcLogin,
				"folder":     folder,
				"error":      err.Error(),
			}})
		}

		// Partial results are kept even when the folder ended with an error.
		copied += int64(res.Copied)
		skipped += int64(res.Skipped)
		errs += int64(res.Errors)
		baseCopied += int64(res.Copied)
		baseSkipped += int64(res.Skipped)
		_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
	}

	if actx.Err() != nil {
		_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
		o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "copied": copied, "skipped": skipped, "errors": errs}})
		slog.Info("account cancelled", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped)
		return copied, skipped, errs
	}

	_ = o.store.SetAccountStatus(ctx, a.ID, "done")
	o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin, "copied": copied, "skipped": skipped, "errors": errs}})
	slog.Info("account copied", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped, "errors", errs)
	return copied, skipped, errs
}

// accountFailed records that an account could not be set up for copying and
// returns the (copied, skipped, errors) totals to report for it.
//
// If err is (or wraps) context.Canceled the account is marked "cancelled", a
// "cancelled" event is published and (0, 0, 0) is returned. Otherwise the
// failure is logged with the login/host/port of the given side ("dst" selects
// the destination, anything else the source), the account is marked "error",
// an "error" event is published and (0, 0, 1) is returned.
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
	// A cancellation surfacing as an error is a cancel, not a failure.
	if errors.Is(err, context.Canceled) {
		_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
		o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: taskID, Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin}})
		return 0, 0, 0
	}

	login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port
	if side == "dst" {
		login, host, port = a.DstLogin, dstEP.Host, dstEP.Port
	}
	slog.Error("account failed", "account", a.ID, "side", side, "login", login, "host", host, "port", port, "err", err)
	_ = o.store.SetAccountStatus(ctx, a.ID, "error")
	o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID, Data: map[string]any{"account_id": a.ID, "side": side, "login": login, "host": host, "port": port, "error": err.Error()}})
	return 0, 0, 1
}