diff --git a/asynq_test.go b/asynq_test.go index b28f9eb..cc56fa9 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -10,7 +10,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hibiken/asynq/internal/rdb" + "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" ) @@ -21,19 +21,9 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// Redis keys -const ( - queuePrefix = "asynq:queues:" // LIST - asynq:queues: - defaultQ = queuePrefix + "default" // LIST - scheduledQ = "asynq:scheduled" // ZSET - retryQ = "asynq:retry" // ZSET - deadQ = "asynq:dead" // ZSET - inProgressQ = "asynq:in_progress" // LIST -) - // scheduledEntry represents an item in redis sorted set (aka ZSET). type sortedSetEntry struct { - msg *rdb.TaskMessage + msg *base.TaskMessage score int64 } @@ -58,8 +48,8 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { return out }) -var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.TaskMessage { - out := append([]*rdb.TaskMessage(nil), in...) // Copy input to avoid mutating it +var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*base.TaskMessage) []*base.TaskMessage { + out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { return out[i].ID.String() < out[j].ID.String() }) @@ -74,10 +64,10 @@ var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry return out }) -var ignoreIDOpt = cmpopts.IgnoreFields(rdb.TaskMessage{}, "ID") +var ignoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") -func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { - return &rdb.TaskMessage{ +func randomTask(taskType, qname string, payload map[string]interface{}) *base.TaskMessage { + return &base.TaskMessage{ ID: xid.New(), Type: taskType, Queue: qname, @@ -86,7 +76,7 @@ func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.Tas } } -func mustMarshal(t *testing.T, task *rdb.TaskMessage) string { +func mustMarshal(t *testing.T, task *base.TaskMessage) string { t.Helper() data, err := json.Marshal(task) if err != nil { @@ -95,9 +85,9 @@ func mustMarshal(t *testing.T, task *rdb.TaskMessage) string { return string(data) } -func mustUnmarshal(t *testing.T, data string) *rdb.TaskMessage { +func mustUnmarshal(t *testing.T, data string) *base.TaskMessage { t.Helper() - var task rdb.TaskMessage + var task base.TaskMessage err := json.Unmarshal([]byte(data), &task) if err != nil { t.Fatal(err) @@ -105,7 +95,7 @@ func mustUnmarshal(t *testing.T, data string) *rdb.TaskMessage { return &task } -func mustMarshalSlice(t *testing.T, tasks []*rdb.TaskMessage) []string { +func mustMarshalSlice(t *testing.T, tasks []*base.TaskMessage) []string { t.Helper() var data []string for _, task := range tasks { @@ -114,9 +104,9 @@ func mustMarshalSlice(t *testing.T, tasks []*rdb.TaskMessage) []string { return data } -func mustUnmarshalSlice(t *testing.T, data []string) []*rdb.TaskMessage { +func mustUnmarshalSlice(t *testing.T, data []string) []*base.TaskMessage { t.Helper() - var tasks []*rdb.TaskMessage + var tasks []*base.TaskMessage for _, s := range data { tasks = append(tasks, mustUnmarshal(t, s)) } diff --git a/client.go b/client.go index cb12b1f..aa518e6 100644 --- a/client.go +++ b/client.go @@ -3,6 +3,7 @@ package asynq import ( "time" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" "github.com/rs/xid" ) @@ -73,7 +74,7 @@ const ( // Option the last one overrides the ones before. func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error { opt := composeOptions(opts...) - msg := &rdb.TaskMessage{ + msg := &base.TaskMessage{ ID: xid.New(), Type: task.Type, Payload: task.Payload, @@ -83,7 +84,7 @@ func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error return c.enqueue(msg, processAt) } -func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error { +func (c *Client) enqueue(msg *base.TaskMessage, processAt time.Time) error { if time.Now().After(processAt) { return c.rdb.Enqueue(msg) } diff --git a/client_test.go b/client_test.go index b71ece6..ca9f27d 100644 --- a/client_test.go +++ b/client_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -19,7 +20,7 @@ func TestClient(t *testing.T) { task *Task processAt time.Time opts []Option - wantEnqueued []*rdb.TaskMessage + wantEnqueued []*base.TaskMessage wantScheduled []sortedSetEntry }{ { @@ -27,8 +28,8 @@ func TestClient(t *testing.T) { task: task, processAt: time.Now(), opts: []Option{}, - wantEnqueued: []*rdb.TaskMessage{ - &rdb.TaskMessage{ + wantEnqueued: []*base.TaskMessage{ + &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: defaultMaxRetry, @@ -45,7 +46,7 @@ func TestClient(t *testing.T) { wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantScheduled: []sortedSetEntry{ { - msg: &rdb.TaskMessage{ + msg: &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: defaultMaxRetry, @@ -62,8 +63,8 @@ func TestClient(t *testing.T) { opts: []Option{ MaxRetry(3), }, - wantEnqueued: []*rdb.TaskMessage{ - &rdb.TaskMessage{ + wantEnqueued: []*base.TaskMessage{ + &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: 3, @@ -79,8 +80,8 @@ func TestClient(t *testing.T) { opts: []Option{ MaxRetry(-2), }, - wantEnqueued: []*rdb.TaskMessage{ - &rdb.TaskMessage{ + wantEnqueued: []*base.TaskMessage{ + &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: 0, // Retry count should be set to zero @@ -97,8 +98,8 @@ func TestClient(t *testing.T) { MaxRetry(2), MaxRetry(10), }, - wantEnqueued: []*rdb.TaskMessage{ - &rdb.TaskMessage{ + wantEnqueued: []*base.TaskMessage{ + &base.TaskMessage{ Type: task.Type, Payload: task.Payload, Retry: 10, // Last option takes precedence @@ -121,13 +122,13 @@ func TestClient(t *testing.T) { continue } - gotEnqueuedRaw := r.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, defaultQ, diff) + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.DefaultQueue, diff) } - gotScheduledRaw := r.ZRangeWithScores(scheduledQ, 0, -1).Val() + gotScheduledRaw := r.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val() var gotScheduled []sortedSetEntry for _, z := range gotScheduledRaw { gotScheduled = append(gotScheduled, sortedSetEntry{ @@ -138,7 +139,7 @@ func TestClient(t *testing.T) { cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, scheduledQ, diff) + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff) } } } diff --git a/internal/base/base.go b/internal/base/base.go new file mode 100644 index 0000000..679849e --- /dev/null +++ b/internal/base/base.go @@ -0,0 +1,36 @@ +// Package base defines foundational types and constants used in asynq package. +package base + +import "github.com/rs/xid" + +// Redis keys +const ( + QueuePrefix = "asynq:queues:" // LIST - asynq:queues: + DefaultQueue = QueuePrefix + "default" // LIST + ScheduledQueue = "asynq:scheduled" // ZSET + RetryQueue = "asynq:retry" // ZSET + DeadQueue = "asynq:dead" // ZSET + InProgressQueue = "asynq:in_progress" // LIST +) + +// TaskMessage is the internal representation of a task with additional metadata fields. +// Serialized data of this type gets written in redis. +type TaskMessage struct { + //-------- Task fields -------- + // Type represents the kind of task. + Type string + // Payload holds data needed to process the task. + Payload map[string]interface{} + + //-------- Metadata fields -------- + // ID is a unique identifier for each task + ID xid.ID + // Queue is a name this message should be enqueued to + Queue string + // Retry is the max number of retry for this task. + Retry int + // Retried is the number of times we've retried this task so far + Retried int + // ErrorMsg holds the error message from the last failure + ErrorMsg string +} diff --git a/internal/rdb/helpers_test.go b/internal/rdb/helpers_test.go index 55ac3ef..1d5be35 100644 --- a/internal/rdb/helpers_test.go +++ b/internal/rdb/helpers_test.go @@ -9,6 +9,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" ) @@ -37,8 +38,8 @@ func flushDB(t *testing.T, r *RDB) { } } -var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage { - out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it +var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*base.TaskMessage) []*base.TaskMessage { + out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { return out[i].ID.String() < out[j].ID.String() }) @@ -53,8 +54,8 @@ var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry return out }) -func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { - return &TaskMessage{ +func newTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { + return &base.TaskMessage{ ID: xid.New(), Type: taskType, Queue: "default", @@ -63,7 +64,7 @@ func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessag } } -func mustMarshal(t *testing.T, msg *TaskMessage) string { +func mustMarshal(t *testing.T, msg *base.TaskMessage) string { t.Helper() data, err := json.Marshal(msg) if err != nil { @@ -72,9 +73,9 @@ func mustMarshal(t *testing.T, msg *TaskMessage) string { return string(data) } -func mustUnmarshal(t *testing.T, data string) *TaskMessage { +func mustUnmarshal(t *testing.T, data string) *base.TaskMessage { t.Helper() - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(data), &msg) if err != nil { t.Fatal(err) @@ -82,7 +83,7 @@ func mustUnmarshal(t *testing.T, data string) *TaskMessage { return &msg } -func mustMarshalSlice(t *testing.T, msgs []*TaskMessage) []string { +func mustMarshalSlice(t *testing.T, msgs []*base.TaskMessage) []string { t.Helper() var data []string for _, m := range msgs { @@ -91,16 +92,16 @@ func mustMarshalSlice(t *testing.T, msgs []*TaskMessage) []string { return data } -func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage { +func mustUnmarshalSlice(t *testing.T, data []string) []*base.TaskMessage { t.Helper() - var msgs []*TaskMessage + var msgs []*base.TaskMessage for _, s := range data { msgs = append(msgs, mustUnmarshal(t, s)) } return msgs } -func seedRedisList(t *testing.T, c *redis.Client, key string, msgs []*TaskMessage) { +func seedRedisList(t *testing.T, c *redis.Client, key string, msgs []*base.TaskMessage) { data := mustMarshalSlice(t, msgs) for _, s := range data { if err := c.LPush(key, s).Err(); err != nil { @@ -120,36 +121,36 @@ func seedRedisZSet(t *testing.T, c *redis.Client, key string, items []sortedSetE // scheduledEntry represents an item in redis sorted set (aka ZSET). type sortedSetEntry struct { - msg *TaskMessage + msg *base.TaskMessage score int64 } // seedDefaultQueue initializes the default queue with the given messages. -func seedDefaultQueue(t *testing.T, r *RDB, msgs []*TaskMessage) { +func seedDefaultQueue(t *testing.T, r *RDB, msgs []*base.TaskMessage) { t.Helper() - seedRedisList(t, r.client, defaultQ, msgs) + seedRedisList(t, r.client, base.DefaultQueue, msgs) } // seedInProgressQueue initializes the in-progress queue with the given messages. -func seedInProgressQueue(t *testing.T, r *RDB, msgs []*TaskMessage) { +func seedInProgressQueue(t *testing.T, r *RDB, msgs []*base.TaskMessage) { t.Helper() - seedRedisList(t, r.client, inProgressQ, msgs) + seedRedisList(t, r.client, base.InProgressQueue, msgs) } // seedScheduledQueue initializes the scheduled queue with the given messages. func seedScheduledQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { t.Helper() - seedRedisZSet(t, r.client, scheduledQ, entries) + seedRedisZSet(t, r.client, base.ScheduledQueue, entries) } // seedRetryQueue initializes the retry queue with the given messages. func seedRetryQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { t.Helper() - seedRedisZSet(t, r.client, retryQ, entries) + seedRedisZSet(t, r.client, base.RetryQueue, entries) } // seedDeadQueue initializes the dead queue with the given messages. func seedDeadQueue(t *testing.T, r *RDB, entries []sortedSetEntry) { t.Helper() - seedRedisZSet(t, r.client, deadQ, entries) + seedRedisZSet(t, r.client, base.DeadQueue, entries) } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 689cd94..7539643 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" ) @@ -73,11 +74,11 @@ type DeadTask struct { // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { pipe := r.client.Pipeline() - qlen := pipe.LLen(defaultQ) - plen := pipe.LLen(inProgressQ) - slen := pipe.ZCard(scheduledQ) - rlen := pipe.ZCard(retryQ) - dlen := pipe.ZCard(deadQ) + qlen := pipe.LLen(base.DefaultQueue) + plen := pipe.LLen(base.InProgressQueue) + slen := pipe.ZCard(base.ScheduledQueue) + rlen := pipe.ZCard(base.RetryQueue) + dlen := pipe.ZCard(base.DeadQueue) _, err := pipe.Exec() if err != nil { return nil, err @@ -94,13 +95,13 @@ func (r *RDB) CurrentStats() (*Stats, error) { // ListEnqueued returns all enqueued tasks that are ready to be processed. func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { - data, err := r.client.LRange(defaultQ, 0, -1).Result() + data, err := r.client.LRange(base.DefaultQueue, 0, -1).Result() if err != nil { return nil, err } var tasks []*EnqueuedTask for _, s := range data { - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { // continue // bad data, ignore and continue @@ -117,13 +118,13 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { // ListInProgress returns all tasks that are currently being processed. func (r *RDB) ListInProgress() ([]*InProgressTask, error) { - data, err := r.client.LRange(inProgressQ, 0, -1).Result() + data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result() if err != nil { return nil, err } var tasks []*InProgressTask for _, s := range data { - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -140,7 +141,7 @@ func (r *RDB) ListInProgress() ([]*InProgressTask, error) { // ListScheduled returns all tasks that are scheduled to be processed // in the future. func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { - data, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result() + data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result() if err != nil { return nil, err } @@ -150,7 +151,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { if !ok { continue // bad data, ignore and continue } - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -170,7 +171,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { // ListRetry returns all tasks that have failed before and willl be retried // in the future. func (r *RDB) ListRetry() ([]*RetryTask, error) { - data, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result() + data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result() if err != nil { return nil, err } @@ -180,7 +181,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { if !ok { continue // bad data, ignore and continue } - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -202,7 +203,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { // ListDead returns all tasks that have exhausted its retry limit. func (r *RDB) ListDead() ([]*DeadTask, error) { - data, err := r.client.ZRangeWithScores(deadQ, 0, -1).Result() + data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result() if err != nil { return nil, err } @@ -212,7 +213,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { if !ok { continue // bad data, ignore and continue } - var msg TaskMessage + var msg base.TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -234,7 +235,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { // and enqueues it for processing. If a task that matches the id and score // does not exist, it returns ErrTaskNotFound. func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error { - n, err := r.removeAndEnqueue(deadQ, id.String(), float64(score)) + n, err := r.removeAndEnqueue(base.DeadQueue, id.String(), float64(score)) if err != nil { return err } @@ -248,7 +249,7 @@ func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error { // and enqueues it for processing. If a task that matches the id and score // does not exist, it returns ErrTaskNotFound. func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error { - n, err := r.removeAndEnqueue(retryQ, id.String(), float64(score)) + n, err := r.removeAndEnqueue(base.RetryQueue, id.String(), float64(score)) if err != nil { return err } @@ -262,7 +263,7 @@ func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error { // and enqueues it for processing. If a task that matches the id and score does not // exist, it returns ErrTaskNotFound. func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error { - n, err := r.removeAndEnqueue(scheduledQ, id.String(), float64(score)) + n, err := r.removeAndEnqueue(base.ScheduledQueue, id.String(), float64(score)) if err != nil { return err } @@ -275,19 +276,19 @@ func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error { // EnqueueAllScheduledTasks enqueues all tasks from scheduled queue // and returns the number of tasks enqueued. func (r *RDB) EnqueueAllScheduledTasks() (int64, error) { - return r.removeAndEnqueueAll(scheduledQ) + return r.removeAndEnqueueAll(base.ScheduledQueue) } // EnqueueAllRetryTasks enqueues all tasks from retry queue // and returns the number of tasks enqueued. func (r *RDB) EnqueueAllRetryTasks() (int64, error) { - return r.removeAndEnqueueAll(retryQ) + return r.removeAndEnqueueAll(base.RetryQueue) } // EnqueueAllDeadTasks enqueues all tasks from dead queue // and returns the number of tasks enqueued. func (r *RDB) EnqueueAllDeadTasks() (int64, error) { - return r.removeAndEnqueueAll(deadQ) + return r.removeAndEnqueueAll(base.DeadQueue) } func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { @@ -303,7 +304,7 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { end return 0 `) - res, err := script.Run(r.client, []string{zset, defaultQ}, score, id).Result() + res, err := script.Run(r.client, []string{zset, base.DefaultQueue}, score, id).Result() if err != nil { return 0, err } @@ -323,7 +324,7 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { end return table.getn(msgs) `) - res, err := script.Run(r.client, []string{zset, defaultQ}).Result() + res, err := script.Run(r.client, []string{zset, base.DefaultQueue}).Result() if err != nil { return 0, err } @@ -338,21 +339,21 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { // and deletes it. If a task that matches the id and score does not exist, // it returns ErrTaskNotFound. func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error { - return r.deleteTask(deadQ, id.String(), float64(score)) + return r.deleteTask(base.DeadQueue, id.String(), float64(score)) } // DeleteRetryTask finds a task that matches the given id and score from retry queue // and deletes it. If a task that matches the id and score does not exist, // it returns ErrTaskNotFound. func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error { - return r.deleteTask(retryQ, id.String(), float64(score)) + return r.deleteTask(base.RetryQueue, id.String(), float64(score)) } // DeleteScheduledTask finds a task that matches the given id and score from // scheduled queue and deletes it. If a task that matches the id and score //does not exist, it returns ErrTaskNotFound. func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error { - return r.deleteTask(scheduledQ, id.String(), float64(score)) + return r.deleteTask(base.ScheduledQueue, id.String(), float64(score)) } func (r *RDB) deleteTask(zset, id string, score float64) error { @@ -383,15 +384,15 @@ func (r *RDB) deleteTask(zset, id string, score float64) error { // DeleteAllDeadTasks deletes all tasks from the dead queue. func (r *RDB) DeleteAllDeadTasks() error { - return r.client.Del(deadQ).Err() + return r.client.Del(base.DeadQueue).Err() } // DeleteAllRetryTasks deletes all tasks from the dead queue. func (r *RDB) DeleteAllRetryTasks() error { - return r.client.Del(retryQ).Err() + return r.client.Del(base.RetryQueue).Err() } // DeleteAllScheduledTasks deletes all tasks from the dead queue. func (r *RDB) DeleteAllScheduledTasks() error { - return r.client.Del(scheduledQ).Err() + return r.client.Del(base.ScheduledQueue).Err() } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 8725374..891e64e 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" "github.com/rs/xid" ) @@ -55,16 +56,16 @@ func TestCurrentStats(t *testing.T) { m4 := newTaskMessage("sync", nil) tests := []struct { - enqueued []*TaskMessage - inProgress []*TaskMessage + enqueued []*base.TaskMessage + inProgress []*base.TaskMessage scheduled []sortedSetEntry retry []sortedSetEntry dead []sortedSetEntry want *Stats }{ { - enqueued: []*TaskMessage{m1}, - inProgress: []*TaskMessage{m2}, + enqueued: []*base.TaskMessage{m1}, + inProgress: []*base.TaskMessage{m2}, scheduled: []sortedSetEntry{ {m3, time.Now().Add(time.Hour).Unix()}, {m4, time.Now().Unix()}}, @@ -80,8 +81,8 @@ func TestCurrentStats(t *testing.T) { }, }, { - enqueued: []*TaskMessage{}, - inProgress: []*TaskMessage{}, + enqueued: []*base.TaskMessage{}, + inProgress: []*base.TaskMessage{}, scheduled: []sortedSetEntry{ {m3, time.Now().Unix()}, {m4, time.Now().Unix()}}, @@ -130,15 +131,15 @@ func TestListEnqueued(t *testing.T) { t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} tests := []struct { - enqueued []*TaskMessage + enqueued []*base.TaskMessage want []*EnqueuedTask }{ { - enqueued: []*TaskMessage{m1, m2}, + enqueued: []*base.TaskMessage{m1, m2}, want: []*EnqueuedTask{t1, t2}, }, { - enqueued: []*TaskMessage{}, + enqueued: []*base.TaskMessage{}, want: []*EnqueuedTask{}, }, } @@ -174,15 +175,15 @@ func TestListInProgress(t *testing.T) { t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} tests := []struct { - inProgress []*TaskMessage + inProgress []*base.TaskMessage want []*InProgressTask }{ { - inProgress: []*TaskMessage{m1, m2}, + inProgress: []*base.TaskMessage{m1, m2}, want: []*InProgressTask{t1, t2}, }, { - inProgress: []*TaskMessage{}, + inProgress: []*base.TaskMessage{}, want: []*InProgressTask{}, }, } @@ -262,7 +263,7 @@ func TestListScheduled(t *testing.T) { func TestListRetry(t *testing.T) { r := setup(t) - m1 := &TaskMessage{ + m1 := &base.TaskMessage{ ID: xid.New(), Type: "send_email", Queue: "default", @@ -271,7 +272,7 @@ func TestListRetry(t *testing.T) { Retry: 25, Retried: 10, } - m2 := &TaskMessage{ + m2 := &base.TaskMessage{ ID: xid.New(), Type: "reindex", Queue: "default", @@ -346,14 +347,14 @@ func TestListRetry(t *testing.T) { func TestListDead(t *testing.T) { r := setup(t) - m1 := &TaskMessage{ + m1 := &base.TaskMessage{ ID: xid.New(), Type: "send_email", Queue: "default", Payload: map[string]interface{}{"subject": "hello"}, ErrorMsg: "email server not responding", } - m2 := &TaskMessage{ + m2 := &base.TaskMessage{ ID: xid.New(), Type: "reindex", Queue: "default", @@ -434,8 +435,8 @@ func TestEnqueueDeadTask(t *testing.T) { score int64 id xid.ID want error // expected return value from calling EnqueueDeadTask - wantDead []*TaskMessage - wantEnqueued []*TaskMessage + wantDead []*base.TaskMessage + wantEnqueued []*base.TaskMessage }{ { dead: []sortedSetEntry{ @@ -445,8 +446,8 @@ func TestEnqueueDeadTask(t *testing.T) { score: s2, id: t2.ID, want: nil, - wantDead: []*TaskMessage{t1}, - wantEnqueued: []*TaskMessage{t2}, + wantDead: []*base.TaskMessage{t1}, + wantEnqueued: []*base.TaskMessage{t2}, }, { dead: []sortedSetEntry{ @@ -456,8 +457,8 @@ func TestEnqueueDeadTask(t *testing.T) { score: 123, id: t2.ID, want: ErrTaskNotFound, - wantDead: []*TaskMessage{t1, t2}, - wantEnqueued: []*TaskMessage{}, + wantDead: []*base.TaskMessage{t1, t2}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -470,15 +471,15 @@ func TestEnqueueDeadTask(t *testing.T) { t.Errorf("r.EnqueueDeadTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DefaultQueue, diff) } - gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() + gotDeadRaw := r.client.ZRange(base.DeadQueue, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q, (-want, +got)\n%s", deadQ, diff) + t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.DeadQueue, diff) } } } @@ -495,8 +496,8 @@ func TestEnqueueRetryTask(t *testing.T) { score int64 id xid.ID want error // expected return value from calling EnqueueRetryTask - wantRetry []*TaskMessage - wantEnqueued []*TaskMessage + wantRetry []*base.TaskMessage + wantEnqueued []*base.TaskMessage }{ { retry: []sortedSetEntry{ @@ -506,8 +507,8 @@ func TestEnqueueRetryTask(t *testing.T) { score: s2, id: t2.ID, want: nil, - wantRetry: []*TaskMessage{t1}, - wantEnqueued: []*TaskMessage{t2}, + wantRetry: []*base.TaskMessage{t1}, + wantEnqueued: []*base.TaskMessage{t2}, }, { retry: []sortedSetEntry{ @@ -517,8 +518,8 @@ func TestEnqueueRetryTask(t *testing.T) { score: 123, id: t2.ID, want: ErrTaskNotFound, - wantRetry: []*TaskMessage{t1, t2}, - wantEnqueued: []*TaskMessage{}, + wantRetry: []*base.TaskMessage{t1, t2}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -532,15 +533,15 @@ func TestEnqueueRetryTask(t *testing.T) { t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DefaultQueue, diff) } - gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() + gotRetryRaw := r.client.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q, (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.RetryQueue, diff) } } } @@ -557,8 +558,8 @@ func TestEnqueueScheduledTask(t *testing.T) { score int64 id xid.ID want error // expected return value from calling EnqueueScheduledTask - wantScheduled []*TaskMessage - wantEnqueued []*TaskMessage + wantScheduled []*base.TaskMessage + wantEnqueued []*base.TaskMessage }{ { scheduled: []sortedSetEntry{ @@ -568,8 +569,8 @@ func TestEnqueueScheduledTask(t *testing.T) { score: s2, id: t2.ID, want: nil, - wantScheduled: []*TaskMessage{t1}, - wantEnqueued: []*TaskMessage{t2}, + wantScheduled: []*base.TaskMessage{t1}, + wantEnqueued: []*base.TaskMessage{t2}, }, { scheduled: []sortedSetEntry{ @@ -579,8 +580,8 @@ func TestEnqueueScheduledTask(t *testing.T) { score: 123, id: t2.ID, want: ErrTaskNotFound, - wantScheduled: []*TaskMessage{t1, t2}, - wantEnqueued: []*TaskMessage{}, + wantScheduled: []*base.TaskMessage{t1, t2}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -593,15 +594,15 @@ func TestEnqueueScheduledTask(t *testing.T) { t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want) continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DefaultQueue, diff) } - gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() + gotScheduledRaw := r.client.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q, (-want, +got)\n%s", scheduledQ, diff) + t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.ScheduledQueue, diff) } } } @@ -616,7 +617,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { desc string scheduled []sortedSetEntry want int64 - wantEnqueued []*TaskMessage + wantEnqueued []*base.TaskMessage }{ { desc: "with tasks in scheduled queue", @@ -626,13 +627,13 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { {t3, time.Now().Add(time.Hour).Unix()}, }, want: 3, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, { desc: "with empty scheduled queue", scheduled: []sortedSetEntry{}, want: 0, - wantEnqueued: []*TaskMessage{}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -652,10 +653,10 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { tc.desc, got, err, tc.want) } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, defaultQ, diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.DefaultQueue, diff) } } } @@ -670,7 +671,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) { description string retry []sortedSetEntry want int64 - wantEnqueued []*TaskMessage + wantEnqueued []*base.TaskMessage }{ { description: "with tasks in retry queue", @@ -680,13 +681,13 @@ func TestEnqueueAllRetryTasks(t *testing.T) { {t3, time.Now().Add(time.Hour).Unix()}, }, want: 3, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, { description: "with empty retry queue", retry: []sortedSetEntry{}, want: 0, - wantEnqueued: []*TaskMessage{}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -706,10 +707,10 @@ func TestEnqueueAllRetryTasks(t *testing.T) { tc.description, got, err, tc.want) } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, base.DefaultQueue, diff) } } } @@ -724,7 +725,7 @@ func TestEnqueueAllDeadTasks(t *testing.T) { desc string dead []sortedSetEntry want int64 - wantEnqueued []*TaskMessage + wantEnqueued []*base.TaskMessage }{ { desc: "with tasks in dead queue", @@ -734,13 +735,13 @@ func TestEnqueueAllDeadTasks(t *testing.T) { {t3, time.Now().Add(-time.Minute).Unix()}, }, want: 3, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, { desc: "with empty dead queue", dead: []sortedSetEntry{}, want: 0, - wantEnqueued: []*TaskMessage{}, + wantEnqueued: []*base.TaskMessage{}, }, } @@ -760,10 +761,10 @@ func TestEnqueueAllDeadTasks(t *testing.T) { tc.desc, got, err, tc.want) } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, defaultQ, diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.DefaultQueue, diff) } } } @@ -780,7 +781,7 @@ func TestDeleteDeadTask(t *testing.T) { id xid.ID score int64 want error - wantDead []*TaskMessage + wantDead []*base.TaskMessage }{ { dead: []sortedSetEntry{ @@ -790,7 +791,7 @@ func TestDeleteDeadTask(t *testing.T) { id: m1.ID, score: t1.Unix(), want: nil, - wantDead: []*TaskMessage{m2}, + wantDead: []*base.TaskMessage{m2}, }, { dead: []sortedSetEntry{ @@ -800,14 +801,14 @@ func TestDeleteDeadTask(t *testing.T) { id: m1.ID, score: t2.Unix(), // id and score mismatch want: ErrTaskNotFound, - wantDead: []*TaskMessage{m1, m2}, + wantDead: []*base.TaskMessage{m1, m2}, }, { dead: []sortedSetEntry{}, id: m1.ID, score: t1.Unix(), want: ErrTaskNotFound, - wantDead: []*TaskMessage{}, + wantDead: []*base.TaskMessage{}, }, } @@ -821,10 +822,10 @@ func TestDeleteDeadTask(t *testing.T) { continue } - gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() + gotDeadRaw := r.client.ZRange(base.DeadQueue, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", deadQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadQueue, diff) } } } @@ -841,7 +842,7 @@ func TestDeleteRetryTask(t *testing.T) { id xid.ID score int64 want error - wantRetry []*TaskMessage + wantRetry []*base.TaskMessage }{ { retry: []sortedSetEntry{ @@ -851,7 +852,7 @@ func TestDeleteRetryTask(t *testing.T) { id: m1.ID, score: t1.Unix(), want: nil, - wantRetry: []*TaskMessage{m2}, + wantRetry: []*base.TaskMessage{m2}, }, { retry: []sortedSetEntry{ @@ -860,7 +861,7 @@ func TestDeleteRetryTask(t *testing.T) { id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, - wantRetry: []*TaskMessage{m1}, + wantRetry: []*base.TaskMessage{m1}, }, } @@ -874,10 +875,10 @@ func TestDeleteRetryTask(t *testing.T) { continue } - gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() + gotRetryRaw := r.client.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } } } @@ -894,7 +895,7 @@ func TestDeleteScheduledTask(t *testing.T) { id xid.ID score int64 want error - wantScheduled []*TaskMessage + wantScheduled []*base.TaskMessage }{ { scheduled: []sortedSetEntry{ @@ -904,7 +905,7 @@ func TestDeleteScheduledTask(t *testing.T) { id: m1.ID, score: t1.Unix(), want: nil, - wantScheduled: []*TaskMessage{m2}, + wantScheduled: []*base.TaskMessage{m2}, }, { scheduled: []sortedSetEntry{ @@ -913,7 +914,7 @@ func TestDeleteScheduledTask(t *testing.T) { id: m2.ID, score: t2.Unix(), want: ErrTaskNotFound, - wantScheduled: []*TaskMessage{m1}, + wantScheduled: []*base.TaskMessage{m1}, }, } @@ -927,10 +928,10 @@ func TestDeleteScheduledTask(t *testing.T) { continue } - gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() + gotScheduledRaw := r.client.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) } } } @@ -943,7 +944,7 @@ func TestDeleteAllDeadTasks(t *testing.T) { tests := []struct { dead []sortedSetEntry - wantDead []*TaskMessage + wantDead []*base.TaskMessage }{ { dead: []sortedSetEntry{ @@ -951,7 +952,7 @@ func TestDeleteAllDeadTasks(t *testing.T) { {m2, time.Now().Unix()}, {m3, time.Now().Unix()}, }, - wantDead: []*TaskMessage{}, + wantDead: []*base.TaskMessage{}, }, } @@ -964,10 +965,10 @@ func TestDeleteAllDeadTasks(t *testing.T) { t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) } - gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() + gotDeadRaw := r.client.ZRange(base.DeadQueue, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", deadQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadQueue, diff) } } } @@ -980,7 +981,7 @@ func TestDeleteAllRetryTasks(t *testing.T) { tests := []struct { retry []sortedSetEntry - wantRetry []*TaskMessage + wantRetry []*base.TaskMessage }{ { retry: []sortedSetEntry{ @@ -988,7 +989,7 @@ func TestDeleteAllRetryTasks(t *testing.T) { {m2, time.Now().Unix()}, {m3, time.Now().Unix()}, }, - wantRetry: []*TaskMessage{}, + wantRetry: []*base.TaskMessage{}, }, } @@ -1001,10 +1002,10 @@ func TestDeleteAllRetryTasks(t *testing.T) { t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) } - gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() + gotRetryRaw := r.client.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } } } @@ -1017,7 +1018,7 @@ func TestDeleteAllScheduledTasks(t *testing.T) { tests := []struct { scheduled []sortedSetEntry - wantScheduled []*TaskMessage + wantScheduled []*base.TaskMessage }{ { scheduled: []sortedSetEntry{ @@ -1025,7 +1026,7 @@ func TestDeleteAllScheduledTasks(t *testing.T) { {m2, time.Now().Add(time.Minute).Unix()}, {m3, time.Now().Add(time.Minute).Unix()}, }, - wantScheduled: []*TaskMessage{}, + wantScheduled: []*base.TaskMessage{}, }, } @@ -1038,10 +1039,10 @@ func TestDeleteAllScheduledTasks(t *testing.T) { t.Errorf("r.DeleteAllDeaadTasks = %v, want nil", err) } - gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() + gotScheduledRaw := r.client.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) } } } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 1adf18a..ef89700 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -8,17 +8,7 @@ import ( "time" "github.com/go-redis/redis/v7" - "github.com/rs/xid" -) - -// Redis keys -const ( - queuePrefix = "asynq:queues:" // LIST - asynq:queues: - defaultQ = queuePrefix + "default" // LIST - scheduledQ = "asynq:scheduled" // ZSET - retryQ = "asynq:retry" // ZSET - deadQ = "asynq:dead" // ZSET - inProgressQ = "asynq:in_progress" // LIST + "github.com/hibiken/asynq/internal/base" ) var ( @@ -39,28 +29,6 @@ func NewRDB(client *redis.Client) *RDB { return &RDB{client} } -// TaskMessage is the internal representation of a task with additional metadata fields. -// Serialized data of this type gets written in redis. -type TaskMessage struct { - //-------- Task fields -------- - // Type represents the kind of task. - Type string - // Payload holds data needed to process the task. - Payload map[string]interface{} - - //-------- Metadata fields -------- - // ID is a unique identifier for each task - ID xid.ID - // Queue is a name this message should be enqueued to - Queue string - // Retry is the max number of retry for this task. - Retry int - // Retried is the number of times we've retried this task so far - Retried int - // ErrorMsg holds the error message from the last failure - ErrorMsg string -} - // Close closes the connection with redis server. func (r *RDB) Close() error { return r.client.Close() @@ -68,12 +36,12 @@ func (r *RDB) Close() error { // Enqueue inserts the given task to the end of the queue. // It also adds the queue name to the "all-queues" list. -func (r *RDB) Enqueue(msg *TaskMessage) error { +func (r *RDB) Enqueue(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } - qname := queuePrefix + msg.Queue + qname := base.QueuePrefix + msg.Queue pipe := r.client.Pipeline() pipe.LPush(qname, string(bytes)) _, err = pipe.Exec() @@ -86,15 +54,15 @@ func (r *RDB) Enqueue(msg *TaskMessage) error { // Dequeue blocks until there is a task available to be processed, // once a task is available, it adds the task to "in progress" list // and returns the task. -func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) { - data, err := r.client.BRPopLPush(defaultQ, inProgressQ, timeout).Result() +func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { + data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result() if err == redis.Nil { return nil, ErrDequeueTimeout } if err != nil { - return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", defaultQ, inProgressQ, timeout, err) + return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", base.DefaultQueue, base.InProgressQueue, timeout, err) } - var msg TaskMessage + var msg base.TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) @@ -103,22 +71,22 @@ func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) { } // Done removes the task from in-progress queue to mark the task as done. -func (r *RDB) Done(msg *TaskMessage) error { +func (r *RDB) Done(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } // NOTE: count ZERO means "remove all elements equal to val" - err = r.client.LRem(inProgressQ, 0, string(bytes)).Err() + err = r.client.LRem(base.InProgressQueue, 0, string(bytes)).Err() if err != nil { - return fmt.Errorf("command `LREM %s 0 %s` failed: %v", inProgressQ, string(bytes), err) + return fmt.Errorf("command `LREM %s 0 %s` failed: %v", base.InProgressQueue, string(bytes), err) } return nil } // Requeue moves the task from in-progress queue to the default // queue. -func (r *RDB) Requeue(msg *TaskMessage) error { +func (r *RDB) Requeue(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -126,33 +94,33 @@ func (r *RDB) Requeue(msg *TaskMessage) error { // Note: Use RPUSH to push to the head of the queue. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:queues:default - // ARGV[1] -> taskMessage value + // ARGV[1] -> base.TaskMessage value script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("RPUSH", KEYS[2], ARGV[1]) return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{inProgressQ, defaultQ}, string(bytes)).Result() + _, err = script.Run(r.client, []string{base.InProgressQueue, base.DefaultQueue}, string(bytes)).Result() return err } // Schedule adds the task to the backlog queue to be processed in the future. -func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { +func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } score := float64(processAt.Unix()) - err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err() + err = r.client.ZAdd(base.ScheduledQueue, &redis.Z{Member: string(bytes), Score: score}).Err() if err != nil { - return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err) + return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", base.ScheduledQueue, score, string(bytes), err) } return nil } // Retry moves the task from in-progress to retry queue, incrementing retry count // and assigning error message to the task message. -func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { +func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { bytesToRemove, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -166,15 +134,15 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error } // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry - // ARGV[1] -> TaskMessage value to remove from InProgress queue - // ARGV[2] -> TaskMessage value to add to Retry queue + // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue + // ARGV[2] -> base.TaskMessage value to add to Retry queue // ARGV[3] -> retry_at UNIX timestamp script := redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{inProgressQ, retryQ}, + _, err = script.Run(r.client, []string{base.InProgressQueue, base.RetryQueue}, string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result() return err } @@ -182,7 +150,7 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error // Kill sends the task to "dead" queue from in-progress queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. -func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { +func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { const maxDeadTask = 10 const deadExpirationInDays = 90 bytesToRemove, err := json.Marshal(msg) @@ -199,8 +167,8 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:dead - // ARGV[1] -> TaskMessage value to remove from InProgress queue - // ARGV[2] -> TaskMessage value to add to Dead queue + // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue + // ARGV[2] -> base.TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[5] -> max number of tasks in dead queue (e.g., 100) @@ -211,7 +179,7 @@ func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) return redis.status_reply("OK") `) - _, err = script.Run(r.client, []string{inProgressQ, deadQ}, + _, err = script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue}, string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result() return err } @@ -226,7 +194,7 @@ func (r *RDB) RestoreUnfinished() (int64, error) { end return len `) - res, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result() + res, err := script.Run(r.client, []string{base.InProgressQueue, base.DefaultQueue}).Result() if err != nil { return 0, err } @@ -240,7 +208,7 @@ func (r *RDB) RestoreUnfinished() (int64, error) { // CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // have to be processed. func (r *RDB) CheckAndEnqueue() error { - delayed := []string{scheduledQ, retryQ} + delayed := []string{base.ScheduledQueue, base.RetryQueue} for _, zset := range delayed { if err := r.forward(zset); err != nil { return err @@ -261,6 +229,6 @@ func (r *RDB) forward(from string) error { return msgs `) now := float64(time.Now().Unix()) - _, err := script.Run(r.client, []string{from, defaultQ}, now).Result() + _, err := script.Run(r.client, []string{from, base.DefaultQueue}, now).Result() return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index f5d3324..f263104 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -6,12 +6,13 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" ) func TestEnqueue(t *testing.T) { r := setup(t) tests := []struct { - msg *TaskMessage + msg *base.TaskMessage }{ {msg: newTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})}, {msg: newTaskMessage("generate_csv", map[string]interface{}{})}, @@ -26,9 +27,9 @@ func TestEnqueue(t *testing.T) { t.Errorf("(*RDB).Enqueue = %v, want nil", err) continue } - res := r.client.LRange(defaultQ, 0, -1).Val() + res := r.client.LRange(base.DefaultQueue, 0, -1).Val() if len(res) != 1 { - t.Errorf("%q has length %d, want 1", defaultQ, len(res)) + t.Errorf("%q has length %d, want 1", base.DefaultQueue, len(res)) continue } if diff := cmp.Diff(tc.msg, mustUnmarshal(t, res[0])); diff != "" { @@ -41,13 +42,13 @@ func TestDequeue(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) tests := []struct { - enqueued []*TaskMessage - want *TaskMessage + enqueued []*base.TaskMessage + want *base.TaskMessage err error inProgress int64 // length of "in-progress" tasks after dequeue }{ - {enqueued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1}, - {enqueued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0}, + {enqueued: []*base.TaskMessage{t1}, want: t1, err: nil, inProgress: 1}, + {enqueued: []*base.TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0}, } for _, tc := range tests { @@ -60,8 +61,8 @@ func TestDequeue(t *testing.T) { got, err, tc.want, tc.err) continue } - if l := r.client.LLen(inProgressQ).Val(); l != tc.inProgress { - t.Errorf("%q has length %d, want %d", inProgressQ, l, tc.inProgress) + if l := r.client.LLen(base.InProgressQueue).Val(); l != tc.inProgress { + t.Errorf("%q has length %d, want %d", base.InProgressQueue, l, tc.inProgress) } } } @@ -72,24 +73,24 @@ func TestDone(t *testing.T) { t2 := newTaskMessage("export_csv", nil) tests := []struct { - inProgress []*TaskMessage // initial state of the in-progress list - target *TaskMessage // task to remove - wantInProgress []*TaskMessage // final state of the in-progress list + inProgress []*base.TaskMessage // initial state of the in-progress list + target *base.TaskMessage // task to remove + wantInProgress []*base.TaskMessage // final state of the in-progress list }{ { - inProgress: []*TaskMessage{t1, t2}, + inProgress: []*base.TaskMessage{t1, t2}, target: t1, - wantInProgress: []*TaskMessage{t2}, + wantInProgress: []*base.TaskMessage{t2}, }, { - inProgress: []*TaskMessage{t2}, + inProgress: []*base.TaskMessage{t2}, target: t1, - wantInProgress: []*TaskMessage{t2}, + wantInProgress: []*base.TaskMessage{t2}, }, { - inProgress: []*TaskMessage{t1}, + inProgress: []*base.TaskMessage{t1}, target: t1, - wantInProgress: []*TaskMessage{}, + wantInProgress: []*base.TaskMessage{}, }, } @@ -103,10 +104,10 @@ func TestDone(t *testing.T) { continue } - data := r.client.LRange(inProgressQ, 0, -1).Val() + data := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, data) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", inProgressQ, diff) + t.Errorf("mismatch found in %q after calling (*RDB).Done: (-want, +got):\n%s", base.InProgressQueue, diff) continue } } @@ -118,25 +119,25 @@ func TestRequeue(t *testing.T) { t2 := newTaskMessage("export_csv", nil) tests := []struct { - enqueued []*TaskMessage // initial state of the default queue - inProgress []*TaskMessage // initial state of the in-progress list - target *TaskMessage // task to requeue - wantEnqueued []*TaskMessage // final state of the default queue - wantInProgress []*TaskMessage // final state of the in-progress list + enqueued []*base.TaskMessage // initial state of the default queue + inProgress []*base.TaskMessage // initial state of the in-progress list + target *base.TaskMessage // task to requeue + wantEnqueued []*base.TaskMessage // final state of the default queue + wantInProgress []*base.TaskMessage // final state of the in-progress list }{ { - enqueued: []*TaskMessage{}, - inProgress: []*TaskMessage{t1, t2}, + enqueued: []*base.TaskMessage{}, + inProgress: []*base.TaskMessage{t1, t2}, target: t1, - wantEnqueued: []*TaskMessage{t1}, - wantInProgress: []*TaskMessage{t2}, + wantEnqueued: []*base.TaskMessage{t1}, + wantInProgress: []*base.TaskMessage{t2}, }, { - enqueued: []*TaskMessage{t1}, - inProgress: []*TaskMessage{t2}, + enqueued: []*base.TaskMessage{t1}, + inProgress: []*base.TaskMessage{t2}, target: t2, - wantEnqueued: []*TaskMessage{t1, t2}, - wantInProgress: []*TaskMessage{}, + wantEnqueued: []*base.TaskMessage{t1, t2}, + wantInProgress: []*base.TaskMessage{}, }, } @@ -151,16 +152,16 @@ func TestRequeue(t *testing.T) { continue } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff) } - gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", inProgressQ, diff) + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) } } } @@ -171,7 +172,7 @@ func TestKill(t *testing.T) { t2 := newTaskMessage("reindex", nil) t3 := newTaskMessage("generate_csv", nil) errMsg := "SMTP server not responding" - t1AfterKill := &TaskMessage{ + t1AfterKill := &base.TaskMessage{ ID: t1.ID, Type: t1.Type, Payload: t1.Payload, @@ -184,29 +185,29 @@ func TestKill(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - inProgress []*TaskMessage + inProgress []*base.TaskMessage dead []sortedSetEntry - target *TaskMessage // task to kill - wantInProgress []*TaskMessage + target *base.TaskMessage // task to kill + wantInProgress []*base.TaskMessage wantDead []sortedSetEntry }{ { - inProgress: []*TaskMessage{t1, t2}, + inProgress: []*base.TaskMessage{t1, t2}, dead: []sortedSetEntry{ {t3, now.Add(-time.Hour).Unix()}, }, target: t1, - wantInProgress: []*TaskMessage{t2}, + wantInProgress: []*base.TaskMessage{t2}, wantDead: []sortedSetEntry{ {t1AfterKill, now.Unix()}, {t3, now.Add(-time.Hour).Unix()}, }, }, { - inProgress: []*TaskMessage{t1, t2, t3}, + inProgress: []*base.TaskMessage{t1, t2, t3}, dead: []sortedSetEntry{}, target: t1, - wantInProgress: []*TaskMessage{t2, t3}, + wantInProgress: []*base.TaskMessage{t2, t3}, wantDead: []sortedSetEntry{ {t1AfterKill, now.Unix()}, }, @@ -224,14 +225,14 @@ func TestKill(t *testing.T) { continue } - gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff) } var gotDead []sortedSetEntry - data := r.client.ZRangeWithScores(deadQ, 0, -1).Val() + data := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Val() for _, z := range data { gotDead = append(gotDead, sortedSetEntry{ msg: mustUnmarshal(t, z.Member.(string)), @@ -241,7 +242,7 @@ func TestKill(t *testing.T) { cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) if diff := cmp.Diff(tc.wantDead, gotDead, cmpOpt, sortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", deadQ, diff) + t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) } } } @@ -253,32 +254,32 @@ func TestRestoreUnfinished(t *testing.T) { t3 := newTaskMessage("sync_stuff", nil) tests := []struct { - inProgress []*TaskMessage - enqueued []*TaskMessage + inProgress []*base.TaskMessage + enqueued []*base.TaskMessage want int64 - wantInProgress []*TaskMessage - wantEnqueued []*TaskMessage + wantInProgress []*base.TaskMessage + wantEnqueued []*base.TaskMessage }{ { - inProgress: []*TaskMessage{t1, t2, t3}, - enqueued: []*TaskMessage{}, + inProgress: []*base.TaskMessage{t1, t2, t3}, + enqueued: []*base.TaskMessage{}, want: 3, - wantInProgress: []*TaskMessage{}, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantInProgress: []*base.TaskMessage{}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, { - inProgress: []*TaskMessage{}, - enqueued: []*TaskMessage{t1, t2, t3}, + inProgress: []*base.TaskMessage{}, + enqueued: []*base.TaskMessage{t1, t2, t3}, want: 0, - wantInProgress: []*TaskMessage{}, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantInProgress: []*base.TaskMessage{}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, { - inProgress: []*TaskMessage{t2, t3}, - enqueued: []*TaskMessage{t1}, + inProgress: []*base.TaskMessage{t2, t3}, + enqueued: []*base.TaskMessage{t1}, want: 2, - wantInProgress: []*TaskMessage{}, - wantEnqueued: []*TaskMessage{t1, t2, t3}, + wantInProgress: []*base.TaskMessage{}, + wantEnqueued: []*base.TaskMessage{t1, t2, t3}, }, } @@ -294,15 +295,15 @@ func TestRestoreUnfinished(t *testing.T) { continue } - gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgressQ, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", base.InProgressQueue, diff) } - gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", base.DefaultQueue, diff) } } } @@ -318,9 +319,9 @@ func TestCheckAndEnqueue(t *testing.T) { tests := []struct { scheduled []sortedSetEntry retry []sortedSetEntry - wantQueued []*TaskMessage - wantScheduled []*TaskMessage - wantRetry []*TaskMessage + wantQueued []*base.TaskMessage + wantScheduled []*base.TaskMessage + wantRetry []*base.TaskMessage }{ { scheduled: []sortedSetEntry{ @@ -328,9 +329,9 @@ func TestCheckAndEnqueue(t *testing.T) { {t2, secondAgo.Unix()}}, retry: []sortedSetEntry{ {t3, secondAgo.Unix()}}, - wantQueued: []*TaskMessage{t1, t2, t3}, - wantScheduled: []*TaskMessage{}, - wantRetry: []*TaskMessage{}, + wantQueued: []*base.TaskMessage{t1, t2, t3}, + wantScheduled: []*base.TaskMessage{}, + wantRetry: []*base.TaskMessage{}, }, { scheduled: []sortedSetEntry{ @@ -338,9 +339,9 @@ func TestCheckAndEnqueue(t *testing.T) { {t2, secondAgo.Unix()}}, retry: []sortedSetEntry{ {t3, secondAgo.Unix()}}, - wantQueued: []*TaskMessage{t2, t3}, - wantScheduled: []*TaskMessage{t1}, - wantRetry: []*TaskMessage{}, + wantQueued: []*base.TaskMessage{t2, t3}, + wantScheduled: []*base.TaskMessage{t1}, + wantRetry: []*base.TaskMessage{}, }, { scheduled: []sortedSetEntry{ @@ -348,9 +349,9 @@ func TestCheckAndEnqueue(t *testing.T) { {t2, hourFromNow.Unix()}}, retry: []sortedSetEntry{ {t3, hourFromNow.Unix()}}, - wantQueued: []*TaskMessage{}, - wantScheduled: []*TaskMessage{t1, t2}, - wantRetry: []*TaskMessage{t3}, + wantQueued: []*base.TaskMessage{}, + wantScheduled: []*base.TaskMessage{t1, t2}, + wantRetry: []*base.TaskMessage{t3}, }, } @@ -364,20 +365,20 @@ func TestCheckAndEnqueue(t *testing.T) { t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) continue } - queued := r.client.LRange(defaultQ, 0, -1).Val() + queued := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotQueued := mustUnmarshalSlice(t, queued) if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DefaultQueue, diff) } - scheduled := r.client.ZRange(scheduledQ, 0, -1).Val() + scheduled := r.client.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, scheduled) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) } - retry := r.client.ZRange(retryQ, 0, -1).Val() + retry := r.client.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, retry) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } } } @@ -385,7 +386,7 @@ func TestCheckAndEnqueue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) tests := []struct { - msg *TaskMessage + msg *base.TaskMessage processAt time.Time }{ { @@ -404,9 +405,9 @@ func TestSchedule(t *testing.T) { continue } - res := r.client.ZRangeWithScores(scheduledQ, 0, -1).Val() + res := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Val() if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), scheduledQ) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), base.ScheduledQueue) continue } if res[0].Score != float64(tc.processAt.Unix()) { @@ -423,7 +424,7 @@ func TestRetry(t *testing.T) { t3 := newTaskMessage("reindex", nil) t1.Retried = 10 errMsg := "SMTP server is not responding" - t1AfterRetry := &TaskMessage{ + t1AfterRetry := &base.TaskMessage{ ID: t1.ID, Type: t1.Type, Payload: t1.Payload, @@ -435,23 +436,23 @@ func TestRetry(t *testing.T) { now := time.Now() tests := []struct { - inProgress []*TaskMessage + inProgress []*base.TaskMessage retry []sortedSetEntry - msg *TaskMessage + msg *base.TaskMessage processAt time.Time errMsg string - wantInProgress []*TaskMessage + wantInProgress []*base.TaskMessage wantRetry []sortedSetEntry }{ { - inProgress: []*TaskMessage{t1, t2}, + inProgress: []*base.TaskMessage{t1, t2}, retry: []sortedSetEntry{ {t3, now.Add(time.Minute).Unix()}, }, msg: t1, processAt: now.Add(5 * time.Minute), errMsg: errMsg, - wantInProgress: []*TaskMessage{t2}, + wantInProgress: []*base.TaskMessage{t2}, wantRetry: []sortedSetEntry{ {t1AfterRetry, now.Add(5 * time.Minute).Unix()}, {t3, now.Add(time.Minute).Unix()}, @@ -470,13 +471,13 @@ func TestRetry(t *testing.T) { continue } - gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val() + gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff) } - gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val() + gotRetryRaw := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Val() var gotRetry []sortedSetEntry for _, z := range gotRetryRaw { gotRetry = append(gotRetry, sortedSetEntry{ @@ -486,7 +487,7 @@ func TestRetry(t *testing.T) { } cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } } } diff --git a/poller_test.go b/poller_test.go index f05e693..6da3c33 100644 --- a/poller_test.go +++ b/poller_test.go @@ -6,12 +6,13 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) func TestPoller(t *testing.T) { type scheduledTask struct { - msg *rdb.TaskMessage + msg *base.TaskMessage processAt time.Time } r := setup(t) @@ -24,13 +25,13 @@ func TestPoller(t *testing.T) { t4 := randomTask("sync", "default", nil) tests := []struct { - initScheduled []scheduledTask // scheduled queue initial state - initRetry []scheduledTask // retry queue initial state - initQueue []*rdb.TaskMessage // default queue initial state - wait time.Duration // wait duration before checking for final state - wantScheduled []*rdb.TaskMessage // schedule queue final state - wantRetry []*rdb.TaskMessage // retry queue final state - wantQueue []*rdb.TaskMessage // default queue final state + initScheduled []scheduledTask // scheduled queue initial state + initRetry []scheduledTask // retry queue initial state + initQueue []*base.TaskMessage // default queue initial state + wait time.Duration // wait duration before checking for final state + wantScheduled []*base.TaskMessage // schedule queue final state + wantRetry []*base.TaskMessage // retry queue final state + wantQueue []*base.TaskMessage // default queue final state }{ { initScheduled: []scheduledTask{ @@ -40,11 +41,11 @@ func TestPoller(t *testing.T) { initRetry: []scheduledTask{ {t3, time.Now().Add(-500 * time.Millisecond)}, }, - initQueue: []*rdb.TaskMessage{t4}, + initQueue: []*base.TaskMessage{t4}, wait: pollInterval * 2, - wantScheduled: []*rdb.TaskMessage{t1}, - wantRetry: []*rdb.TaskMessage{}, - wantQueue: []*rdb.TaskMessage{t2, t3, t4}, + wantScheduled: []*base.TaskMessage{t1}, + wantRetry: []*base.TaskMessage{}, + wantQueue: []*base.TaskMessage{t2, t3, t4}, }, { initScheduled: []scheduledTask{ @@ -53,11 +54,11 @@ func TestPoller(t *testing.T) { {t3, time.Now().Add(-500 * time.Millisecond)}, }, initRetry: []scheduledTask{}, - initQueue: []*rdb.TaskMessage{t4}, + initQueue: []*base.TaskMessage{t4}, wait: pollInterval * 2, - wantScheduled: []*rdb.TaskMessage{}, - wantRetry: []*rdb.TaskMessage{}, - wantQueue: []*rdb.TaskMessage{t1, t2, t3, t4}, + wantScheduled: []*base.TaskMessage{}, + wantRetry: []*base.TaskMessage{}, + wantQueue: []*base.TaskMessage{t1, t2, t3, t4}, }, } @@ -75,7 +76,7 @@ func TestPoller(t *testing.T) { } // initialize retry queue for _, st := range tc.initRetry { - err := r.ZAdd(retryQ, &redis.Z{ + err := r.ZAdd(base.RetryQueue, &redis.Z{ Member: mustMarshal(t, st.msg), Score: float64(st.processAt.Unix()), }).Err() @@ -95,22 +96,22 @@ func TestPoller(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotScheduledRaw := r.ZRange(scheduledQ, 0, -1).Val() + gotScheduledRaw := r.ZRange(base.ScheduledQueue, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduledQ, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.ScheduledQueue, diff) } - gotRetryRaw := r.ZRange(retryQ, 0, -1).Val() + gotRetryRaw := r.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.RetryQueue, diff) } - gotQueueRaw := r.LRange(defaultQ, 0, -1).Val() + gotQueueRaw := r.LRange(base.DefaultQueue, 0, -1).Val() gotQueue := mustUnmarshalSlice(t, gotQueueRaw) if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQ, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", base.DefaultQueue, diff) } } } diff --git a/processor.go b/processor.go index 8dcee58..b4a7139 100644 --- a/processor.go +++ b/processor.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -160,21 +161,21 @@ func (p *processor) restore() { } } -func (p *processor) requeue(msg *rdb.TaskMessage) { +func (p *processor) requeue(msg *base.TaskMessage) { err := p.rdb.Requeue(msg) if err != nil { log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err) } } -func (p *processor) markAsDone(msg *rdb.TaskMessage) { +func (p *processor) markAsDone(msg *base.TaskMessage) { err := p.rdb.Done(msg) if err != nil { log.Printf("[ERROR] Could not remove task from InProgress queue: %v\n", err) } } -func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) { +func (p *processor) retry(msg *base.TaskMessage, errMsg string) { retryAt := time.Now().Add(delaySeconds(msg.Retried)) err := p.rdb.Retry(msg, retryAt, errMsg) if err != nil { @@ -182,7 +183,7 @@ func (p *processor) retry(msg *rdb.TaskMessage, errMsg string) { } } -func (p *processor) kill(msg *rdb.TaskMessage, errMsg string) { +func (p *processor) kill(msg *base.TaskMessage, errMsg string) { log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) err := p.rdb.Kill(msg, errMsg) if err != nil { diff --git a/processor_test.go b/processor_test.go index fad80bd..693a719 100644 --- a/processor_test.go +++ b/processor_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -25,20 +26,20 @@ func TestProcessorSuccess(t *testing.T) { t4 := &Task{Type: m4.Type, Payload: m4.Payload} tests := []struct { - initQueue []*rdb.TaskMessage // initial default queue state - incoming []*rdb.TaskMessage // tasks to be enqueued during run - wait time.Duration // wait duration between starting and stopping processor for this test case - wantProcessed []*Task // tasks to be processed at the end + initQueue []*base.TaskMessage // initial default queue state + incoming []*base.TaskMessage // tasks to be enqueued during run + wait time.Duration // wait duration between starting and stopping processor for this test case + wantProcessed []*Task // tasks to be processed at the end }{ { - initQueue: []*rdb.TaskMessage{m1}, - incoming: []*rdb.TaskMessage{m2, m3, m4}, + initQueue: []*base.TaskMessage{m1}, + incoming: []*base.TaskMessage{m2, m3, m4}, wait: time.Second, wantProcessed: []*Task{t1, t2, t3, t4}, }, { - initQueue: []*rdb.TaskMessage{}, - incoming: []*rdb.TaskMessage{m1}, + initQueue: []*base.TaskMessage{}, + incoming: []*base.TaskMessage{m1}, wait: time.Second, wantProcessed: []*Task{t1}, }, @@ -85,8 +86,8 @@ func TestProcessorSuccess(t *testing.T) { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } - if l := r.LLen(inProgressQ).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", inProgressQ, l) + if l := r.LLen(base.InProgressQueue).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) } } } @@ -116,18 +117,18 @@ func TestProcessorRetry(t *testing.T) { r4.Retried = m4.Retried + 1 tests := []struct { - initQueue []*rdb.TaskMessage // initial default queue state - incoming []*rdb.TaskMessage // tasks to be enqueued during run - wait time.Duration // wait duration between starting and stopping processor for this test case - wantRetry []*rdb.TaskMessage // tasks in retry queue at the end - wantDead []*rdb.TaskMessage // tasks in dead queue at the end + initQueue []*base.TaskMessage // initial default queue state + incoming []*base.TaskMessage // tasks to be enqueued during run + wait time.Duration // wait duration between starting and stopping processor for this test case + wantRetry []*base.TaskMessage // tasks in retry queue at the end + wantDead []*base.TaskMessage // tasks in dead queue at the end }{ { - initQueue: []*rdb.TaskMessage{m1, m2}, - incoming: []*rdb.TaskMessage{m3, m4}, + initQueue: []*base.TaskMessage{m1, m2}, + incoming: []*base.TaskMessage{m3, m4}, wait: time.Second, - wantRetry: []*rdb.TaskMessage{&r2, &r3, &r4}, - wantDead: []*rdb.TaskMessage{&r1}, + wantRetry: []*base.TaskMessage{&r2, &r3, &r4}, + wantDead: []*base.TaskMessage{&r1}, }, } @@ -162,20 +163,20 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotRetryRaw := r.ZRange(retryQ, 0, -1).Val() + gotRetryRaw := r.ZRange(base.RetryQueue, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retryQ, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) } - gotDeadRaw := r.ZRange(deadQ, 0, -1).Val() + gotDeadRaw := r.ZRange(base.DeadQueue, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", deadQ, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadQueue, diff) } - if l := r.LLen(inProgressQ).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", inProgressQ, l) + if l := r.LLen(base.InProgressQueue).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l) } } }