2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Update all list methods in RDB

This commit is contained in:
Ken Hibino 2020-08-12 06:18:15 -07:00
parent 5f82b4b365
commit dbf140a767
2 changed files with 208 additions and 90 deletions

View File

@ -254,9 +254,9 @@ func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, e
return r.listMessages(qkey, pgn) return r.listMessages(qkey, pgn)
} }
// ListInProgress returns all tasks that are currently being processed. // ListInProgress returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
return r.listMessages(base.InProgressQueue, pgn) return r.listMessages(base.InProgressKey(qname), pgn)
} }
// listMessages returns a list of TaskMessage in Redis list with the given key. // listMessages returns a list of TaskMessage in Redis list with the given key.
@ -282,21 +282,21 @@ func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, err
} }
// ListScheduled returns all tasks that are scheduled to be processed // ListScheduled returns all tasks from the given queue that are scheduled
// in the future. // to be processed in the future.
func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) { func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.ScheduledQueue, pgn) return r.listZSetEntries(base.ScheduledKey(qname), pgn)
} }
// ListRetry returns all tasks that have failed before and willl be retried // ListRetry returns all tasks from the given queue that have failed before
// in the future. // and willl be retried in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) { func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.RetryQueue, pgn) return r.listZSetEntries(base.RetryKey(qname), pgn)
} }
// ListDead returns all tasks that have exhausted its retry limit. // ListDead returns all tasks from the given queue that have exhausted its retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) { func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.DeadQueue, pgn) return r.listZSetEntries(base.DeadKey(qname), pgn)
} }
// listZSetEntries returns a list of message and score pairs in Redis sorted-set // listZSetEntries returns a list of message and score pairs in Redis sorted-set

View File

@ -369,7 +369,7 @@ func TestListEnqueuedPagination(t *testing.T) {
msgs = append(msgs, msg) msgs = append(msgs, msg)
} }
// create 100 tasks in default queue // create 100 tasks in default queue
h.SeedEnqueuedQueue(t, r.client, msgs) h.SeedEnqueuedQueue(t, r.client, msgs, "default")
msgs = []*base.TaskMessage(nil) // empty list msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
@ -432,26 +432,44 @@ func TestListInProgress(t *testing.T) {
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task2", nil, "critical")
m4 := h.NewTaskMessageWithQueue("task2", nil, "low")
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage inProgress map[string][]*base.TaskMessage
qname string
want []*base.TaskMessage
}{ }{
{inProgress: []*base.TaskMessage{m1, m2}}, {
{inProgress: []*base.TaskMessage(nil)}, inProgress: map[string][]*base.TaskMessage{
"default": {m1, m2},
"critical": {m3},
"low": {m4},
},
qname: "default",
want: []*base.TaskMessage{m1, m2},
},
{
inProgress: map[string][]*base.TaskMessage{
"default": {},
},
qname: "default",
want: []*base.TaskMessage{},
},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
got, err := r.ListInProgress(Pagination{Size: 20, Page: 0}) got, err := r.ListInProgress(tc.qname, Pagination{Size: 20, Page: 0})
op := "r.ListInProgress(Pagination{Size: 20, Page: 0})" op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil { if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress)
continue continue
} }
if diff := cmp.Diff(tc.inProgress, got); diff != "" { if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.inProgress, diff) t.Errorf("%s = %v, %v, want %v, nil; (-want, +got)\n%s", op, got, err, tc.want, diff)
continue continue
} }
} }
@ -464,26 +482,27 @@ func TestListInProgressPagination(t *testing.T) {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg) msgs = append(msgs, msg)
} }
h.SeedInProgressQueue(t, r.client, msgs) h.SeedInProgressQueue(t, r.client, msgs, "default")
tests := []struct { tests := []struct {
desc string desc string
qname string
page int page int
size int size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
}{ }{
{"first page", 0, 20, 20, "task 0", "task 19"}, {"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"}, {"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"}, {"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"}, {"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""}, {"out of range", "default", 4, 30, 0, "", ""},
} }
for _, tc := range tests { for _, tc := range tests {
got, err := r.ListInProgress(Pagination{Size: tc.size, Page: tc.page}) got, err := r.ListInProgress(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListInProgress(Pagination{Size: %d, Page: %d})", tc.size, tc.page) op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil { if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err) t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue continue
@ -517,20 +536,29 @@ func TestListScheduled(t *testing.T) {
m1 := h.NewTaskMessage("task1", nil) m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil) m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessage("task3", nil) m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessageWithQueue("task3", nil, "custom")
p1 := time.Now().Add(30 * time.Minute) p1 := time.Now().Add(30 * time.Minute)
p2 := time.Now().Add(24 * time.Hour) p2 := time.Now().Add(24 * time.Hour)
p3 := time.Now().Add(5 * time.Minute) p3 := time.Now().Add(5 * time.Minute)
p4 := time.Now().Add(2 * time.Minute)
tests := []struct { tests := []struct {
scheduled []base.Z scheduled map[string][]base.Z
qname string
want []base.Z want []base.Z
}{ }{
{ {
scheduled: []base.Z{ scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()}, {Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()}, {Message: m2, Score: p2.Unix()},
{Message: m3, Score: p3.Unix()}, {Message: m3, Score: p3.Unix()},
}, },
"custom": {
{Message: m4, Score: p4.Unix()},
},
},
qname: "default",
// should be sorted by score in ascending order // should be sorted by score in ascending order
want: []base.Z{ want: []base.Z{
{Message: m3, Score: p3.Unix()}, {Message: m3, Score: p3.Unix()},
@ -539,17 +567,36 @@ func TestListScheduled(t *testing.T) {
}, },
}, },
{ {
scheduled: []base.Z(nil), scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
{Message: m3, Score: p3.Unix()},
},
"custom": {
{Message: m4, Score: p4.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m4, Score: p4.Unix()},
},
},
{
scheduled: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil), want: []base.Z(nil),
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got, err := r.ListScheduled(Pagination{Size: 20, Page: 0}) got, err := r.ListScheduled(tc.qname, Pagination{Size: 20, Page: 0})
op := "r.ListScheduled(Pagination{Size: 20, Page: 0})" op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil { if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
@ -573,22 +620,23 @@ func TestListScheduledPagination(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
qname string
page int page int
size int size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
}{ }{
{"first page", 0, 20, 20, "task 0", "task 19"}, {"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"}, {"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"}, {"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"}, {"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""}, {"out of range", "default", 4, 30, 0, "", ""},
} }
for _, tc := range tests { for _, tc := range tests {
got, err := r.ListScheduled(Pagination{Size: tc.size, Page: tc.page}) got, err := r.ListScheduled(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListScheduled(Pagination{Size: %d, Page: %d})", tc.size, tc.page) op := fmt.Sprintf("r.ListScheduled(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil { if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err) t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue continue
@ -621,51 +669,86 @@ func TestListRetry(t *testing.T) {
r := setup(t) r := setup(t)
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "task1",
Queue: "default", Queue: "default",
Payload: map[string]interface{}{"subject": "hello"}, Payload: nil,
ErrorMsg: "email server not responding", ErrorMsg: "some error occurred",
Retry: 25, Retry: 25,
Retried: 10, Retried: 10,
} }
m2 := &base.TaskMessage{ m2 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "reindex", Type: "task2",
Queue: "default", Queue: "default",
Payload: nil, Payload: nil,
ErrorMsg: "search engine not responding", ErrorMsg: "some error occurred",
Retry: 25, Retry: 25,
Retried: 2, Retried: 2,
} }
m3 := &base.TaskMessage{
ID: uuid.New(),
Type: "task3",
Queue: "custom",
Payload: nil,
ErrorMsg: "some error occurred",
Retry: 25,
Retried: 3,
}
p1 := time.Now().Add(5 * time.Minute) p1 := time.Now().Add(5 * time.Minute)
p2 := time.Now().Add(24 * time.Hour) p2 := time.Now().Add(24 * time.Hour)
p3 := time.Now().Add(24 * time.Hour)
tests := []struct { tests := []struct {
retry []base.Z retry map[string][]base.Z
qname string
want []base.Z want []base.Z
}{ }{
{ {
retry: []base.Z{ retry: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()}, {Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()}, {Message: m2, Score: p2.Unix()},
}, },
"custom": {
{Message: m3, Score: p3.Unix()},
},
},
qname: "default",
want: []base.Z{ want: []base.Z{
{Message: m1, Score: p1.Unix()}, {Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()}, {Message: m2, Score: p2.Unix()},
}, },
}, },
{ {
retry: []base.Z(nil), retry: map[string][]base.Z{
"default": {
{Message: m1, Score: p1.Unix()},
{Message: m2, Score: p2.Unix()},
},
"custom": {
{Message: m3, Score: p3.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: p3.Unix()},
},
},
{
retry: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil), want: []base.Z(nil),
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedRetryQueue(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
got, err := r.ListRetry(Pagination{Size: 20, Page: 0}) got, err := r.ListRetry(Pagination{Size: 20, Page: 0})
op := "r.ListRetry(Pagination{Size: 20, Page: 0})" op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil { if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
@ -688,27 +771,28 @@ func TestListRetryPagination(t *testing.T) {
processAt := now.Add(time.Duration(i) * time.Second) processAt := now.Add(time.Duration(i) * time.Second)
seed = append(seed, base.Z{Message: msg, Score: processAt.Unix()}) seed = append(seed, base.Z{Message: msg, Score: processAt.Unix()})
} }
h.SeedRetryQueue(t, r.client, seed) h.SeedRetryQueue(t, r.client, seed, "default")
tests := []struct { tests := []struct {
desc string desc string
qname string
page int page int
size int size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
}{ }{
{"first page", 0, 20, 20, "task 0", "task 19"}, {"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"}, {"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"}, {"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"}, {"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""}, {"out of range", "default", 4, 30, 0, "", ""},
} }
for _, tc := range tests { for _, tc := range tests {
got, err := r.ListRetry(Pagination{Size: tc.size, Page: tc.page}) got, err := r.ListRetry(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListRetry(Pagination{Size: %d, Page: %d})", op := fmt.Sprintf("r.ListRetry(%q, Pagination{Size: %d, Page: %d})",
tc.size, tc.page) tc.qname, tc.size, tc.page)
if err != nil { if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err) t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue continue
@ -742,47 +826,80 @@ func TestListDead(t *testing.T) {
r := setup(t) r := setup(t)
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "task1",
Queue: "default", Queue: "default",
Payload: map[string]interface{}{"subject": "hello"}, Payload: nil,
ErrorMsg: "email server not responding", ErrorMsg: "some error occurred",
} }
m2 := &base.TaskMessage{ m2 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "reindex", Type: "task2",
Queue: "default", Queue: "default",
Payload: nil, Payload: nil,
ErrorMsg: "search engine not responding", ErrorMsg: "some error occurred",
}
m3 := &base.TaskMessage{
ID: uuid.New(),
Type: "task3",
Queue: "custom",
Payload: nil,
ErrorMsg: "some error occurred",
} }
f1 := time.Now().Add(-5 * time.Minute) f1 := time.Now().Add(-5 * time.Minute)
f2 := time.Now().Add(-24 * time.Hour) f2 := time.Now().Add(-24 * time.Hour)
f3 := time.Now().Add(-4 * time.Hour)
tests := []struct { tests := []struct {
dead []base.Z dead map[string][]base.Z
qname string
want []base.Z want []base.Z
}{ }{
{ {
dead: []base.Z{ dead: map[string][]base.Z{
"default": {
{Message: m1, Score: f1.Unix()}, {Message: m1, Score: f1.Unix()},
{Message: m2, Score: f2.Unix()}, {Message: m2, Score: f2.Unix()},
}, },
"custom": {
{Message: m3, Score: f3.Unix()},
},
},
qname: "default",
want: []base.Z{ want: []base.Z{
{Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order? {Message: m2, Score: f2.Unix()}, // FIXME: shouldn't be sorted in the other order?
{Message: m1, Score: f1.Unix()}, {Message: m1, Score: f1.Unix()},
}, },
}, },
{ {
dead: []base.Z(nil), dead: map[string][]base.Z{
"default": {
{Message: m1, Score: f1.Unix()},
{Message: m2, Score: f2.Unix()},
},
"custom": {
{Message: m3, Score: f3.Unix()},
},
},
qname: "custom",
want: []base.Z{
{Message: m3, Score: f3.Unix()},
},
},
{
dead: map[string][]base.Z{
"default": {},
},
qname: "default",
want: []base.Z(nil), want: []base.Z(nil),
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDeadQueue(t, r.client, tc.dead) h.SeedAllDeadQueues(t, r.client, tc.dead)
got, err := r.ListDead(Pagination{Size: 20, Page: 0}) got, err := r.ListDead(Pagination{Size: 20, Page: 0})
op := "r.ListDead(Pagination{Size: 20, Page: 0})" op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil { if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want)
continue continue
@ -802,25 +919,26 @@ func TestListDeadPagination(t *testing.T) {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
entries = append(entries, base.Z{Message: msg, Score: int64(i)}) entries = append(entries, base.Z{Message: msg, Score: int64(i)})
} }
h.SeedDeadQueue(t, r.client, entries) h.SeedDeadQueue(t, r.client, entries, "default")
tests := []struct { tests := []struct {
desc string desc string
qname string
page int page int
size int size int
wantSize int wantSize int
wantFirst string wantFirst string
wantLast string wantLast string
}{ }{
{"first page", 0, 20, 20, "task 0", "task 19"}, {"first page", "default", 0, 20, 20, "task 0", "task 19"},
{"second page", 1, 20, 20, "task 20", "task 39"}, {"second page", "default", 1, 20, 20, "task 20", "task 39"},
{"different page size", 2, 30, 30, "task 60", "task 89"}, {"different page size", "default", 2, 30, 30, "task 60", "task 89"},
{"last page", 3, 30, 10, "task 90", "task 99"}, {"last page", "default", 3, 30, 10, "task 90", "task 99"},
{"out of range", 4, 30, 0, "", ""}, {"out of range", "default", 4, 30, 0, "", ""},
} }
for _, tc := range tests { for _, tc := range tests {
got, err := r.ListDead(Pagination{Size: tc.size, Page: tc.page}) got, err := r.ListDead(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})",
tc.size, tc.page) tc.size, tc.page)
if err != nil { if err != nil {