|
|
|
@@ -20,10 +20,37 @@ type Orchestrator struct {
|
|
|
|
|
hub *wshub.Hub
|
|
|
|
|
encKey []byte
|
|
|
|
|
concurrency int
|
|
|
|
|
|
|
|
|
|
mu sync.Mutex
|
|
|
|
|
cancels map[int64]context.CancelFunc // account_id -> cancel of its in-flight copy
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
|
|
|
|
|
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency}
|
|
|
|
|
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency, cancels: map[int64]context.CancelFunc{}}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// CancelAccount aborts the in-flight copy for one account, if it is running.
|
|
|
|
|
// Returns true if a running copy was found and signalled to stop.
|
|
|
|
|
func (o *Orchestrator) CancelAccount(accountID int64) bool {
|
|
|
|
|
o.mu.Lock()
|
|
|
|
|
cancel, ok := o.cancels[accountID]
|
|
|
|
|
o.mu.Unlock()
|
|
|
|
|
if ok {
|
|
|
|
|
cancel()
|
|
|
|
|
}
|
|
|
|
|
return ok
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Orchestrator) registerCancel(accountID int64, cancel context.CancelFunc) {
|
|
|
|
|
o.mu.Lock()
|
|
|
|
|
o.cancels[accountID] = cancel
|
|
|
|
|
o.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Orchestrator) unregisterCancel(accountID int64) {
|
|
|
|
|
o.mu.Lock()
|
|
|
|
|
delete(o.cancels, accountID)
|
|
|
|
|
o.mu.Unlock()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func gateOK(accs []store.Account) bool {
|
|
|
|
@@ -190,6 +217,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
}})
|
|
|
|
|
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
|
|
|
|
|
|
|
|
|
|
// Per-account cancellable context: IMAP work uses actx (so CancelAccount
|
|
|
|
|
// stops it); DB writes keep the parent ctx so status/counters persist even
|
|
|
|
|
// after cancellation. ctx is context.WithoutCancel from runAll.
|
|
|
|
|
actx, cancel := context.WithCancel(ctx)
|
|
|
|
|
o.registerCancel(a.ID, cancel)
|
|
|
|
|
defer func() {
|
|
|
|
|
o.unregisterCancel(a.ID)
|
|
|
|
|
cancel()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
|
|
|
@@ -199,7 +236,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
src, err := imapx.Connect(ctx, srcEP)
|
|
|
|
|
src, err := imapx.Connect(actx, srcEP)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
|
|
|
|
}
|
|
|
|
@@ -207,7 +244,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
|
|
|
|
}
|
|
|
|
|
dst, err := imapx.Connect(ctx, dstEP)
|
|
|
|
|
dst, err := imapx.Connect(actx, dstEP)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
|
|
|
|
}
|
|
|
|
@@ -216,6 +253,14 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// On cancel, close the connections so any in-flight network read (a slow
|
|
|
|
|
// FETCH/Collect that ctx.Err() checks can't interrupt) unblocks immediately.
|
|
|
|
|
go func() {
|
|
|
|
|
<-actx.Done()
|
|
|
|
|
_ = src.Close()
|
|
|
|
|
_ = dst.Close()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
folders, err := imapx.ListFolders(src)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
|
|
|
@@ -229,14 +274,24 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID,
|
|
|
|
|
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}})
|
|
|
|
|
},
|
|
|
|
|
// Fires after EXAMINE (before the long fetch) with the folder's message count.
|
|
|
|
|
OnFolder: func(srcFolder, dstFolder string, total int64) {
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{
|
|
|
|
|
"account_id": a.ID, "src_login": a.SrcLogin,
|
|
|
|
|
"folder": srcFolder, "dst_folder": dstFolder, "messages": total,
|
|
|
|
|
}})
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
for _, folder := range folders {
|
|
|
|
|
if actx.Err() != nil {
|
|
|
|
|
break // cancelled — stop scheduling more folders
|
|
|
|
|
}
|
|
|
|
|
dstFolder := folder
|
|
|
|
|
if m, ok := task.FolderMapping[folder]; ok {
|
|
|
|
|
dstFolder = m
|
|
|
|
|
}
|
|
|
|
|
res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps)
|
|
|
|
|
if err != nil {
|
|
|
|
|
res, err := imapx.CopyFolder(actx, src, dst, folder, dstFolder, deps)
|
|
|
|
|
if err != nil && actx.Err() == nil {
|
|
|
|
|
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
|
|
|
|
|
errs++
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
|
|
|
|
@@ -248,6 +303,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
errs += int64(res.Errors)
|
|
|
|
|
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if actx.Err() != nil {
|
|
|
|
|
_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: task.ID,
|
|
|
|
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin,
|
|
|
|
|
"copied": copied, "skipped": skipped, "errors": errs}})
|
|
|
|
|
slog.Info("account cancelled", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped)
|
|
|
|
|
return copied, skipped, errs
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
|
|
|
|
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin,
|
|
|
|
@@ -257,6 +322,13 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
|
|
|
|
|
// A cancellation surfacing as an error is a cancel, not a failure.
|
|
|
|
|
if errors.Is(err, context.Canceled) {
|
|
|
|
|
_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
|
|
|
|
|
o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: taskID,
|
|
|
|
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin}})
|
|
|
|
|
return 0, 0, 0
|
|
|
|
|
}
|
|
|
|
|
login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port
|
|
|
|
|
if side == "dst" {
|
|
|
|
|
login, host, port = a.DstLogin, dstEP.Host, dstEP.Port
|
|
|
|
|