feat(orchestrator): worker pool run + account testing gate
This commit is contained in:
@@ -0,0 +1,219 @@
|
||||
package orchestrator
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/vasyansk/imap-copier/internal/crypto"
|
||||
"github.com/vasyansk/imap-copier/internal/imapx"
|
||||
"github.com/vasyansk/imap-copier/internal/store"
|
||||
"github.com/vasyansk/imap-copier/internal/wshub"
|
||||
)
|
||||
|
||||
var ErrNotTested = errors.New("accounts not fully tested")
|
||||
|
||||
type Orchestrator struct {
|
||||
store *store.Store
|
||||
hub *wshub.Hub
|
||||
encKey []byte
|
||||
concurrency int
|
||||
}
|
||||
|
||||
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
|
||||
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency}
|
||||
}
|
||||
|
||||
func gateOK(accs []store.Account) bool {
|
||||
if len(accs) == 0 {
|
||||
return false
|
||||
}
|
||||
for _, a := range accs {
|
||||
if a.TestSrcStatus != "ok" || a.TestDstStatus != "ok" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (o *Orchestrator) endpoints(ctx context.Context, task store.Task) (imapx.Endpoint, imapx.Endpoint, error) {
|
||||
src, err := o.store.GetEndpoint(ctx, task.SrcEndpointID)
|
||||
if err != nil {
|
||||
return imapx.Endpoint{}, imapx.Endpoint{}, err
|
||||
}
|
||||
dst, err := o.store.GetEndpoint(ctx, task.DstEndpointID)
|
||||
if err != nil {
|
||||
return imapx.Endpoint{}, imapx.Endpoint{}, err
|
||||
}
|
||||
toEP := func(e store.Endpoint) imapx.Endpoint {
|
||||
return imapx.Endpoint{Host: e.Host, Port: e.Port, TLSMode: e.TLSMode}
|
||||
}
|
||||
return toEP(src), toEP(dst), nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error {
|
||||
task, err := o.store.GetTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
srcEP, dstEP, err := o.endpoints(ctx, task)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
accs, err := o.store.ListAccountsByTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, a := range accs {
|
||||
o.testSide(ctx, srcEP, a.ID, "src", a.SrcLogin, a.SrcPassEnc, taskID)
|
||||
o.testSide(ctx, dstEP, a.ID, "dst", a.DstLogin, a.DstPassEnc, taskID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) {
|
||||
status := "ok"
|
||||
pass, err := crypto.Decrypt(o.encKey, passEnc)
|
||||
if err == nil {
|
||||
_, err = imapx.TestLogin(ctx, ep, login, string(pass))
|
||||
}
|
||||
if err != nil {
|
||||
status = "fail"
|
||||
slog.Warn("account test failed", "account", accID, "side", side, "err", err)
|
||||
}
|
||||
_ = o.store.SetAccountTestStatus(ctx, accID, side, status)
|
||||
o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID,
|
||||
Data: map[string]any{"account_id": accID, "side": side, "status": status}})
|
||||
}
|
||||
|
||||
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
|
||||
task, err := o.store.GetTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
accs, err := o.store.ListAccountsByTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if !gateOK(accs) {
|
||||
return 0, ErrNotTested
|
||||
}
|
||||
srcEP, dstEP, err := o.endpoints(ctx, task)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
runID, err := o.store.CreateRun(ctx, taskID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
_ = o.store.SetTaskStatus(ctx, taskID, "running")
|
||||
o.hub.Publish(wshub.Event{Type: "run_started", TaskID: taskID, Data: map[string]any{"run_id": runID}})
|
||||
|
||||
go o.runAll(context.WithoutCancel(ctx), task, runID, accs, srcEP, dstEP)
|
||||
return runID, nil
|
||||
}
|
||||
|
||||
func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64, accs []store.Account, srcEP, dstEP imapx.Endpoint) {
|
||||
var (
|
||||
mu sync.Mutex
|
||||
totCopied, totSkipped, totErr int64
|
||||
)
|
||||
sem := make(chan struct{}, o.concurrency)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for _, a := range accs {
|
||||
wg.Add(1)
|
||||
sem <- struct{}{}
|
||||
go func(a store.Account) {
|
||||
defer wg.Done()
|
||||
defer func() { <-sem }()
|
||||
c, s, e := o.runAccount(ctx, task, runID, a, srcEP, dstEP)
|
||||
mu.Lock()
|
||||
totCopied += c
|
||||
totSkipped += s
|
||||
totErr += e
|
||||
mu.Unlock()
|
||||
}(a)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
_ = o.store.FinishRun(ctx, runID, "done", totCopied, totSkipped, totErr)
|
||||
_ = o.store.SetTaskStatus(ctx, task.ID, "done")
|
||||
o.hub.Publish(wshub.Event{Type: "run_done", TaskID: task.ID,
|
||||
Data: map[string]any{"run_id": runID, "copied": totCopied, "skipped": totSkipped, "errors": totErr}})
|
||||
}
|
||||
|
||||
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
|
||||
o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{"account_id": a.ID}})
|
||||
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
|
||||
|
||||
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
|
||||
if err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc)
|
||||
if err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
|
||||
src, err := imapx.Connect(ctx, srcEP)
|
||||
if err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
defer func() { _ = src.Logout().Wait() }()
|
||||
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
dst, err := imapx.Connect(ctx, dstEP)
|
||||
if err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
defer func() { _ = dst.Logout().Wait() }()
|
||||
if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
|
||||
folders, err := imapx.TestLogin(ctx, srcEP, a.SrcLogin, string(srcPass))
|
||||
if err != nil {
|
||||
return o.accountFailed(ctx, task.ID, a.ID, err)
|
||||
}
|
||||
|
||||
var copied, skipped, errs int64
|
||||
deps := imapx.CopyDeps{
|
||||
IsMigrated: func(k string) (bool, error) { return o.store.IsMigrated(ctx, a.ID, k) },
|
||||
MarkMigrated: func(folder, k string) error { return o.store.MarkMigrated(ctx, a.ID, folder, k) },
|
||||
OnProgress: func(c, s int) {
|
||||
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID,
|
||||
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}})
|
||||
},
|
||||
}
|
||||
for _, folder := range folders {
|
||||
dstFolder := folder
|
||||
if m, ok := task.FolderMapping[folder]; ok {
|
||||
dstFolder = m
|
||||
}
|
||||
res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps)
|
||||
if err != nil {
|
||||
slog.Warn("folder copy error", "account", a.ID, "folder", folder, "err", err)
|
||||
errs++
|
||||
}
|
||||
copied += int64(res.Copied)
|
||||
skipped += int64(res.Skipped)
|
||||
errs += int64(res.Errors)
|
||||
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
|
||||
}
|
||||
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
|
||||
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
|
||||
Data: map[string]any{"account_id": a.ID, "copied": copied, "skipped": skipped, "errors": errs}})
|
||||
slog.Info("account copied", "account", a.ID, "copied", copied, "skipped", skipped, "errors", errs)
|
||||
return copied, skipped, errs
|
||||
}
|
||||
|
||||
func (o *Orchestrator) accountFailed(ctx context.Context, taskID, accID int64, err error) (int64, int64, int64) {
|
||||
slog.Error("account failed", "account", accID, "err", err)
|
||||
_ = o.store.SetAccountStatus(ctx, accID, "error")
|
||||
o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID,
|
||||
Data: map[string]any{"account_id": accID, "error": err.Error()}})
|
||||
return 0, 0, 1
|
||||
}
|
||||
Reference in New Issue
Block a user