Merge feat/cancel-and-folder-progress: cancel + folder counts
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
@@ -24,6 +24,7 @@ func (s *Server) Router() http.Handler {
|
|||||||
api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV)
|
api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV)
|
||||||
api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts)
|
api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts)
|
||||||
api.HandleFunc("POST /api/tasks/{id}/run", s.handleRun)
|
api.HandleFunc("POST /api/tasks/{id}/run", s.handleRun)
|
||||||
|
api.HandleFunc("POST /api/tasks/{id}/accounts/{accountId}/cancel", s.handleCancelAccount)
|
||||||
mux.Handle("/api/", s.requireAuth(api))
|
mux.Handle("/api/", s.requireAuth(api))
|
||||||
mux.Handle("/ws", s.requireAuth(http.HandlerFunc(s.handleWS)))
|
mux.Handle("/ws", s.requireAuth(http.HandlerFunc(s.handleWS)))
|
||||||
|
|
||||||
|
|||||||
@@ -85,6 +85,19 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
|
|||||||
writeJSON(w, http.StatusAccepted, map[string]int64{"run_id": runID})
|
writeJSON(w, http.StatusAccepted, map[string]int64{"run_id": runID})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleCancelAccount(w http.ResponseWriter, r *http.Request) {
|
||||||
|
accID, err := pathID(r, "accountId")
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "bad account id", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !s.orch.CancelAccount(accID) {
|
||||||
|
http.Error(w, "account is not running", http.StatusConflict)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
|
||||||
id, err := pathID(r, "id")
|
id, err := pathID(r, "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -19,6 +19,10 @@ type CopyDeps struct {
|
|||||||
IsMigrated func(key string) (bool, error)
|
IsMigrated func(key string) (bool, error)
|
||||||
MarkMigrated func(folder, key string) error
|
MarkMigrated func(folder, key string) error
|
||||||
OnProgress func(copied, skipped int)
|
OnProgress func(copied, skipped int)
|
||||||
|
// OnFolder is called once per folder right after EXAMINE, before the
|
||||||
|
// (potentially long) envelope fetch, with the message count in the source
|
||||||
|
// folder — for progress visibility.
|
||||||
|
OnFolder func(srcFolder, dstFolder string, total int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyResult summarizes the outcome of one CopyFolder run.
|
// CopyResult summarizes the outcome of one CopyFolder run.
|
||||||
@@ -42,6 +46,9 @@ func CopyFolder(ctx context.Context, src, dst *imapclient.Client, srcFolder, dst
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
|
return res, fmt.Errorf("examine src %q: %w", srcFolder, err)
|
||||||
}
|
}
|
||||||
|
if deps.OnFolder != nil {
|
||||||
|
deps.OnFolder(srcFolder, dstFolder, int64(sel.NumMessages))
|
||||||
|
}
|
||||||
if sel.NumMessages == 0 {
|
if sel.NumMessages == 0 {
|
||||||
return res, nil
|
return res, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,10 +20,37 @@ type Orchestrator struct {
|
|||||||
hub *wshub.Hub
|
hub *wshub.Hub
|
||||||
encKey []byte
|
encKey []byte
|
||||||
concurrency int
|
concurrency int
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
cancels map[int64]context.CancelFunc // account_id -> cancel of its in-flight copy
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
|
func New(s *store.Store, hub *wshub.Hub, encKey []byte, concurrency int) *Orchestrator {
|
||||||
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency}
|
return &Orchestrator{store: s, hub: hub, encKey: encKey, concurrency: concurrency, cancels: map[int64]context.CancelFunc{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelAccount aborts the in-flight copy for one account, if it is running.
|
||||||
|
// Returns true if a running copy was found and signalled to stop.
|
||||||
|
func (o *Orchestrator) CancelAccount(accountID int64) bool {
|
||||||
|
o.mu.Lock()
|
||||||
|
cancel, ok := o.cancels[accountID]
|
||||||
|
o.mu.Unlock()
|
||||||
|
if ok {
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) registerCancel(accountID int64, cancel context.CancelFunc) {
|
||||||
|
o.mu.Lock()
|
||||||
|
o.cancels[accountID] = cancel
|
||||||
|
o.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *Orchestrator) unregisterCancel(accountID int64) {
|
||||||
|
o.mu.Lock()
|
||||||
|
delete(o.cancels, accountID)
|
||||||
|
o.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func gateOK(accs []store.Account) bool {
|
func gateOK(accs []store.Account) bool {
|
||||||
@@ -190,6 +217,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
}})
|
}})
|
||||||
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
|
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
|
||||||
|
|
||||||
|
// Per-account cancellable context: IMAP work uses actx (so CancelAccount
|
||||||
|
// stops it); DB writes keep the parent ctx so status/counters persist even
|
||||||
|
// after cancellation. ctx is context.WithoutCancel from runAll.
|
||||||
|
actx, cancel := context.WithCancel(ctx)
|
||||||
|
o.registerCancel(a.ID, cancel)
|
||||||
|
defer func() {
|
||||||
|
o.unregisterCancel(a.ID)
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
|
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
||||||
@@ -199,7 +236,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
src, err := imapx.Connect(ctx, srcEP)
|
src, err := imapx.Connect(actx, srcEP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
||||||
}
|
}
|
||||||
@@ -207,7 +244,7 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
|
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
|
||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
||||||
}
|
}
|
||||||
dst, err := imapx.Connect(ctx, dstEP)
|
dst, err := imapx.Connect(actx, dstEP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
||||||
}
|
}
|
||||||
@@ -216,6 +253,14 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// On cancel, close the connections so any in-flight network read (a slow
|
||||||
|
// FETCH/Collect that ctx.Err() checks can't interrupt) unblocks immediately.
|
||||||
|
go func() {
|
||||||
|
<-actx.Done()
|
||||||
|
_ = src.Close()
|
||||||
|
_ = dst.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
folders, err := imapx.ListFolders(src)
|
folders, err := imapx.ListFolders(src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
|
||||||
@@ -229,14 +274,24 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID,
|
o.hub.Publish(wshub.Event{Type: "progress", TaskID: task.ID,
|
||||||
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}})
|
Data: map[string]any{"account_id": a.ID, "copied": c, "skipped": s}})
|
||||||
},
|
},
|
||||||
|
// Fires after EXAMINE (before the long fetch) with the folder's message count.
|
||||||
|
OnFolder: func(srcFolder, dstFolder string, total int64) {
|
||||||
|
o.hub.Publish(wshub.Event{Type: "folder", TaskID: task.ID, Data: map[string]any{
|
||||||
|
"account_id": a.ID, "src_login": a.SrcLogin,
|
||||||
|
"folder": srcFolder, "dst_folder": dstFolder, "messages": total,
|
||||||
|
}})
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, folder := range folders {
|
for _, folder := range folders {
|
||||||
|
if actx.Err() != nil {
|
||||||
|
break // cancelled — stop scheduling more folders
|
||||||
|
}
|
||||||
dstFolder := folder
|
dstFolder := folder
|
||||||
if m, ok := task.FolderMapping[folder]; ok {
|
if m, ok := task.FolderMapping[folder]; ok {
|
||||||
dstFolder = m
|
dstFolder = m
|
||||||
}
|
}
|
||||||
res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps)
|
res, err := imapx.CopyFolder(actx, src, dst, folder, dstFolder, deps)
|
||||||
if err != nil {
|
if err != nil && actx.Err() == nil {
|
||||||
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
|
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
|
||||||
errs++
|
errs++
|
||||||
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
|
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
|
||||||
@@ -248,6 +303,16 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
errs += int64(res.Errors)
|
errs += int64(res.Errors)
|
||||||
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
|
_ = o.store.IncAccountCounters(ctx, a.ID, int64(res.Copied), int64(res.Skipped), int64(res.Errors))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if actx.Err() != nil {
|
||||||
|
_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
|
||||||
|
o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: task.ID,
|
||||||
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin,
|
||||||
|
"copied": copied, "skipped": skipped, "errors": errs}})
|
||||||
|
slog.Info("account cancelled", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped)
|
||||||
|
return copied, skipped, errs
|
||||||
|
}
|
||||||
|
|
||||||
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
|
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
|
||||||
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
|
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
|
||||||
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin,
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin,
|
||||||
@@ -257,6 +322,13 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
|
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
|
||||||
|
// A cancellation surfacing as an error is a cancel, not a failure.
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
_ = o.store.SetAccountStatus(ctx, a.ID, "cancelled")
|
||||||
|
o.hub.Publish(wshub.Event{Type: "cancelled", TaskID: taskID,
|
||||||
|
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin}})
|
||||||
|
return 0, 0, 0
|
||||||
|
}
|
||||||
login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port
|
login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port
|
||||||
if side == "dst" {
|
if side == "dst" {
|
||||||
login, host, port = a.DstLogin, dstEP.Host, dstEP.Port
|
login, host, port = a.DstLogin, dstEP.Host, dstEP.Port
|
||||||
|
|||||||
@@ -81,6 +81,9 @@ export const deleteTask = (id: number) => api(`/api/tasks/${id}`, { method: 'DEL
|
|||||||
export const deleteAccount = (taskId: number, accountId: number) =>
|
export const deleteAccount = (taskId: number, accountId: number) =>
|
||||||
api(`/api/tasks/${taskId}/accounts/${accountId}`, { method: 'DELETE' })
|
api(`/api/tasks/${taskId}/accounts/${accountId}`, { method: 'DELETE' })
|
||||||
|
|
||||||
|
export const cancelAccount = (taskId: number, accountId: number) =>
|
||||||
|
api(`/api/tasks/${taskId}/accounts/${accountId}/cancel`, { method: 'POST' })
|
||||||
|
|
||||||
export const listTasks = () => api<Task[]>('/api/tasks')
|
export const listTasks = () => api<Task[]>('/api/tasks')
|
||||||
|
|
||||||
export const getTask = (id: number) => api<TaskDetail>(`/api/tasks/${id}`)
|
export const getTask = (id: number) => api<TaskDetail>(`/api/tasks/${id}`)
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ export function StatusBadge({ status }: { status: string }) {
|
|||||||
const s = (status || 'pending').toLowerCase()
|
const s = (status || 'pending').toLowerCase()
|
||||||
let cls = 'badge-pending'
|
let cls = 'badge-pending'
|
||||||
if (s === 'ok' || s === 'done' || s === 'success') cls = 'badge-ok'
|
if (s === 'ok' || s === 'done' || s === 'success') cls = 'badge-ok'
|
||||||
else if (s === 'fail' || s === 'failed' || s === 'error') cls = 'badge-fail'
|
else if (s === 'fail' || s === 'failed' || s.includes('error')) cls = 'badge-fail'
|
||||||
else if (s === 'running' || s === 'testing' || s === 'in_progress') cls = 'badge-info'
|
else if (s === 'running' || s === 'testing' || s === 'in_progress') cls = 'badge-info'
|
||||||
return (
|
return (
|
||||||
<span className={`badge ${cls}`}>
|
<span className={`badge ${cls}`}>
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
import { useEffect, useRef, useState, type ChangeEvent, type FormEvent } from 'react'
|
import { useEffect, useRef, useState, type ChangeEvent, type FormEvent } from 'react'
|
||||||
import { createAccount, deleteAccount, getTask, importCSV, runTask, testAccounts, type TaskDetail as TaskDetailData } from '../api'
|
import { cancelAccount, createAccount, deleteAccount, getTask, importCSV, runTask, testAccounts, type TaskDetail as TaskDetailData } from '../api'
|
||||||
import { connectTaskWS, type TaskEvent } from '../ws'
|
import { connectTaskWS, type TaskEvent } from '../ws'
|
||||||
import { StatusBadge } from '../components/StatusBadge'
|
import { StatusBadge } from '../components/StatusBadge'
|
||||||
import { useConfirm } from '../components/ConfirmProvider'
|
import { useConfirm } from '../components/ConfirmProvider'
|
||||||
@@ -22,6 +22,12 @@ function describeEvent(ev: TaskEvent): string {
|
|||||||
return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}`
|
return `DONE #${d.account_id} (${d.src_login} → ${d.dst_login}): copied ${d.copied}, skipped ${d.skipped}, errors ${d.errors}`
|
||||||
case 'progress':
|
case 'progress':
|
||||||
return `progress #${d.account_id}: copied ${d.copied}, skipped ${d.skipped}`
|
return `progress #${d.account_id}: copied ${d.copied}, skipped ${d.skipped}`
|
||||||
|
case 'folder': {
|
||||||
|
const route = d.dst_folder && d.dst_folder !== d.folder ? ` → "${d.dst_folder}"` : ''
|
||||||
|
return `folder "${d.folder}"${route}: ${d.messages ?? 0} messages — fetching (#${d.account_id})`
|
||||||
|
}
|
||||||
|
case 'cancelled':
|
||||||
|
return `CANCELLED #${d.account_id} (${d.src_login}): copied ${d.copied ?? 0}, skipped ${d.skipped ?? 0}`
|
||||||
case 'error': {
|
case 'error': {
|
||||||
const where = d.folder ? ` folder "${d.folder}"` : d.side ? ` (${d.side} ${at})` : ''
|
const where = d.folder ? ` folder "${d.folder}"` : d.side ? ` (${d.side} ${at})` : ''
|
||||||
return `ERROR #${d.account_id}${where}: ${d.error}`
|
return `ERROR #${d.account_id}${where}: ${d.error}`
|
||||||
@@ -60,7 +66,7 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
() =>
|
() =>
|
||||||
connectTaskWS(id, (ev: TaskEvent) => {
|
connectTaskWS(id, (ev: TaskEvent) => {
|
||||||
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
|
setLog((l) => [{ type: ev.type, text: describeEvent(ev) }, ...l].slice(0, 300))
|
||||||
if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error'].includes(ev.type)) {
|
if (['account_started', 'account_test', 'account_done', 'progress', 'run_started', 'run_done', 'error', 'folder', 'cancelled'].includes(ev.type)) {
|
||||||
reload()
|
reload()
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
@@ -132,6 +138,15 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function onCancelAccount(accId: number) {
|
||||||
|
setError(null)
|
||||||
|
try {
|
||||||
|
await cancelAccount(id, accId)
|
||||||
|
} catch (err) {
|
||||||
|
setError(err instanceof Error ? err.message : 'Failed to cancel account')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async function onTest() {
|
async function onTest() {
|
||||||
setBusy('test')
|
setBusy('test')
|
||||||
setError(null)
|
setError(null)
|
||||||
@@ -351,6 +366,11 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
<td className="num-cell">{a.skipped}</td>
|
<td className="num-cell">{a.skipped}</td>
|
||||||
<td className="num-cell">{a.errors}</td>
|
<td className="num-cell">{a.errors}</td>
|
||||||
<td className="num-cell">
|
<td className="num-cell">
|
||||||
|
{a.status === 'running' ? (
|
||||||
|
<button type="button" className="link-btn danger" onClick={() => onCancelAccount(a.id)}>
|
||||||
|
cancel
|
||||||
|
</button>
|
||||||
|
) : (
|
||||||
<button
|
<button
|
||||||
type="button"
|
type="button"
|
||||||
className="link-btn danger"
|
className="link-btn danger"
|
||||||
@@ -359,6 +379,7 @@ export function TaskDetail({ id }: { id: number }) {
|
|||||||
>
|
>
|
||||||
remove
|
remove
|
||||||
</button>
|
</button>
|
||||||
|
)}
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
))
|
))
|
||||||
|
|||||||
Reference in New Issue
Block a user