package orchestrator import ( "context" "errors" "log/slog" "sync" "github.com/vasyansk/imap-copier/internal/crypto" "github.com/vasyansk/imap-copier/internal/imapx" "github.com/vasyansk/imap-copier/internal/store" "github.com/vasyansk/imap-copier/internal/wshub" ) var ErrNotTested = errors.New("accounts not fully tested") var ErrAlreadyRunning = errors.New("task already running") type Orchestrator struct { store *store.Store hub *wshub.Hub encKey []byte concurrency int } func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator { return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency} } func gateOK(accs []store.Account) bool { if len(accs) == 0 { return false } for _, a := range accs { if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" { return false } } return true } func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) { src, err := o.store.GetEndpoint(ctx, task.SrcEndpointID) if err != nil { return imapx.Endpoint{}, imapx.Endpoint{}, err } dst, err := o.store.GetEndpoint(ctx, task.DstEndpointID) if err != nil { return imapx.Endpoint{}, imapx.Endpoint{}, err } toEP := func(e store.Endpoint) imapx.Endpoint { return imapx.Endpoint{Host: e.Host, Port: e.Port, TLSMode: e.TLSMode} } return toEP(src), toEP(dst), nil } func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error { task, err := o.store.GetTask(ctx, taskID) if err != nil { return err } srcEP, dstEP, err := o.endpoints(ctx, task) if err != nil { return err } accs, err := o.store.ListAccountsByTask(ctx, taskID) if err != nil { return err } for _, a := range accs { o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID) o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID) } return nil } func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) { status := "ok" pass, err := crypto.Decrypt(o.encKey, passEnc) if err == nil { _, err = imapx.TestLogin(ctx, ep, login, string(pass)) } if err != nil { status = "fail" slog.Warn("account test failed", "account", accID, "side", side, "err", err) } _ = o.store.SetAccountTestStatus(ctx, accID, side, status) o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID, Data: map[string]any{"account_id": accID, "side": side, "status": status}}) } func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { task, err := o.store.GetTask(ctx, taskID) if err != nil { return 0, err } accs, err := o.store.ListAccountsByTask(ctx, taskID) if err != nil { return 0, err } if !gateOK(accs) { return 0, ErrNotTested } acquired, err := o.store.TryMarkTaskRunning(ctx, taskID) if err != nil { return 0, err } if !acquired { return 0, ErrAlreadyRunning } srcEP, dstEP, err := o.endpoints(ctx, task) if err != nil { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err } runID, err := o.store.CreateRun(ctx, taskID) if err != nil { _ = o.store.SetTaskStatus(ctx, taskID, "error") return 0, err } o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}}) go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP) return runID, nil } func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) { var ( mu sync.Mutex totCopied, totSkipped, totErr int64 ) sem := make(chan struct{}, o.concurrency) var wg sync.WaitGroup for _, a := range accs { wg.Add(1) sem <- struct{}{} go func(a store.Account) { defer wg.Done() defer func() { <-sem }() c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP) mu.Lock() totCopied += c totSkipped += s totErr += e mu.Unlock() }(a) } wg.Wait() status := "done" if totErr > 0 { status = "done_with_errors" } _ = o.store.FinishRun(ctx, runID, status, totCopied, totSkipped, totErr) _ = o.store.SetTaskStatus(ctx, task.ID, status) o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID, Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}}) } func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) { o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{"account_id": a.ID}}) _ = o.store.SetAccountStatus(ctx, a.ID, "running") srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc) if err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc) if err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } src, err := imapx.Connect(ctx, srcEP) if err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } defer func() { _ = src.Logout().Wait() }() if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } dst, err := imapx.Connect(ctx, dstEP) if err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } defer func() { _ = dst.Logout().Wait() }() if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } folders, err := imapx.ListFolders(src) if err != nil { return o.accountFailed(ctx, task.ID, a.ID, err) } var copied, skipped, errs int64 deps := imapx.CopyDeps{ IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, OnProgress: func(c, s int) { o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}}) }, } for _, folder := range folders { dstFolder := folder if m, ok := task.FolderMapping[folder]; ok { dstFolder = m } res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps) if err != nil { slog.Warn("folder copy error", "account", a.ID, "folder", folder, "err", err) errs++ } copied += int64(res.Copied) skipped += int64(res.Skipped) errs += int64(res.Errors) _ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)) } _ = o.store.SetAccountStatus(ctx, a.ID, "done") o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID, Data: map[string]any{"account_id": a.ID, "copied": copied, "skipped": skipped, "errors": errs}}) slog.Info("account copied", "account", a.ID, "copied", copied, "skipped", skipped, "errors", errs) return copied, skipped, errs } func (o *Orchestrator) accountFailed(ctx context.Context, taskID, accID int64, err error) (int64, int64, int64) { slog.Error("account failed", "account", accID, "err", err) _ = o.store.SetAccountStatus(ctx, accID, "error") o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID, Data: map[string]any{"account_id": accID, "error": err.Error()}}) return 0, 0, 1 }