2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Log warning and info messages when unfinished tasks get aborted

This commit is contained in:
Ken Hibino 2019-12-18 18:55:08 -08:00
parent b2bc0ef91c
commit 33e9da953d
3 changed files with 25 additions and 7 deletions

View File

@ -216,8 +216,9 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error {
return err return err
} }
// RestoreUnfinished moves all tasks from in-progress list to the queue. // RestoreUnfinished moves all tasks from in-progress list to the queue
func (r *RDB) RestoreUnfinished() error { // and reports the number of tasks restored.
func (r *RDB) RestoreUnfinished() (int64, error) {
script := redis.NewScript(` script := redis.NewScript(`
local len = redis.call("LLEN", KEYS[1]) local len = redis.call("LLEN", KEYS[1])
for i = len, 1, -1 do for i = len, 1, -1 do
@ -225,8 +226,15 @@ func (r *RDB) RestoreUnfinished() error {
end end
return len return len
`) `)
_, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() res, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result()
return err if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
} }
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that

View File

@ -255,24 +255,28 @@ func TestRestoreUnfinished(t *testing.T) {
tests := []struct { tests := []struct {
inProgress []*TaskMessage inProgress []*TaskMessage
enqueued []*TaskMessage enqueued []*TaskMessage
want int64
wantInProgress []*TaskMessage wantInProgress []*TaskMessage
wantEnqueued []*TaskMessage wantEnqueued []*TaskMessage
}{ }{
{ {
inProgress: []*TaskMessage{t1, t2, t3}, inProgress: []*TaskMessage{t1, t2, t3},
enqueued: []*TaskMessage{}, enqueued: []*TaskMessage{},
want: 3,
wantInProgress: []*TaskMessage{}, wantInProgress: []*TaskMessage{},
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
{ {
inProgress: []*TaskMessage{}, inProgress: []*TaskMessage{},
enqueued: []*TaskMessage{t1, t2, t3}, enqueued: []*TaskMessage{t1, t2, t3},
want: 0,
wantInProgress: []*TaskMessage{}, wantInProgress: []*TaskMessage{},
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
{ {
inProgress: []*TaskMessage{t2, t3}, inProgress: []*TaskMessage{t2, t3},
enqueued: []*TaskMessage{t1}, enqueued: []*TaskMessage{t1},
want: 2,
wantInProgress: []*TaskMessage{}, wantInProgress: []*TaskMessage{},
wantEnqueued: []*TaskMessage{t1, t2, t3}, wantEnqueued: []*TaskMessage{t1, t2, t3},
}, },
@ -283,8 +287,10 @@ func TestRestoreUnfinished(t *testing.T) {
seedInProgressQueue(t, r, tc.inProgress) seedInProgressQueue(t, r, tc.inProgress)
seedDefaultQueue(t, r, tc.enqueued) seedDefaultQueue(t, r, tc.enqueued)
if err := r.RestoreUnfinished(); err != nil { got, err := r.RestoreUnfinished()
t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err)
if got != tc.want || err != nil {
t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want)
continue continue
} }

View File

@ -126,6 +126,7 @@ func (p *processor) exec() {
select { select {
case <-p.quit: case <-p.quit:
// time is up, quit this worker goroutine. // time is up, quit this worker goroutine.
log.Printf("[WARN] Terminating in-progress task %+v\n", msg)
return return
case resErr := <-resCh: case resErr := <-resCh:
// Note: One of three things should happen. // Note: One of three things should happen.
@ -150,10 +151,13 @@ func (p *processor) exec() {
// restore moves all tasks from "in-progress" back to queue // restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks. // to restore all unfinished tasks.
func (p *processor) restore() { func (p *processor) restore() {
err := p.rdb.RestoreUnfinished() n, err := p.rdb.RestoreUnfinished()
if err != nil { if err != nil {
log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err) log.Printf("[ERROR] Could not restore unfinished tasks: %v\n", err)
} }
if n > 0 {
log.Printf("[INFO] Restored %d unfinished tasks back to queue.\n", n)
}
} }
func (p *processor) requeue(msg *rdb.TaskMessage) { func (p *processor) requeue(msg *rdb.TaskMessage) {