feat: delete task/account, edit endpoint, richer event log

- store: DeleteAccount, DeleteTask, UpdateEndpoint (+ cascade/update tests)
- httpapi: DELETE /tasks/{id}, DELETE /tasks/{id}/accounts/{accountId},
  PUT /endpoints/{id}; delete guarded with 409 while task running
- orchestrator: enrich WS events with login/host/port/error (test + run + errors)
- web: delete buttons (task, account) with confirm, endpoint edit form,
  human-readable event log (source/dest, host:port, error text)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01MMHQTtnQtQqL8muAXHr9kd
This commit is contained in:
2026-07-02 09:23:14 +07:00
parent 9019511d6f
commit b88f88c1a3
13 changed files with 344 additions and 36 deletions
+23
View File
@@ -31,6 +31,29 @@ func (s *Server) handleCreateEndpoint(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusCreated, map[string]int64{"id": id})
}
func (s *Server) handleUpdateEndpoint(w http.ResponseWriter, r *http.Request) {
id, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
var e store.Endpoint
if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if e.TLSMode != "ssl" && e.TLSMode != "starttls" && e.TLSMode != "plain" {
http.Error(w, "tls_mode must be ssl|starttls|plain", http.StatusBadRequest)
return
}
e.ID = id
if err := s.store.UpdateEndpoint(r.Context(), e); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
func (s *Server) handleListEndpoints(w http.ResponseWriter, r *http.Request) {
eps, err := s.store.ListEndpoints(r.Context())
if err != nil {
+3
View File
@@ -14,10 +14,13 @@ func (s *Server) Router() http.Handler {
api := http.NewServeMux()
api.HandleFunc("GET /api/endpoints", s.handleListEndpoints)
api.HandleFunc("POST /api/endpoints", s.handleCreateEndpoint)
api.HandleFunc("PUT /api/endpoints/{id}", s.handleUpdateEndpoint)
api.HandleFunc("GET /api/tasks", s.handleListTasks)
api.HandleFunc("POST /api/tasks", s.handleCreateTask)
api.HandleFunc("GET /api/tasks/{id}", s.handleGetTask)
api.HandleFunc("DELETE /api/tasks/{id}", s.handleDeleteTask)
api.HandleFunc("POST /api/tasks/{id}/accounts", s.handleCreateAccount)
api.HandleFunc("DELETE /api/tasks/{id}/accounts/{accountId}", s.handleDeleteAccount)
api.HandleFunc("POST /api/tasks/{id}/import", s.handleImportCSV)
api.HandleFunc("POST /api/tasks/{id}/test", s.handleTestAccounts)
api.HandleFunc("POST /api/tasks/{id}/run", s.handleRun)
+49
View File
@@ -84,3 +84,52 @@ func (s *Server) handleRun(w http.ResponseWriter, r *http.Request) {
}
writeJSON(w, http.StatusAccepted, map[string]int64{"run_id": runID})
}
func (s *Server) handleDeleteTask(w http.ResponseWriter, r *http.Request) {
id, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
task, err := s.store.GetTask(r.Context(), id)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
if task.Status == "running" {
http.Error(w, "cannot delete a running task", http.StatusConflict)
return
}
if err := s.store.DeleteTask(r.Context(), id); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
func (s *Server) handleDeleteAccount(w http.ResponseWriter, r *http.Request) {
taskID, err := pathID(r, "id")
if err != nil {
http.Error(w, "bad id", http.StatusBadRequest)
return
}
accID, err := pathID(r, "accountId")
if err != nil {
http.Error(w, "bad account id", http.StatusBadRequest)
return
}
task, err := s.store.GetTask(r.Context(), taskID)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
if task.Status == "running" {
http.Error(w, "cannot modify accounts while task is running", http.StatusConflict)
return
}
if err := s.store.DeleteAccount(r.Context(), accID); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusNoContent)
}
+35 -17
View File
@@ -75,17 +75,23 @@ func (o *Orchestrator) TestAccounts(ctx context.Context, taskID int64) error {
func (o *Orchestrator) testSide(ctx context.Context, ep imapx.Endpoint, accID int64, side, login, passEnc string, taskID int64) {
status := "ok"
errMsg := ""
pass, err := crypto.Decrypt(o.encKey, passEnc)
if err == nil {
_, err = imapx.TestLogin(ctx, ep, login, string(pass))
}
if err != nil {
status = "fail"
slog.Warn("account test failed", "account", accID, "side", side, "err", err)
errMsg = err.Error()
slog.Warn("account test failed", "account", accID, "side", side,
"login", login, "host", ep.Host, "port", ep.Port, "err", err)
}
_ = o.store.SetAccountTestStatus(ctx, accID, side, status)
o.hub.Publish(wshub.Event{Type: "account_test", TaskID: taskID,
Data: map[string]any{"account_id": accID, "side": side, "status": status}})
Data: map[string]any{
"account_id": accID, "side": side, "status": status,
"login": login, "host": ep.Host, "port": ep.Port, "error": errMsg,
}})
}
func (o *Orchestrator) Run(ctx context.Context, taskID int64) (int64, error) {
@@ -177,38 +183,42 @@ func (o *Orchestrator) runAll(ctx context.Context, task store.Task, runID int64,
}
func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID int64, a store.Account, srcEP, dstEP imapx.Endpoint) (int64, int64, int64) {
o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{"account_id": a.ID}})
o.hub.Publish(wshub.Event{Type: "account_started", TaskID: task.ID, Data: map[string]any{
"account_id": a.ID,
"src_login": a.SrcLogin, "src_host": srcEP.Host, "src_port": srcEP.Port,
"dst_login": a.DstLogin, "dst_host": dstEP.Host, "dst_port": dstEP.Port,
}})
_ = o.store.SetAccountStatus(ctx, a.ID, "running")
srcPass, err := crypto.Decrypt(o.encKey, a.SrcPassEnc)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
}
dstPass, err := crypto.Decrypt(o.encKey, a.DstPassEnc)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
}
src, err := imapx.Connect(ctx, srcEP)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
}
defer func() { _ = src.Logout().Wait() }()
if err := src.Login(a.SrcLogin, string(srcPass)).Wait(); err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
}
dst, err := imapx.Connect(ctx, dstEP)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
}
defer func() { _ = dst.Logout().Wait() }()
if err := dst.Login(a.DstLogin, string(dstPass)).Wait(); err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "dst", err)
}
folders, err := imapx.ListFolders(src)
if err != nil {
return o.accountFailed(ctx, task.ID, a.ID, err)
return o.accountFailed(ctx, task.ID, a, srcEP, dstEP, "src", err)
}
var copied, skipped, errs int64
@@ -227,8 +237,11 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
}
res, err := imapx.CopyFolder(ctx, src, dst, folder, dstFolder, deps)
if err != nil {
slog.Warn("folder copy error", "account", a.ID, "folder", folder, "err", err)
slog.Warn("folder copy error", "account", a.ID, "src_login", a.SrcLogin, "folder", folder, "err", err)
errs++
o.hub.Publish(wshub.Event{Type: "error", TaskID: task.ID, Data: map[string]any{
"account_id": a.ID, "src_login": a.SrcLogin, "folder": folder, "error": err.Error(),
}})
}
copied += int64(res.Copied)
skipped += int64(res.Skipped)
@@ -237,15 +250,20 @@ func (o *Orchestrator) runAccount(ctx context.Context, task store.Task, runID in
}
_ = o.store.SetAccountStatus(ctx, a.ID, "done")
o.hub.Publish(wshub.Event{Type: "account_done", TaskID: task.ID,
Data: map[string]any{"account_id": a.ID, "copied": copied, "skipped": skipped, "errors": errs}})
slog.Info("account copied", "account", a.ID, "copied", copied, "skipped", skipped, "errors", errs)
Data: map[string]any{"account_id": a.ID, "src_login": a.SrcLogin, "dst_login": a.DstLogin,
"copied": copied, "skipped": skipped, "errors": errs}})
slog.Info("account copied", "account", a.ID, "src_login", a.SrcLogin, "copied", copied, "skipped", skipped, "errors", errs)
return copied, skipped, errs
}
func (o *Orchestrator) accountFailed(ctx context.Context, taskID, accID int64, err error) (int64, int64, int64) {
slog.Error("account failed", "account", accID, "err", err)
_ = o.store.SetAccountStatus(ctx, accID, "error")
func (o *Orchestrator) accountFailed(ctx context.Context, taskID int64, a store.Account, srcEP, dstEP imapx.Endpoint, side string, err error) (int64, int64, int64) {
login, host, port := a.SrcLogin, srcEP.Host, srcEP.Port
if side == "dst" {
login, host, port = a.DstLogin, dstEP.Host, dstEP.Port
}
slog.Error("account failed", "account", a.ID, "side", side, "login", login, "host", host, "port", port, "err", err)
_ = o.store.SetAccountStatus(ctx, a.ID, "error")
o.hub.Publish(wshub.Event{Type: "error", TaskID: taskID,
Data: map[string]any{"account_id": accID, "error": err.Error()}})
Data: map[string]any{"account_id": a.ID, "side": side, "login": login, "host": host, "port": port, "error": err.Error()}})
return 0, 0, 1
}
+6
View File
@@ -29,6 +29,12 @@ func (s *Store) CreateAccount(ctx context.Context, a Account) (int64, error) {
return id, err
}
// DeleteAccount removes one account (and its migrated_messages via ON DELETE CASCADE).
func (s *Store) DeleteAccount(ctx context.Context, id int64) error {
_, err := s.Pool.Exec(ctx, `DELETE FROM accounts WHERE id=$1`, id)
return err
}
func (s *Store) ListAccountsByTask(ctx context.Context, taskID int64) ([]Account, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, task_id, src_login, src_pass_enc, dst_login, dst_pass_enc,
+66
View File
@@ -0,0 +1,66 @@
package store
import (
"context"
"testing"
)
func TestUpdateEndpoint(t *testing.T) {
s := testStore(t)
ctx := context.Background()
id, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "src", Host: "old.host", Port: 143, TLSMode: "plain"})
if err := s.UpdateEndpoint(ctx, Endpoint{ID: id, RoleLabel: "src2", Host: "new.host", Port: 993, TLSMode: "ssl"}); err != nil {
t.Fatalf("update: %v", err)
}
got, _ := s.GetEndpoint(ctx, id)
if got.Host != "new.host" || got.Port != 993 || got.TLSMode != "ssl" || got.RoleLabel != "src2" {
t.Fatalf("update not applied: %+v", got)
}
}
func TestDeleteAccountCascadesJournal(t *testing.T) {
s := testStore(t)
ctx := context.Background()
ep1, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "s", Host: "a", Port: 993, TLSMode: "ssl"})
ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2})
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"})
_ = s.MarkMigrated(ctx, accID, "INBOX", "<m1>")
if err := s.DeleteAccount(ctx, accID); err != nil {
t.Fatalf("delete account: %v", err)
}
accs, _ := s.ListAccountsByTask(ctx, taskID)
if len(accs) != 0 {
t.Fatalf("account not deleted: %d remain", len(accs))
}
// journal row must be gone via ON DELETE CASCADE
var n int
_ = s.Pool.QueryRow(ctx, `SELECT count(*) FROM migrated_messages WHERE account_id=$1`, accID).Scan(&n)
if n != 0 {
t.Fatalf("migrated_messages not cascaded: %d rows", n)
}
}
func TestDeleteTaskCascades(t *testing.T) {
s := testStore(t)
ctx := context.Background()
ep1, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "s", Host: "a", Port: 993, TLSMode: "ssl"})
ep2, _ := s.CreateEndpoint(ctx, Endpoint{RoleLabel: "d", Host: "b", Port: 993, TLSMode: "ssl"})
taskID, _ := s.CreateTask(ctx, Task{Name: "t", SrcEndpointID: ep1, DstEndpointID: ep2})
accID, _ := s.CreateAccount(ctx, Account{TaskID: taskID, SrcLogin: "u", SrcPassEnc: "x", DstLogin: "v", DstPassEnc: "y"})
_, _ = s.CreateRun(ctx, taskID)
if err := s.DeleteTask(ctx, taskID); err != nil {
t.Fatalf("delete task: %v", err)
}
tasks, _ := s.ListTasks(ctx)
if len(tasks) != 0 {
t.Fatalf("task not deleted: %d remain", len(tasks))
}
accs, _ := s.ListAccountsByTask(ctx, taskID)
if len(accs) != 0 {
t.Fatalf("accounts not cascaded: %d remain", len(accs))
}
_ = accID
}
+7
View File
@@ -19,6 +19,13 @@ func (s *Store) CreateEndpoint(ctx context.Context, e Endpoint) (int64, error) {
return id, err
}
func (s *Store) UpdateEndpoint(ctx context.Context, e Endpoint) error {
_, err := s.Pool.Exec(ctx,
`UPDATE endpoints SET role_label=$2, host=$3, port=$4, tls_mode=$5 WHERE id=$1`,
e.ID, e.RoleLabel, e.Host, e.Port, e.TLSMode)
return err
}
func (s *Store) GetEndpoint(ctx context.Context, id int64) (Endpoint, error) {
var e Endpoint
err := s.Pool.QueryRow(ctx,
+6
View File
@@ -32,6 +32,12 @@ func (s *Store) GetTask(ctx context.Context, id int64) (Task, error) {
return t, err
}
// DeleteTask removes a task and its accounts/runs/migrated_messages via ON DELETE CASCADE.
func (s *Store) DeleteTask(ctx context.Context, id int64) error {
_, err := s.Pool.Exec(ctx, `DELETE FROM tasks WHERE id=$1`, id)
return err
}
func (s *Store) ListTasks(ctx context.Context) ([]Task, error) {
rows, err := s.Pool.Query(ctx,
`SELECT id, name, src_endpoint_id, dst_endpoint_id, status, folder_mapping