2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00
asynq/rdb_test.go

445 lines
12 KiB
Go
Raw Normal View History

2019-11-20 23:01:24 +08:00
package asynq
import (
"encoding/json"
2019-11-27 23:16:16 +08:00
"fmt"
"math/rand"
2019-11-26 22:38:11 +08:00
"sort"
2019-11-20 23:01:24 +08:00
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
2019-11-22 22:16:43 +08:00
"github.com/google/uuid"
2019-11-20 23:01:24 +08:00
)
// init seeds the shared math/rand source from the current wall-clock time so
// that the random values drawn by the helpers in this file vary between runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
2019-11-28 22:50:05 +08:00
// sortMsgOpt is a cmp option that compares []*taskMessage values after
// ordering them by the string form of each message ID, so that a diff does
// not depend on element order.
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*taskMessage) []*taskMessage {
	// Sort a copy so the slice handed to cmp keeps its original order.
	sorted := append([]*taskMessage(nil), in...)
	byID := func(a, b int) bool {
		return sorted[a].ID.String() < sorted[b].ID.String()
	}
	sort.Slice(sorted, byID)
	return sorted
})
2019-11-30 04:48:54 +08:00
// sortTaskOpt is a cmp option that compares []*Task values after ordering
// them by their Type field, so that a diff does not depend on element order.
//
// The transformer is labelled "SortTask" (it previously reused the "SortMsg"
// label of sortMsgOpt) so that the transform step reported in cmp.Diff output
// identifies the option that was actually applied.
var sortTaskOpt = cmp.Transformer("SortTask", func(in []*Task) []*Task {
	out := append([]*Task(nil), in...) // Copy input to avoid mutating it
	sort.Slice(out, func(i, j int) bool {
		return out[i].Type < out[j].Type
	})
	return out
})
2019-11-20 23:01:24 +08:00
// setup connects to a redis database and flush all keys
// before returning an instance of rdb.
2019-11-26 10:55:17 +08:00
func setup(t *testing.T) *rdb {
t.Helper()
2019-11-27 22:41:54 +08:00
r := newRDB(&RedisOpt{
2019-11-20 23:01:24 +08:00
Addr: "localhost:6379",
DB: 15, // use database 15 to separate from other applications
})
// Start each test with a clean slate.
2019-11-27 22:41:54 +08:00
if err := r.client.FlushDB().Err(); err != nil {
2019-11-20 23:01:24 +08:00
panic(err)
}
2019-11-27 22:41:54 +08:00
return r
2019-11-20 23:01:24 +08:00
}
2019-11-26 10:55:17 +08:00
func randomTask(taskType, qname string, payload map[string]interface{}) *taskMessage {
return &taskMessage{
2019-11-26 10:55:17 +08:00
ID: uuid.New(),
Type: taskType,
Queue: qname,
Retry: rand.Intn(100),
Payload: make(map[string]interface{}),
2019-11-20 23:01:24 +08:00
}
}
2019-11-28 22:50:05 +08:00
// mustMarshal encodes task as JSON and returns the encoding as a string.
// An encoding error fails the calling test immediately via t.Fatal.
func mustMarshal(t *testing.T, task *taskMessage) string {
	t.Helper()
	raw, err := json.Marshal(task)
	if err != nil {
		t.Fatal(err)
	}
	return string(raw)
}
// mustUnmarshal decodes the JSON text in data into a new taskMessage and
// returns a pointer to it. A decoding error fails the calling test
// immediately via t.Fatal.
func mustUnmarshal(t *testing.T, data string) *taskMessage {
	t.Helper()
	msg := new(taskMessage)
	if err := json.Unmarshal([]byte(data), msg); err != nil {
		t.Fatal(err)
	}
	return msg
}
2019-11-28 23:17:07 +08:00
// mustMarshalSlice encodes every message in tasks with mustMarshal and
// returns the encodings in the same order. The result is nil when tasks is
// empty.
func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string {
	t.Helper()
	var encoded []string
	for i := range tasks {
		encoded = append(encoded, mustMarshal(t, tasks[i]))
	}
	return encoded
}
// mustUnmarshalSlice decodes every JSON string in data with mustUnmarshal and
// returns the messages in the same order. The result is nil when data is
// empty.
func mustUnmarshalSlice(t *testing.T, data []string) []*taskMessage {
	t.Helper()
	var msgs []*taskMessage
	for i := range data {
		msgs = append(msgs, mustUnmarshal(t, data[i]))
	}
	return msgs
}
2019-11-26 11:58:24 +08:00
func TestEnqueue(t *testing.T) {
2019-11-26 10:55:17 +08:00
r := setup(t)
tests := []struct {
msg *taskMessage
}{
2019-11-26 12:57:53 +08:00
{msg: randomTask("send_email", "default",
map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
{msg: randomTask("generate_csv", "default",
map[string]interface{}{})},
2019-11-26 10:55:17 +08:00
{msg: randomTask("sync", "default", nil)},
2019-11-20 23:01:24 +08:00
}
2019-11-26 10:55:17 +08:00
for _, tc := range tests {
2019-11-26 12:57:53 +08:00
// clean up db before each test case.
2019-11-27 22:41:54 +08:00
if err := r.client.FlushDB().Err(); err != nil {
2019-11-26 12:57:53 +08:00
t.Fatal(err)
}
2019-11-26 11:58:24 +08:00
err := r.enqueue(tc.msg)
2019-11-26 10:55:17 +08:00
if err != nil {
t.Error(err)
2019-11-28 12:05:31 +08:00
continue
2019-11-26 10:55:17 +08:00
}
2019-11-27 22:41:54 +08:00
res := r.client.LRange(defaultQueue, 0, -1).Val()
2019-11-26 10:55:17 +08:00
if len(res) != 1 {
t.Errorf("LIST %q has length %d, want 1", defaultQueue, len(res))
continue
}
2019-11-27 22:41:54 +08:00
if !r.client.SIsMember(allQueues, defaultQueue).Val() {
2019-11-26 10:55:17 +08:00
t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQueue)
}
2019-11-28 23:17:07 +08:00
if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" {
2019-11-26 10:55:17 +08:00
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
}
2019-11-20 23:01:24 +08:00
}
}
2019-11-26 12:57:53 +08:00
func TestDequeue(t *testing.T) {
2019-11-26 10:55:17 +08:00
r := setup(t)
2019-11-26 12:57:53 +08:00
t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"})
tests := []struct {
queued []*taskMessage
want *taskMessage
err error
inProgress int64 // length of "in-progress" tasks after dequeue
}{
{queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1},
2019-11-28 11:36:56 +08:00
{queued: []*taskMessage{}, want: nil, err: errDequeueTimeout, inProgress: 0},
2019-11-22 13:45:27 +08:00
}
2019-11-20 23:01:24 +08:00
2019-11-26 12:57:53 +08:00
for _, tc := range tests {
// clean up db before each test case.
2019-11-27 22:41:54 +08:00
if err := r.client.FlushDB().Err(); err != nil {
2019-11-26 12:57:53 +08:00
t.Fatal(err)
}
for _, m := range tc.queued {
r.enqueue(m)
}
got, err := r.dequeue(defaultQueue, time.Second)
if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
defaultQueue, got, err, tc.want, tc.err)
continue
}
2019-11-27 22:41:54 +08:00
if l := r.client.LLen(inProgress).Val(); l != tc.inProgress {
2019-11-26 12:57:53 +08:00
t.Errorf("LIST %q has length %d, want %d", inProgress, l, tc.inProgress)
}
2019-11-20 23:01:24 +08:00
}
}
2019-11-28 22:50:05 +08:00
func TestRemove(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("export_csv", "csv", nil)
tests := []struct {
initial []*taskMessage // initial state of the list
target *taskMessage // task to remove
final []*taskMessage // final state of the list
}{
{
initial: []*taskMessage{t1, t2},
target: t1,
final: []*taskMessage{t2},
},
{
initial: []*taskMessage{t2},
target: t1,
final: []*taskMessage{t2},
},
{
initial: []*taskMessage{t1},
target: t1,
final: []*taskMessage{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
// set up initial state
for _, task := range tc.initial {
err := r.client.LPush(defaultQueue, mustMarshal(t, task)).Err()
if err != nil {
t.Fatal(err)
}
}
err := r.remove(defaultQueue, tc.target)
if err != nil {
t.Error(err)
continue
}
var got []*taskMessage
data := r.client.LRange(defaultQueue, 0, -1).Val()
for _, s := range data {
got = append(got, mustUnmarshal(t, s))
}
if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQueue, diff)
2019-11-28 23:47:12 +08:00
continue
}
}
}
func TestKill(t *testing.T) {
r := setup(t)
t1 := randomTask("send_email", "default", nil)
// TODO(hibiken): add test cases for trimming
tests := []struct {
initial []*taskMessage // inital state of "dead" set
target *taskMessage // task to kill
want []*taskMessage // final state of "dead" set
}{
{
initial: []*taskMessage{},
target: t1,
want: []*taskMessage{t1},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
// set up initial state
for _, task := range tc.initial {
err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err()
if err != nil {
t.Fatal(err)
}
}
err := r.kill(tc.target)
if err != nil {
t.Error(err)
continue
}
actual := r.client.ZRange(dead, 0, -1).Val()
got := mustUnmarshalSlice(t, actual)
if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", dead, diff)
continue
2019-11-28 22:50:05 +08:00
}
}
}
func TestMoveAll(t *testing.T) {
2019-11-26 10:55:17 +08:00
r := setup(t)
2019-11-27 01:57:53 +08:00
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("export_csv", "csv", nil)
t3 := randomTask("sync_stuff", "sync", nil)
2019-11-27 01:57:53 +08:00
tests := []struct {
2019-11-28 23:17:07 +08:00
beforeSrc []*taskMessage
beforeDst []*taskMessage
afterSrc []*taskMessage
afterDst []*taskMessage
2019-11-27 01:57:53 +08:00
}{
{
2019-11-28 23:17:07 +08:00
beforeSrc: []*taskMessage{t1, t2, t3},
beforeDst: []*taskMessage{},
afterSrc: []*taskMessage{},
afterDst: []*taskMessage{t1, t2, t3},
2019-11-27 01:57:53 +08:00
},
{
2019-11-28 23:17:07 +08:00
beforeSrc: []*taskMessage{},
beforeDst: []*taskMessage{t1, t2, t3},
afterSrc: []*taskMessage{},
afterDst: []*taskMessage{t1, t2, t3},
2019-11-27 01:57:53 +08:00
},
{
2019-11-28 23:17:07 +08:00
beforeSrc: []*taskMessage{t2, t3},
beforeDst: []*taskMessage{t1},
afterSrc: []*taskMessage{},
afterDst: []*taskMessage{t1, t2, t3},
2019-11-27 01:57:53 +08:00
},
}
2019-11-27 01:57:53 +08:00
for _, tc := range tests {
// clean up db before each test case.
2019-11-27 22:41:54 +08:00
if err := r.client.FlushDB().Err(); err != nil {
2019-11-27 01:57:53 +08:00
t.Error(err)
continue
}
// seed src list.
for _, msg := range tc.beforeSrc {
2019-11-28 23:17:07 +08:00
r.client.LPush(inProgress, mustMarshal(t, msg))
2019-11-27 01:57:53 +08:00
}
// seed dst list.
for _, msg := range tc.beforeDst {
2019-11-28 23:17:07 +08:00
r.client.LPush(defaultQueue, mustMarshal(t, msg))
2019-11-27 01:57:53 +08:00
}
if err := r.moveAll(inProgress, defaultQueue); err != nil {
t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", inProgress, defaultQueue, err)
continue
}
2019-11-28 23:17:07 +08:00
src := r.client.LRange(inProgress, 0, -1).Val()
gotSrc := mustUnmarshalSlice(t, src)
if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" {
2019-11-27 01:57:53 +08:00
t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgress, diff)
}
2019-11-28 23:17:07 +08:00
dst := r.client.LRange(defaultQueue, 0, -1).Val()
gotDst := mustUnmarshalSlice(t, dst)
if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" {
2019-11-27 01:57:53 +08:00
t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQueue, diff)
}
}
}
func TestForward(t *testing.T) {
2019-11-26 10:55:17 +08:00
r := setup(t)
t1 := randomTask("send_email", defaultQueue, nil)
t2 := randomTask("generate_csv", defaultQueue, nil)
2019-11-26 22:38:11 +08:00
secondAgo := time.Now().Add(-time.Second)
hourFromNow := time.Now().Add(time.Hour)
tests := []struct {
2019-11-28 23:17:07 +08:00
tasks []*redis.Z // scheduled tasks with timestamp as a score
wantQueued []*taskMessage // queue after calling forward
wantScheduled []*taskMessage // scheduled queue after calling forward
2019-11-26 22:38:11 +08:00
}{
{
tasks: []*redis.Z{
2019-11-28 23:17:07 +08:00
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
wantQueued: []*taskMessage{t1, t2},
wantScheduled: []*taskMessage{},
2019-11-26 22:38:11 +08:00
},
{
tasks: []*redis.Z{
2019-11-28 23:17:07 +08:00
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
wantQueued: []*taskMessage{t2},
wantScheduled: []*taskMessage{t1},
2019-11-26 22:38:11 +08:00
},
{
tasks: []*redis.Z{
2019-11-28 23:17:07 +08:00
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
wantQueued: []*taskMessage{},
wantScheduled: []*taskMessage{t1, t2},
2019-11-26 22:38:11 +08:00
},
}
for _, tc := range tests {
// clean up db before each test case.
2019-11-27 22:41:54 +08:00
if err := r.client.FlushDB().Err(); err != nil {
2019-11-26 22:38:11 +08:00
t.Fatal(err)
}
2019-11-27 22:41:54 +08:00
if err := r.client.ZAdd(scheduled, tc.tasks...).Err(); err != nil {
2019-11-26 22:38:11 +08:00
t.Error(err)
continue
}
2019-11-28 23:17:07 +08:00
err := r.forward(scheduled)
2019-11-26 22:38:11 +08:00
if err != nil {
t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err)
continue
}
2019-11-28 23:17:07 +08:00
queued := r.client.LRange(defaultQueue, 0, -1).Val()
gotQueued := mustUnmarshalSlice(t, queued)
if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" {
2019-11-26 22:38:11 +08:00
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQueue, len(gotQueued), len(tc.wantQueued), diff)
continue
}
2019-11-28 23:17:07 +08:00
scheduled := r.client.ZRangeByScore(scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val()
gotScheduled := mustUnmarshalSlice(t, scheduled)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
2019-11-26 22:38:11 +08:00
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff)
continue
}
}
}
2019-11-27 23:16:16 +08:00
// TestSchedule calls (*rdb).schedule for a message and a process-at time and
// checks that the target sorted set then holds exactly one item whose score
// equals the Unix timestamp of that time.
func TestSchedule(t *testing.T) {
	r := setup(t)
	cases := []struct {
		msg       *taskMessage
		processAt time.Time
		zset      string
	}{
		{
			msg:       randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
			processAt: time.Now().Add(15 * time.Minute),
			zset:      scheduled,
		},
	}

	for _, tc := range cases {
		// clean up db before each test case.
		if err := r.client.FlushDB().Err(); err != nil {
			t.Fatal(err)
		}

		if err := r.schedule(tc.zset, tc.processAt, tc.msg); err != nil {
			t.Error(err)
			continue
		}

		items, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result()
		if err != nil {
			t.Error(err)
			continue
		}

		// Label shared by the failure messages below.
		desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg)
		if count := len(items); count != 1 {
			t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, count, tc.zset)
			continue
		}

		wantScore := float64(tc.processAt.Unix())
		if gotScore := items[0].Score; gotScore != wantScore {
			t.Errorf("%s inserted an item with score %f, want %f", desc, gotScore, wantScore)
		}
	}
}