From 319d157d47f4b8d34e2022e29735bef7183345e0 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 2 Dec 2019 07:09:43 -0800 Subject: [PATCH] Change inspector's list methods to return specific task type for each queue --- asynq.go | 16 +-- inspector.go | 48 +++++---- inspector_test.go | 251 +++++++++++++++++++++++++++++++++++----------- rdb.go | 85 ++++++++++++++-- 4 files changed, 308 insertions(+), 92 deletions(-) diff --git a/asynq.go b/asynq.go index ad99164..20071a4 100644 --- a/asynq.go +++ b/asynq.go @@ -103,17 +103,19 @@ type RetryTask struct { ID uuid.UUID Type string Payload map[string]interface{} - // TODO(hibiken): add LastRetry time.Time - NextRetry time.Time + // TODO(hibiken): add LastFailedAt time.Time + ProcessAt time.Time ErrorMsg string + Retried int + Retry int } // DeadTask is a task in that has exhausted all retries. // This is read only and used for inspection purpose. type DeadTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} - // TODO(hibiken): add LastRetry time.Time - ErrorMsg string + ID uuid.UUID + Type string + Payload map[string]interface{} + LastFailedAt time.Time + ErrorMsg string } diff --git a/inspector.go b/inspector.go index ac347dd..fe4dbb7 100644 --- a/inspector.go +++ b/inspector.go @@ -27,54 +27,58 @@ func (i *Inspector) toTaskSlice(msgs []*taskMessage) []*Task { } // ListEnqueuedTasks returns a list of tasks ready to be processed. -func (i *Inspector) ListEnqueuedTasks() ([]*Task, error) { +func (i *Inspector) ListEnqueuedTasks() ([]*EnqueuedTask, error) { // TODO(hibiken): Support pagination. msgs, err := i.rdb.listEnqueued() if err != nil { return nil, err } - return i.toTaskSlice(msgs), nil + var tasks []*EnqueuedTask + for _, m := range msgs { + tasks = append(tasks, &EnqueuedTask{ + ID: m.ID, + Type: m.Type, + Payload: m.Payload, + }) + } + return tasks, nil } // ListInProgressTasks returns a list of tasks that are being processed. -func (i *Inspector) ListInProgressTasks() ([]*Task, error) { +func (i *Inspector) ListInProgressTasks() ([]*InProgressTask, error) { // TODO(hibiken): Support pagination. msgs, err := i.rdb.listInProgress() if err != nil { return nil, err } - return i.toTaskSlice(msgs), nil + var tasks []*InProgressTask + for _, m := range msgs { + tasks = append(tasks, &InProgressTask{ + ID: m.ID, + Type: m.Type, + Payload: m.Payload, + }) + } + return tasks, nil } // ListScheduledTasks returns a list of tasks that are scheduled to // be processed in the future. -func (i *Inspector) ListScheduledTasks() ([]*Task, error) { +func (i *Inspector) ListScheduledTasks() ([]*ScheduledTask, error) { // TODO(hibiken): Support pagination. - msgs, err := i.rdb.listScheduled() - if err != nil { - return nil, err - } - return i.toTaskSlice(msgs), nil + return i.rdb.listScheduled() } // ListRetryTasks returns a list of tasks what will be retried in the // future. -func (i *Inspector) ListRetryTasks() ([]*Task, error) { +func (i *Inspector) ListRetryTasks() ([]*RetryTask, error) { // TODO(hibiken): Support pagination. - msgs, err := i.rdb.listRetry() - if err != nil { - return nil, err - } - return i.toTaskSlice(msgs), nil + return i.rdb.listRetry() } // ListDeadTasks returns a list of tasks that have reached its // maximum retry limit. -func (i *Inspector) ListDeadTasks() ([]*Task, error) { +func (i *Inspector) ListDeadTasks() ([]*DeadTask, error) { // TODO(hibiken): Support pagination. - msgs, err := i.rdb.listDead() - if err != nil { - return nil, err - } - return i.toTaskSlice(msgs), nil + return i.rdb.listDead() } diff --git a/inspector_test.go b/inspector_test.go index aac32e0..8d79dd6 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -1,14 +1,54 @@ package asynq import ( + "sort" "testing" "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" ) +// ---- TODO(hibiken): Remove this once the new version is released (https://github.com/google/go-cmp/issues/166) ---- +// EquateApproxTime returns a Comparer options that +// determine two time.Time values to be equal if they +// are within the given time interval of one another. +// Note that if both times have a monotonic clock reading, +// the monotonic time difference will be used. +// +// The zero time is treated specially: it is only considered +// equal to another zero time value. +// +// It will panic if margin is negative. +func EquateApproxTime(margin time.Duration) cmp.Option { + if margin < 0 { + panic("negative duration in EquateApproxTime") + } + return cmp.FilterValues(func(x, y time.Time) bool { + return !x.IsZero() && !y.IsZero() + }, cmp.Comparer(timeApproximator{margin}.compare)) +} + +type timeApproximator struct { + margin time.Duration +} + +func (a timeApproximator) compare(x, y time.Time) bool { + // Avoid subtracting times to avoid overflow when the + // difference is larger than the largest representible duration. + if x.After(y) { + // Ensure x is always before y + x, y = y, x + } + // We're within the margin if x+margin >= y. + // Note: time.Time doesn't have AfterOrEqual method hence the negation. + return !x.Add(a.margin).Before(y) +} + +//----------------------------- + func TestCurrentStats(t *testing.T) { r := setup(t) inspector := &Inspector{r} @@ -121,21 +161,21 @@ func TestListEnqueuedTasks(t *testing.T) { m1 := randomTask("send_email", "default", nil) m2 := randomTask("send_email", "default", nil) m3 := randomTask("gen_export", "default", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} + t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} + t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} + t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} tests := []struct { queued []*taskMessage - want []*Task + want []*EnqueuedTask }{ { queued: []*taskMessage{m1, m2, m3}, - want: []*Task{t1, t2, t3}, + want: []*EnqueuedTask{t1, t2, t3}, }, { queued: []*taskMessage{}, - want: []*Task{}, + want: []*EnqueuedTask{}, }, } @@ -157,7 +197,14 @@ func TestListEnqueuedTasks(t *testing.T) { continue } - if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + sortOpt := cmp.Transformer("SortEnqueuedTasks", func(in []*EnqueuedTask) []*EnqueuedTask { + out := append([]*EnqueuedTask(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out + }) + if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("(*Inspector).ListEnqueuedTasks = %v, want %v; (-want, +got)\n%s", got, tc.want, diff) continue @@ -171,21 +218,21 @@ func TestListInProgressTasks(t *testing.T) { m1 := randomTask("send_email", "default", nil) m2 := randomTask("send_email", "default", nil) m3 := randomTask("gen_export", "default", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} + t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} + t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} + t3 := &InProgressTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} tests := []struct { inProgress []*taskMessage - want []*Task + want []*InProgressTask }{ { inProgress: []*taskMessage{m1, m2, m3}, - want: []*Task{t1, t2, t3}, + want: []*InProgressTask{t1, t2, t3}, }, { inProgress: []*taskMessage{}, - want: []*Task{}, + want: []*InProgressTask{}, }, } @@ -207,7 +254,14 @@ func TestListInProgressTasks(t *testing.T) { continue } - if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + sortOpt := cmp.Transformer("SortInProgressTasks", func(in []*InProgressTask) []*InProgressTask { + out := append([]*InProgressTask(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out + }) + if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { t.Errorf("(*Inspector).ListInProgressTasks = %v, want %v; (-want, +got)\n%s", got, tc.want, diff) continue @@ -221,21 +275,32 @@ func TestListScheduledTasks(t *testing.T) { m1 := randomTask("send_email", "default", nil) m2 := randomTask("send_email", "default", nil) m3 := randomTask("gen_export", "default", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} + t1 := time.Now().Add(5 * time.Minute) + t2 := time.Now().Add(time.Hour) + t3 := time.Now().Add(24 * time.Hour) + s1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: t1} + s2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: t2} + s3 := &ScheduledTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, ProcessAt: t3} + type scheduledMsg struct { + msg *taskMessage + processAt time.Time + } tests := []struct { - scheduled []*taskMessage - want []*Task + scheduled []scheduledMsg + want []*ScheduledTask }{ { - scheduled: []*taskMessage{m1, m2, m3}, - want: []*Task{t1, t2, t3}, + scheduled: []scheduledMsg{ + {m1, t1}, + {m2, t2}, + {m3, t3}, + }, + want: []*ScheduledTask{s1, s2, s3}, }, { - scheduled: []*taskMessage{}, - want: []*Task{}, + scheduled: []scheduledMsg{}, + want: []*ScheduledTask{}, }, } @@ -244,8 +309,8 @@ func TestListScheduledTasks(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - for _, msg := range tc.scheduled { - err := r.client.ZAdd(scheduled, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + for _, s := range tc.scheduled { + err := r.client.ZAdd(scheduled, &redis.Z{Member: mustMarshal(t, s.msg), Score: float64(s.processAt.Unix())}).Err() if err != nil { t.Fatal(err) } @@ -257,7 +322,15 @@ func TestListScheduledTasks(t *testing.T) { continue } - if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + sortOpt := cmp.Transformer("SortScheduledTasks", func(in []*ScheduledTask) []*ScheduledTask { + out := append([]*ScheduledTask(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out + }) + timeCmpOpt := EquateApproxTime(time.Second) + if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { t.Errorf("(*Inspector).ListScheduledTasks = %v, want %v; (-want, +got)\n%s", got, tc.want, diff) continue @@ -268,24 +341,61 @@ func TestListScheduledTasks(t *testing.T) { func TestListRetryTasks(t *testing.T) { r := setup(t) inspector := &Inspector{r} - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("send_email", "default", nil) - m3 := randomTask("gen_export", "default", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} + m1 := &taskMessage{ + ID: uuid.New(), + Type: "send_email", + Payload: map[string]interface{}{"to": "customer@example.com"}, + ErrorMsg: "couldn't send email", + Retry: 25, + Retried: 10, + } + m2 := &taskMessage{ + ID: uuid.New(), + Type: "gen_thumbnail", + Payload: map[string]interface{}{"src": "some/path/to/img/file"}, + ErrorMsg: "couldn't find a file", + Retry: 20, + Retried: 3, + } + t1 := time.Now().Add(time.Hour) + t2 := time.Now().Add(24 * time.Hour) + r1 := &RetryTask{ + ID: m1.ID, + Type: m1.Type, + Payload: m1.Payload, + ProcessAt: t1, + ErrorMsg: m1.ErrorMsg, + Retry: m1.Retry, + Retried: m1.Retried, + } + r2 := &RetryTask{ + ID: m2.ID, + Type: m2.Type, + Payload: m2.Payload, + ProcessAt: t2, + ErrorMsg: m2.ErrorMsg, + Retry: m2.Retry, + Retried: m2.Retried, + } + type retryEntry struct { + msg *taskMessage + processAt time.Time + } tests := []struct { - retry []*taskMessage - want []*Task + retry []retryEntry + want []*RetryTask }{ { - retry: []*taskMessage{m1, m2, m3}, - want: []*Task{t1, t2, t3}, + retry: []retryEntry{ + {m1, t1}, + {m2, t2}, + }, + want: []*RetryTask{r1, r2}, }, { - retry: []*taskMessage{}, - want: []*Task{}, + retry: []retryEntry{}, + want: []*RetryTask{}, }, } @@ -294,8 +404,10 @@ func TestListRetryTasks(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - for _, msg := range tc.retry { - err := r.client.ZAdd(retry, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + for _, e := range tc.retry { + err := r.client.ZAdd(retry, &redis.Z{ + Member: mustMarshal(t, e.msg), + Score: float64(e.processAt.Unix())}).Err() if err != nil { t.Fatal(err) } @@ -307,7 +419,15 @@ func TestListRetryTasks(t *testing.T) { continue } - if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + sortOpt := cmp.Transformer("SortRetryTasks", func(in []*RetryTask) []*RetryTask { + out := append([]*RetryTask(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out + }) + timeCmpOpt := EquateApproxTime(time.Second) + if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { t.Errorf("(*Inspector).ListRetryTasks = %v, want %v; (-want, +got)\n%s", got, tc.want, diff) continue @@ -318,24 +438,31 @@ func TestListRetryTasks(t *testing.T) { func TestListDeadTasks(t *testing.T) { r := setup(t) inspector := &Inspector{r} - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("send_email", "default", nil) - m3 := randomTask("gen_export", "default", nil) - t1 := &Task{Type: m1.Type, Payload: m1.Payload} - t2 := &Task{Type: m2.Type, Payload: m2.Payload} - t3 := &Task{Type: m3.Type, Payload: m3.Payload} + m1 := &taskMessage{ID: uuid.New(), Type: "send_email", Payload: map[string]interface{}{"to": "customer@example.com"}, ErrorMsg: "couldn't send email"} + m2 := &taskMessage{ID: uuid.New(), Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "path/to/img/file"}, ErrorMsg: "couldn't find file"} + t1 := time.Now().Add(-5 * time.Second) + t2 := time.Now().Add(-24 * time.Hour) + d1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ErrorMsg: m1.ErrorMsg, LastFailedAt: t1} + d2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ErrorMsg: m2.ErrorMsg, LastFailedAt: t2} + type deadEntry struct { + msg *taskMessage + lastFailedAt time.Time + } tests := []struct { - dead []*taskMessage - want []*Task + dead []deadEntry + want []*DeadTask }{ { - dead: []*taskMessage{m1, m2, m3}, - want: []*Task{t1, t2, t3}, + dead: []deadEntry{ + {m1, t1}, + {m2, t2}, + }, + want: []*DeadTask{d1, d2}, }, { - dead: []*taskMessage{}, - want: []*Task{}, + dead: []deadEntry{}, + want: []*DeadTask{}, }, } @@ -344,8 +471,10 @@ func TestListDeadTasks(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - for _, msg := range tc.dead { - err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + for _, d := range tc.dead { + err := r.client.ZAdd(dead, &redis.Z{ + Member: mustMarshal(t, d.msg), + Score: float64(d.lastFailedAt.Unix())}).Err() if err != nil { t.Fatal(err) } @@ -353,11 +482,19 @@ func TestListDeadTasks(t *testing.T) { got, err := inspector.ListDeadTasks() if err != nil { - t.Error(err) + t.Errorf("(*Inspector).ListDeadTask = %v, %v; want %v, nil", got, err, tc.want) continue } - if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + sortOpt := cmp.Transformer("SortDeadTasks", func(in []*DeadTask) []*DeadTask { + out := append([]*DeadTask(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out + }) + timeCmpOpt := EquateApproxTime(time.Second) + if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { t.Errorf("(*Inspector).ListDeadTasks = %v, want %v; (-want, +got)\n%s", got, tc.want, diff) continue diff --git a/rdb.go b/rdb.go index 31bceff..6b8ffca 100644 --- a/rdb.go +++ b/rdb.go @@ -184,16 +184,89 @@ func (r *rdb) listInProgress() ([]*taskMessage, error) { return r.rangeList(inProgress) } -func (r *rdb) listScheduled() ([]*taskMessage, error) { - return r.rangeZSet(scheduled) +func (r *rdb) listScheduled() ([]*ScheduledTask, error) { + data, err := r.client.ZRangeWithScores(scheduled, 0, -1).Result() + if err != nil { + return nil, err + } + var tasks []*ScheduledTask + for _, z := range data { + s, ok := z.Member.(string) + if !ok { + continue // bad data, ignore and continue + } + var msg taskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + continue // bad data, ignore and continue + } + processAt := time.Unix(int64(z.Score), 0) + tasks = append(tasks, &ScheduledTask{ + ID: msg.ID, + Type: msg.Type, + Payload: msg.Payload, + ProcessAt: processAt, + }) + } + return tasks, nil } -func (r *rdb) listRetry() ([]*taskMessage, error) { - return r.rangeZSet(retry) +func (r *rdb) listRetry() ([]*RetryTask, error) { + data, err := r.client.ZRangeWithScores(retry, 0, -1).Result() + if err != nil { + return nil, err + } + var tasks []*RetryTask + for _, z := range data { + s, ok := z.Member.(string) + if !ok { + continue // bad data, ignore and continue + } + var msg taskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + continue // bad data, ignore and continue + } + processAt := time.Unix(int64(z.Score), 0) + tasks = append(tasks, &RetryTask{ + ID: msg.ID, + Type: msg.Type, + Payload: msg.Payload, + ErrorMsg: msg.ErrorMsg, + Retry: msg.Retry, + Retried: msg.Retried, + ProcessAt: processAt, + }) + } + return tasks, nil } -func (r *rdb) listDead() ([]*taskMessage, error) { - return r.rangeZSet(dead) +func (r *rdb) listDead() ([]*DeadTask, error) { + data, err := r.client.ZRangeWithScores(dead, 0, -1).Result() + if err != nil { + return nil, err + } + var tasks []*DeadTask + for _, z := range data { + s, ok := z.Member.(string) + if !ok { + continue // bad data, ignore and continue + } + var msg taskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + continue // bad data, ignore and continue + } + lastFailedAt := time.Unix(int64(z.Score), 0) + tasks = append(tasks, &DeadTask{ + ID: msg.ID, + Type: msg.Type, + Payload: msg.Payload, + ErrorMsg: msg.ErrorMsg, + LastFailedAt: lastFailedAt, + }) + } + return tasks, nil } func (r *rdb) rangeList(key string) ([]*taskMessage, error) {