From 33e9da953d3d00b2c4c0c8a45955c26552094988 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 18 Dec 2019 18:55:08 -0800 Subject: [PATCH] Log warning and info messages when unfinished tasks get aborted --- internal/rdb/rdb.go | 16 ++++++++++++---- internal/rdb/rdb_test.go | 10 ++++++++-- processor.go | 6 +++++- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b75922c..1adf18a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -216,8 +216,9 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { return err } -// RestoreUnfinished moves all tasks from in-progress list to the queue. -func (r *RDB) RestoreUnfinished() error { +// RestoreUnfinished moves all tasks from in-progress list to the queue +// and reports the number of tasks restored. +func (r *RDB) RestoreUnfinished() (int64, error) { script := redis.NewScript(` local len = redis.call("LLEN", KEYS[1]) for i = len, 1, -1 do @@ -225,8 +226,15 @@ func (r *RDB) RestoreUnfinished() error { end return len `) - _, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() - return err + res, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() + if err != nil { + return 0, err + } + n, ok := res.(int64) + if !ok { + return 0, fmt.Errorf("could not cast %v to int64", res) + } + return n, nil } // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 951523a..f5d3324 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -255,24 +255,28 @@ func TestRestoreUnfinished(t *testing.T) { tests := []struct { inProgress []*TaskMessage enqueued []*TaskMessage + want int64 wantInProgress []*TaskMessage wantEnqueued []*TaskMessage }{ { inProgress: []*TaskMessage{t1, t2, t3}, enqueued: []*TaskMessage{}, + want: 3, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { inProgress: []*TaskMessage{}, enqueued: []*TaskMessage{t1, t2, t3}, + want: 0, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, { inProgress: []*TaskMessage{t2, t3}, enqueued: []*TaskMessage{t1}, + want: 2, wantInProgress: []*TaskMessage{}, wantEnqueued: []*TaskMessage{t1, t2, t3}, }, @@ -283,8 +287,10 @@ func TestRestoreUnfinished(t *testing.T) { seedInProgressQueue(t, r, tc.inProgress) seedDefaultQueue(t, r, tc.enqueued) - if err := r.RestoreUnfinished(); err != nil { - t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err) + got, err := r.RestoreUnfinished() + + if got != tc.want || err != nil { + t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want) continue } diff --git a/processor.go b/processor.go index f0650fa..4e86282 100644 --- a/processor.go +++ b/processor.go @@ -126,6 +126,7 @@ func (p *processor) exec() { select { case <-p.quit: // time is up, quit this worker goroutine. + log.Printf("[WARN] Terminating in-progress task %+v\n", msg) return case resErr := <-resCh: // Note: One of three things should happen. @@ -150,10 +151,13 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - err := p.rdb.RestoreUnfinished() + n, err := p.rdb.RestoreUnfinished() if err != nil { log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) } + if n > 0 { + log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n) + } } func (p *processor) requeue(msg *rdb.TaskMessage) {