diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index a3d1452..50e73ef 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -182,42 +182,42 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) { func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.PendingKey(qname), msgs, "pending") + seedRedisList(tb, r, base.PendingKey(qname), msgs, base.TaskStatePending) } // SeedActiveQueue initializes the active queue with the given messages. func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.ActiveKey(qname), msgs, "active") + seedRedisList(tb, r, base.ActiveKey(qname), msgs, base.TaskStateActive) } // SeedScheduledQueue initializes the scheduled queue with the given messages. func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.ScheduledKey(qname), entries, "scheduled") + seedRedisZSet(tb, r, base.ScheduledKey(qname), entries, base.TaskStateScheduled) } // SeedRetryQueue initializes the retry queue with the given messages. func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.RetryKey(qname), entries, "retry") + seedRedisZSet(tb, r, base.RetryKey(qname), entries, base.TaskStateRetry) } // SeedArchivedQueue initializes the archived queue with the given messages. func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, "archived") + seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, base.TaskStateArchived) } // SeedDeadlines initializes the deadlines set with the given entries. func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, "active") + seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive) } // SeedAllPendingQueues initializes all of the specified queues with the given messages. @@ -271,7 +271,7 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri } func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, - msgs []*base.TaskMessage, state string) { + msgs []*base.TaskMessage, state base.TaskState) { tb.Helper() for _, msg := range msgs { encoded := MustMarshal(tb, msg) @@ -281,7 +281,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, key := base.TaskKey(msg.Queue, msg.ID.String()) data := map[string]interface{}{ "msg": encoded, - "state": state, + "state": state.String(), "timeout": msg.Timeout, "deadline": msg.Deadline, } @@ -292,7 +292,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, } func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, - items []base.Z, state string) { + items []base.Z, state base.TaskState) { tb.Helper() for _, item := range items { msg := item.Message @@ -304,7 +304,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, key := base.TaskKey(msg.Queue, msg.ID.String()) data := map[string]interface{}{ "msg": encoded, - "state": state, + "state": state.String(), "timeout": msg.Timeout, "deadline": msg.Deadline, } @@ -318,68 +318,68 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, // It also asserts the state field of the task. func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getMessagesFromList(tb, r, qname, base.PendingKey, "pending") + return getMessagesFromList(tb, r, qname, base.PendingKey, base.TaskStatePending) } // GetActiveMessages returns all active messages in the given queue. // It also asserts the state field of the task. func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getMessagesFromList(tb, r, qname, base.ActiveKey, "active") + return getMessagesFromList(tb, r, qname, base.ActiveKey, base.TaskStateActive) } // GetScheduledMessages returns all scheduled task messages in the given queue. // It also asserts the state field of the task. func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getMessagesFromZSet(tb, r, qname, base.ScheduledKey, "scheduled") + return getMessagesFromZSet(tb, r, qname, base.ScheduledKey, base.TaskStateScheduled) } // GetRetryMessages returns all retry messages in the given queue. // It also asserts the state field of the task. func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getMessagesFromZSet(tb, r, qname, base.RetryKey, "retry") + return getMessagesFromZSet(tb, r, qname, base.RetryKey, base.TaskStateRetry) } // GetArchivedMessages returns all archived messages in the given queue. // It also asserts the state field of the task. func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, "archived") + return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, base.TaskStateArchived) } // GetScheduledEntries returns all scheduled messages and its score in the given queue. // It also asserts the state field of the task. func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getMessagesFromZSetWithScores(tb, r, qname, base.ScheduledKey, "scheduled") + return getMessagesFromZSetWithScores(tb, r, qname, base.ScheduledKey, base.TaskStateScheduled) } // GetRetryEntries returns all retry messages and its score in the given queue. // It also asserts the state field of the task. func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getMessagesFromZSetWithScores(tb, r, qname, base.RetryKey, "retry") + return getMessagesFromZSetWithScores(tb, r, qname, base.RetryKey, base.TaskStateRetry) } // GetArchivedEntries returns all archived messages and its score in the given queue. // It also asserts the state field of the task. func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, "archived") + return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, base.TaskStateArchived) } // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. // It also asserts the state field of the task. func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, "active") + return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive) } // Retrieves all messages stored under `keyFn(qname)` key in redis list. func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, - keyFn func(qname string) string, state string) []*base.TaskMessage { + keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage { tb.Helper() ids := r.LRange(keyFn(qname), 0, -1).Val() var msgs []*base.TaskMessage @@ -387,8 +387,8 @@ func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, taskKey := base.TaskKey(qname, id) data := r.HGet(taskKey, "msg").Val() msgs = append(msgs, MustUnmarshal(tb, data)) - if gotState := r.HGet(taskKey, "state").Val(); gotState != state { - tb.Errorf("task (id=%q) is in %q state, want %q", id, gotState, state) + if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() { + tb.Errorf("task (id=%q) is in %q state, want %v", id, gotState, state) } } return msgs @@ -396,7 +396,7 @@ func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, // Retrieves all messages stored under `keyFn(qname)` key in redis zset (sorted-set). func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string, - keyFn func(qname string) string, state string) []*base.TaskMessage { + keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage { tb.Helper() ids := r.ZRange(keyFn(qname), 0, -1).Val() var msgs []*base.TaskMessage @@ -404,8 +404,8 @@ func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string, taskKey := base.TaskKey(qname, id) msg := r.HGet(taskKey, "msg").Val() msgs = append(msgs, MustUnmarshal(tb, msg)) - if gotState := r.HGet(taskKey, "state").Val(); gotState != state { - tb.Errorf("task (id=%q) is in %q state, want %q", id, gotState, state) + if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() { + tb.Errorf("task (id=%q) is in %q state, want %v", id, gotState, state) } } return msgs @@ -413,7 +413,7 @@ func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string, // Retrieves all messages along with their scores stored under `keyFn(qname)` key in redis zset (sorted-set). func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, - qname string, keyFn func(qname string) string, state string) []base.Z { + qname string, keyFn func(qname string) string, state base.TaskState) []base.Z { tb.Helper() zs := r.ZRangeWithScores(keyFn(qname), 0, -1).Val() var res []base.Z @@ -422,8 +422,8 @@ func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, taskKey := base.TaskKey(qname, taskID) msg := r.HGet(taskKey, "msg").Val() res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)}) - if gotState := r.HGet(taskKey, "state").Val(); gotState != state { - tb.Errorf("task (id=%q) is in %q state, want %q", taskID, gotState, state) + if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() { + tb.Errorf("task (id=%q) is in %q state, want %v", taskID, gotState, state) } } return res diff --git a/internal/base/base.go b/internal/base/base.go index db9c145..bb31b7b 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -36,6 +36,33 @@ const ( CancelChannel = "asynq:cancel" // PubSub channel ) +// TaskState denotes the state of a task. +type TaskState int + +const ( + TaskStateActive TaskState = iota + 1 + TaskStatePending + TaskStateScheduled + TaskStateRetry + TaskStateArchived +) + +func (s TaskState) String() string { + switch s { + case TaskStateActive: + return "active" + case TaskStatePending: + return "pending" + case TaskStateScheduled: + return "scheduled" + case TaskStateRetry: + return "retry" + case TaskStateArchived: + return "archived" + } + panic(fmt.Sprintf("internal error: unknown task state %d", s)) +} + // ValidateQueueName validates a given qname to be used as a queue name. // Returns nil if valid, otherwise returns non-nil error. func ValidateQueueName(qname string) error {