diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 89aac43..64f2446 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -49,7 +49,19 @@ func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskM return &base.TaskMessage{ ID: xid.New(), Type: taskType, - Queue: "default", + Queue: base.DefaultQueueName, + Retry: 25, + Payload: payload, + } +} + +// NewTaskMessageWithQueue returns a new instance of TaskMessage given a +// task type, payload and queue name. +func NewTaskMessageWithQueue(taskType string, payload map[string]interface{}, qname string) *base.TaskMessage { + return &base.TaskMessage{ + ID: xid.New(), + Type: taskType, + Queue: qname, Retry: 25, Payload: payload, } @@ -117,6 +129,7 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, if len(queueOpt) > 0 { queue = base.QueueKey(queueOpt[0]) } + r.SAdd(base.AllQueues, queue) seedRedisList(tb, r, queue, msgs) } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 397c216..5a3a496 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -233,7 +233,22 @@ func (r *RDB) RedisInfo() (map[string]string, error) { // ListEnqueued returns all enqueued tasks that are ready to be processed. func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { - data, err := r.client.LRange(base.DefaultQueue, 0, -1).Result() + script := redis.NewScript(` + local res = {} + local queues = redis.call("SMEMBERS", KEYS[1]) + for _, qkey in ipairs(queues) do + local msgs = redis.call("LRANGE", qkey, 0, -1) + for _, msg in ipairs(msgs) do + table.insert(res, msg) + end + end + return res + `) + res, err := script.Run(r.client, []string{base.AllQueues}).Result() + if err != nil { + return nil, err + } + data, err := cast.ToStringSliceE(res) if err != nil { return nil, err } @@ -242,8 +257,7 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { - // continue // bad data, ignore and continue - return nil, err + continue // bad data, ignore and continue } tasks = append(tasks, &EnqueuedTask{ ID: msg.ID, diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 13a415c..6ee2448 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -225,25 +225,43 @@ func TestListEnqueued(t *testing.T) { m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m2 := h.NewTaskMessage("reindex", nil) + m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") + m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} + t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} + t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload} tests := []struct { - enqueued []*base.TaskMessage + enqueued map[string][]*base.TaskMessage want []*EnqueuedTask }{ { - enqueued: []*base.TaskMessage{m1, m2}, - want: []*EnqueuedTask{t1, t2}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1, m2}, + }, + want: []*EnqueuedTask{t1, t2}, }, { - enqueued: []*base.TaskMessage{}, - want: []*EnqueuedTask{}, + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {}, + }, + want: []*EnqueuedTask{}, + }, + { + enqueued: map[string][]*base.TaskMessage{ + base.DefaultQueueName: {m1, m2}, + "critical": {m3}, + "low": {m4}, + }, + want: []*EnqueuedTask{t1, t2, t3, t4}, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedEnqueuedQueue(t, r.client, tc.enqueued) + for qname, msgs := range tc.enqueued { + h.SeedEnqueuedQueue(t, r.client, msgs, qname) + } got, err := r.ListEnqueued() if err != nil {