diff --git a/inspector.go b/inspector.go index 813661e..ac347dd 100644 --- a/inspector.go +++ b/inspector.go @@ -16,3 +16,65 @@ func NewInspector(opt *RedisOpt) *Inspector { func (i *Inspector) CurrentStats() (*Stats, error) { return i.rdb.currentStats() } + +// toTaskSlice converts a taskMessage slice to a Task slice. +func (i *Inspector) toTaskSlice(msgs []*taskMessage) []*Task { + var tasks []*Task + for _, m := range msgs { + tasks = append(tasks, &Task{Type: m.Type, Payload: m.Payload}) + } + return tasks +} + +// ListEnqueuedTasks returns a list of tasks ready to be processed. +func (i *Inspector) ListEnqueuedTasks() ([]*Task, error) { + // TODO(hibiken): Support pagination. + msgs, err := i.rdb.listEnqueued() + if err != nil { + return nil, err + } + return i.toTaskSlice(msgs), nil +} + +// ListInProgressTasks returns a list of tasks that are being processed. +func (i *Inspector) ListInProgressTasks() ([]*Task, error) { + // TODO(hibiken): Support pagination. + msgs, err := i.rdb.listInProgress() + if err != nil { + return nil, err + } + return i.toTaskSlice(msgs), nil +} + +// ListScheduledTasks returns a list of tasks that are scheduled to +// be processed in the future. +func (i *Inspector) ListScheduledTasks() ([]*Task, error) { + // TODO(hibiken): Support pagination. + msgs, err := i.rdb.listScheduled() + if err != nil { + return nil, err + } + return i.toTaskSlice(msgs), nil +} + +// ListRetryTasks returns a list of tasks what will be retried in the +// future. +func (i *Inspector) ListRetryTasks() ([]*Task, error) { + // TODO(hibiken): Support pagination. + msgs, err := i.rdb.listRetry() + if err != nil { + return nil, err + } + return i.toTaskSlice(msgs), nil +} + +// ListDeadTasks returns a list of tasks that have reached its +// maximum retry limit. +func (i *Inspector) ListDeadTasks() ([]*Task, error) { + // TODO(hibiken): Support pagination. + msgs, err := i.rdb.listDead() + if err != nil { + return nil, err + } + return i.toTaskSlice(msgs), nil +} diff --git a/inspector_test.go b/inspector_test.go index bd0e544..aac32e0 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -114,3 +114,253 @@ func TestCurrentStats(t *testing.T) { } } } + +func TestListEnqueuedTasks(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("send_email", "default", nil) + m3 := randomTask("gen_export", "default", nil) + t1 := &Task{Type: m1.Type, Payload: m1.Payload} + t2 := &Task{Type: m2.Type, Payload: m2.Payload} + t3 := &Task{Type: m3.Type, Payload: m3.Payload} + + tests := []struct { + queued []*taskMessage + want []*Task + }{ + { + queued: []*taskMessage{m1, m2, m3}, + want: []*Task{t1, t2, t3}, + }, + { + queued: []*taskMessage{}, + want: []*Task{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.queued { + err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.ListEnqueuedTasks() + if err != nil { + t.Error(err) + continue + } + + if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + t.Errorf("(*Inspector).ListEnqueuedTasks = %v, want %v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} + +func TestListInProgressTasks(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("send_email", "default", nil) + m3 := randomTask("gen_export", "default", nil) + t1 := &Task{Type: m1.Type, Payload: m1.Payload} + t2 := &Task{Type: m2.Type, Payload: m2.Payload} + t3 := &Task{Type: m3.Type, Payload: m3.Payload} + + tests := []struct { + inProgress []*taskMessage + want []*Task + }{ + { + inProgress: []*taskMessage{m1, m2, m3}, + want: []*Task{t1, t2, t3}, + }, + { + inProgress: []*taskMessage{}, + want: []*Task{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.inProgress { + err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.ListInProgressTasks() + if err != nil { + t.Error(err) + continue + } + + if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + t.Errorf("(*Inspector).ListInProgressTasks = %v, want %v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} + +func TestListScheduledTasks(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("send_email", "default", nil) + m3 := randomTask("gen_export", "default", nil) + t1 := &Task{Type: m1.Type, Payload: m1.Payload} + t2 := &Task{Type: m2.Type, Payload: m2.Payload} + t3 := &Task{Type: m3.Type, Payload: m3.Payload} + + tests := []struct { + scheduled []*taskMessage + want []*Task + }{ + { + scheduled: []*taskMessage{m1, m2, m3}, + want: []*Task{t1, t2, t3}, + }, + { + scheduled: []*taskMessage{}, + want: []*Task{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.scheduled { + err := r.client.ZAdd(scheduled, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.ListScheduledTasks() + if err != nil { + t.Error(err) + continue + } + + if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + t.Errorf("(*Inspector).ListScheduledTasks = %v, want %v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} + +func TestListRetryTasks(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("send_email", "default", nil) + m3 := randomTask("gen_export", "default", nil) + t1 := &Task{Type: m1.Type, Payload: m1.Payload} + t2 := &Task{Type: m2.Type, Payload: m2.Payload} + t3 := &Task{Type: m3.Type, Payload: m3.Payload} + + tests := []struct { + retry []*taskMessage + want []*Task + }{ + { + retry: []*taskMessage{m1, m2, m3}, + want: []*Task{t1, t2, t3}, + }, + { + retry: []*taskMessage{}, + want: []*Task{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.retry { + err := r.client.ZAdd(retry, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.ListRetryTasks() + if err != nil { + t.Error(err) + continue + } + + if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + t.Errorf("(*Inspector).ListRetryTasks = %v, want %v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} + +func TestListDeadTasks(t *testing.T) { + r := setup(t) + inspector := &Inspector{r} + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("send_email", "default", nil) + m3 := randomTask("gen_export", "default", nil) + t1 := &Task{Type: m1.Type, Payload: m1.Payload} + t2 := &Task{Type: m2.Type, Payload: m2.Payload} + t3 := &Task{Type: m3.Type, Payload: m3.Payload} + + tests := []struct { + dead []*taskMessage + want []*Task + }{ + { + dead: []*taskMessage{m1, m2, m3}, + want: []*Task{t1, t2, t3}, + }, + { + dead: []*taskMessage{}, + want: []*Task{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + for _, msg := range tc.dead { + err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, msg), Score: 123}).Err() + if err != nil { + t.Fatal(err) + } + } + + got, err := inspector.ListDeadTasks() + if err != nil { + t.Error(err) + continue + } + + if diff := cmp.Diff(tc.want, got, sortTaskOpt); diff != "" { + t.Errorf("(*Inspector).ListDeadTasks = %v, want %v; (-want, +got)\n%s", + got, tc.want, diff) + continue + } + } +} diff --git a/rdb.go b/rdb.go index a6b4e6b..31bceff 100644 --- a/rdb.go +++ b/rdb.go @@ -175,3 +175,54 @@ func (r *rdb) currentStats() (*Stats, error) { Timestamp: time.Now(), }, nil } + +func (r *rdb) listEnqueued() ([]*taskMessage, error) { + return r.rangeList(defaultQueue) +} + +func (r *rdb) listInProgress() ([]*taskMessage, error) { + return r.rangeList(inProgress) +} + +func (r *rdb) listScheduled() ([]*taskMessage, error) { + return r.rangeZSet(scheduled) +} + +func (r *rdb) listRetry() ([]*taskMessage, error) { + return r.rangeZSet(retry) +} + +func (r *rdb) listDead() ([]*taskMessage, error) { + return r.rangeZSet(dead) +} + +func (r *rdb) rangeList(key string) ([]*taskMessage, error) { + data, err := r.client.LRange(key, 0, -1).Result() + if err != nil { + return nil, err + } + return r.toMessageSlice(data), nil +} + +func (r *rdb) rangeZSet(key string) ([]*taskMessage, error) { + data, err := r.client.ZRange(key, 0, -1).Result() + if err != nil { + return nil, err + } + return r.toMessageSlice(data), nil +} + +// toMessageSlice convers json strings to a slice of task messages. +func (r *rdb) toMessageSlice(data []string) []*taskMessage { + var msgs []*taskMessage + for _, s := range data { + var msg taskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + // bad data; ignore and continue + continue + } + msgs = append(msgs, &msg) + } + return msgs +}