From 8c871d9d2681500bb2f80b2f3fcf399dbd398061 Mon Sep 17 00:00:00 2001 From: Vassiliy Yegorov Date: Wed, 1 Jul 2026 18:04:44 +0700 Subject: [PATCH] feat(orchestrator): worker pool run + account testing gate --- internal/orchestrator/orchestrator.go | 219 +++++++++++++++++++++ internal/orchestrator/orchestrator_test.go | 24 +++ 2 files changed, 243 insertions(+) create mode 100644 internal/orchestrator/orchestrator.go create mode 100644 internal/orchestrator/orchestrator_test.go diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go new file mode 100644 index 0000000..de34e44 --- /dev/null +++ b/internal/orchestrator/orchestrator.go @@ -0,0 +1,219 @@ +package orchestrator + +import ( + "context" + "errors" + "log/slog" + "sync" + + "github.com/vasyansk/imap-copier/internal/crypto" + "github.com/vasyansk/imap-copier/internal/imapx" + "github.com/vasyansk/imap-copier/internal/store" + "github.com/vasyansk/imap-copier/internal/wshub" +) + +var ErrNotTested = errors.New("accounts not fully tested") + +type Orchestrator struct { + store *store.Store + hub *wshub.Hub + encKey []byte + concurrency int +} + +func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator { + return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency} +} + +func gateOK(accs []store.Account) bool { + if len(accs) == 0 { + return false + } + for _, a := range accs { + if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" { + return false + } + } + return true +} + +func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) { + src, err := o.store.GetEndpoint(ctx, task.SrcEndpointID) + if err != nil { + return imapx.Endpoint{}, imapx.Endpoint{}, err + } + dst, err := o.store.GetEndpoint(ctx, task.DstEndpointID) + if err != nil { + return imapx.Endpoint{}, imapx.Endpoint{}, err + } + toEP := func(e store.Endpoint) imapx.Endpoint { + return imapx.Endpoint{Host: e.Host, Port: e.Port, TLSMode: e.TLSMode} + } + return toEP(src), toEP(dst), nil +} + +func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error { + task, err := o.store.GetTask(ctx, taskID) + if err != nil { + return err + } + srcEP, dstEP, err := o.endpoints(ctx, task) + if err != nil { + return err + } + accs, err := o.store.ListAccountsByTask(ctx, taskID) + if err != nil { + return err + } + for _, a := range accs { + o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID) + o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID) + } + return nil +} + +func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) { + status := "ok" + pass, err := crypto.Decrypt(o.encKey, passEnc) + if err == nil { + _, err = imapx.TestLogin(ctx, ep, login, string(pass)) + } + if err != nil { + status = "fail" + slog.Warn("account test failed", "account", accID, "side", side, "err", err) + } + _ = o.store.SetAccountTestStatus(ctx, accID, side, status) + o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID, + Data: map[string]any{"account_id": accID, "side": side, "status": status}}) +} + +func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) { + task, err := o.store.GetTask(ctx, taskID) + if err != nil { + return 0, err + } + accs, err := o.store.ListAccountsByTask(ctx, taskID) + if err != nil { + return 0, err + } + if !gateOK(accs) { + return 0, ErrNotTested + } + srcEP, dstEP, err := o.endpoints(ctx, task) + if err != nil { + return 0, err + } + runID, err := o.store.CreateRun(ctx, taskID) + if err != nil { + return 0, err + } + _ = o.store.SetTaskStatus(ctx, taskID, "running") + o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}}) + + go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP) + return runID, nil +} + +func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) { + var ( + mu sync.Mutex + totCopied, totSkipped, totErr int64 + ) + sem := make(chan struct{}, o.concurrency) + var wg sync.WaitGroup + + for _, a := range accs { + wg.Add(1) + sem <- struct{}{} + go func(a store.Account) { + defer wg.Done() + defer func() { <-sem }() + c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP) + mu.Lock() + totCopied += c + totSkipped += s + totErr += e + mu.Unlock() + }(a) + } + wg.Wait() + + _ = o.store.FinishRun(ctx, runID, "done", totCopied, totSkipped, totErr) + _ = o.store.SetTaskStatus(ctx, task.ID, "done") + o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID, + Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}}) +} + +func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) { + o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{"account_id": a.ID}}) + _ = o.store.SetAccountStatus(ctx, a.ID, "running") + + srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc) + if err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc) + if err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + + src, err := imapx.Connect(ctx, srcEP) + if err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + defer func() { _ = src.Logout().Wait() }() + if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + dst, err := imapx.Connect(ctx, dstEP) + if err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + defer func() { _ = dst.Logout().Wait() }() + if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + + folders, err := imapx.TestLogin(ctx, srcEP, a.SrcLogin, string(srcPass)) + if err != nil { + return o.accountFailed(ctx, task.ID, a.ID, err) + } + + var copied, skipped, errs int64 + deps := imapx.CopyDeps{ + IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) }, + MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) }, + OnProgress: func(c, s int) { + o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID, + Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}}) + }, + } + for _, folder := range folders { + dstFolder := folder + if m, ok := task.FolderMapping[folder]; ok { + dstFolder = m + } + res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps) + if err != nil { + slog.Warn("folder copy error", "account", a.ID, "folder", folder, "err", err) + errs++ + } + copied += int64(res.Copied) + skipped += int64(res.Skipped) + errs += int64(res.Errors) + _ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors)) + } + _ = o.store.SetAccountStatus(ctx, a.ID, "done") + o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID, + Data: map[string]any{"account_id": a.ID, "copied": copied, "skipped": skipped, "errors": errs}}) + slog.Info("account copied", "account", a.ID, "copied", copied, "skipped", skipped, "errors", errs) + return copied, skipped, errs +} + +func (o *Orchestrator) accountFailed(ctx context.Context, taskID, accID int64, err error) (int64, int64, int64) { + slog.Error("account failed", "account", accID, "err", err) + _ = o.store.SetAccountStatus(ctx, accID, "error") + o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID, + Data: map[string]any{"account_id": accID, "error": err.Error()}}) + return 0, 0, 1 +} diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go new file mode 100644 index 0000000..d83ea6c --- /dev/null +++ b/internal/orchestrator/orchestrator_test.go @@ -0,0 +1,24 @@ +package orchestrator + +import ( + "testing" + + "github.com/vasyansk/imap-copier/internal/store" +) + +func TestGateOK(t *testing.T) { + ok := []store.Account{ + {TestSrcStatus: "ok", TestDstStatus: "ok"}, + {TestSrcStatus: "ok", TestDstStatus: "ok"}, + } + if !gateOK(ok) { + t.Fatal("all ok must pass gate") + } + bad := []store.Account{{TestSrcStatus: "ok", TestDstStatus: "fail"}} + if gateOK(bad) { + t.Fatal("any non-ok must fail gate") + } + if gateOK(nil) { + t.Fatal("empty accounts must fail gate") + } +}