2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

Add TaskState type to base package

This commit is contained in:
Ken Hibino 2021-05-11 20:43:01 -07:00
parent 1812d05d21
commit 0bf767cf21
2 changed files with 55 additions and 28 deletions

View File

@ -182,42 +182,42 @@ func FlushDB(tb testing.TB, r redis.UniversalClient) {
func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.PendingKey(qname), msgs, "pending") seedRedisList(tb, r, base.PendingKey(qname), msgs, base.TaskStatePending)
} }
// SeedActiveQueue initializes the active queue with the given messages. // SeedActiveQueue initializes the active queue with the given messages.
func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.ActiveKey(qname), msgs, "active") seedRedisList(tb, r, base.ActiveKey(qname), msgs, base.TaskStateActive)
} }
// SeedScheduledQueue initializes the scheduled queue with the given messages. // SeedScheduledQueue initializes the scheduled queue with the given messages.
func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { func SeedScheduledQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.ScheduledKey(qname), entries, "scheduled") seedRedisZSet(tb, r, base.ScheduledKey(qname), entries, base.TaskStateScheduled)
} }
// SeedRetryQueue initializes the retry queue with the given messages. // SeedRetryQueue initializes the retry queue with the given messages.
func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.RetryKey(qname), entries, "retry") seedRedisZSet(tb, r, base.RetryKey(qname), entries, base.TaskStateRetry)
} }
// SeedArchivedQueue initializes the archived queue with the given messages. // SeedArchivedQueue initializes the archived queue with the given messages.
func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, "archived") seedRedisZSet(tb, r, base.ArchivedKey(qname), entries, base.TaskStateArchived)
} }
// SeedDeadlines initializes the deadlines set with the given entries. // SeedDeadlines initializes the deadlines set with the given entries.
func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { func SeedDeadlines(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper() tb.Helper()
r.SAdd(base.AllQueues, qname) r.SAdd(base.AllQueues, qname)
seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, "active") seedRedisZSet(tb, r, base.DeadlinesKey(qname), entries, base.TaskStateActive)
} }
// SeedAllPendingQueues initializes all of the specified queues with the given messages. // SeedAllPendingQueues initializes all of the specified queues with the given messages.
@ -271,7 +271,7 @@ func SeedAllDeadlines(tb testing.TB, r redis.UniversalClient, deadlines map[stri
} }
func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
msgs []*base.TaskMessage, state string) { msgs []*base.TaskMessage, state base.TaskState) {
tb.Helper() tb.Helper()
for _, msg := range msgs { for _, msg := range msgs {
encoded := MustMarshal(tb, msg) encoded := MustMarshal(tb, msg)
@ -281,7 +281,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
key := base.TaskKey(msg.Queue, msg.ID.String()) key := base.TaskKey(msg.Queue, msg.ID.String())
data := map[string]interface{}{ data := map[string]interface{}{
"msg": encoded, "msg": encoded,
"state": state, "state": state.String(),
"timeout": msg.Timeout, "timeout": msg.Timeout,
"deadline": msg.Deadline, "deadline": msg.Deadline,
} }
@ -292,7 +292,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
} }
func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
items []base.Z, state string) { items []base.Z, state base.TaskState) {
tb.Helper() tb.Helper()
for _, item := range items { for _, item := range items {
msg := item.Message msg := item.Message
@ -304,7 +304,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
key := base.TaskKey(msg.Queue, msg.ID.String()) key := base.TaskKey(msg.Queue, msg.ID.String())
data := map[string]interface{}{ data := map[string]interface{}{
"msg": encoded, "msg": encoded,
"state": state, "state": state.String(),
"timeout": msg.Timeout, "timeout": msg.Timeout,
"deadline": msg.Deadline, "deadline": msg.Deadline,
} }
@ -318,68 +318,68 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getMessagesFromList(tb, r, qname, base.PendingKey, "pending") return getMessagesFromList(tb, r, qname, base.PendingKey, base.TaskStatePending)
} }
// GetActiveMessages returns all active messages in the given queue. // GetActiveMessages returns all active messages in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getMessagesFromList(tb, r, qname, base.ActiveKey, "active") return getMessagesFromList(tb, r, qname, base.ActiveKey, base.TaskStateActive)
} }
// GetScheduledMessages returns all scheduled task messages in the given queue. // GetScheduledMessages returns all scheduled task messages in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getMessagesFromZSet(tb, r, qname, base.ScheduledKey, "scheduled") return getMessagesFromZSet(tb, r, qname, base.ScheduledKey, base.TaskStateScheduled)
} }
// GetRetryMessages returns all retry messages in the given queue. // GetRetryMessages returns all retry messages in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getMessagesFromZSet(tb, r, qname, base.RetryKey, "retry") return getMessagesFromZSet(tb, r, qname, base.RetryKey, base.TaskStateRetry)
} }
// GetArchivedMessages returns all archived messages in the given queue. // GetArchivedMessages returns all archived messages in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, "archived") return getMessagesFromZSet(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
} }
// GetScheduledEntries returns all scheduled messages and its score in the given queue. // GetScheduledEntries returns all scheduled messages and its score in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getMessagesFromZSetWithScores(tb, r, qname, base.ScheduledKey, "scheduled") return getMessagesFromZSetWithScores(tb, r, qname, base.ScheduledKey, base.TaskStateScheduled)
} }
// GetRetryEntries returns all retry messages and its score in the given queue. // GetRetryEntries returns all retry messages and its score in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getMessagesFromZSetWithScores(tb, r, qname, base.RetryKey, "retry") return getMessagesFromZSetWithScores(tb, r, qname, base.RetryKey, base.TaskStateRetry)
} }
// GetArchivedEntries returns all archived messages and its score in the given queue. // GetArchivedEntries returns all archived messages and its score in the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, "archived") return getMessagesFromZSetWithScores(tb, r, qname, base.ArchivedKey, base.TaskStateArchived)
} }
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
// It also asserts the state field of the task. // It also asserts the state field of the task.
func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, "active") return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
} }
// Retrieves all messages stored under `keyFn(qname)` key in redis list. // Retrieves all messages stored under `keyFn(qname)` key in redis list.
func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string, func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
keyFn func(qname string) string, state string) []*base.TaskMessage { keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage {
tb.Helper() tb.Helper()
ids := r.LRange(keyFn(qname), 0, -1).Val() ids := r.LRange(keyFn(qname), 0, -1).Val()
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
@ -387,8 +387,8 @@ func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
taskKey := base.TaskKey(qname, id) taskKey := base.TaskKey(qname, id)
data := r.HGet(taskKey, "msg").Val() data := r.HGet(taskKey, "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, data)) msgs = append(msgs, MustUnmarshal(tb, data))
if gotState := r.HGet(taskKey, "state").Val(); gotState != state { if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() {
tb.Errorf("task (id=%q) is in %q state, want %q", id, gotState, state) tb.Errorf("task (id=%q) is in %q state, want %v", id, gotState, state)
} }
} }
return msgs return msgs
@ -396,7 +396,7 @@ func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
// Retrieves all messages stored under `keyFn(qname)` key in redis zset (sorted-set). // Retrieves all messages stored under `keyFn(qname)` key in redis zset (sorted-set).
func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string, func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string,
keyFn func(qname string) string, state string) []*base.TaskMessage { keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage {
tb.Helper() tb.Helper()
ids := r.ZRange(keyFn(qname), 0, -1).Val() ids := r.ZRange(keyFn(qname), 0, -1).Val()
var msgs []*base.TaskMessage var msgs []*base.TaskMessage
@ -404,8 +404,8 @@ func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string,
taskKey := base.TaskKey(qname, id) taskKey := base.TaskKey(qname, id)
msg := r.HGet(taskKey, "msg").Val() msg := r.HGet(taskKey, "msg").Val()
msgs = append(msgs, MustUnmarshal(tb, msg)) msgs = append(msgs, MustUnmarshal(tb, msg))
if gotState := r.HGet(taskKey, "state").Val(); gotState != state { if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() {
tb.Errorf("task (id=%q) is in %q state, want %q", id, gotState, state) tb.Errorf("task (id=%q) is in %q state, want %v", id, gotState, state)
} }
} }
return msgs return msgs
@ -413,7 +413,7 @@ func getMessagesFromZSet(tb testing.TB, r redis.UniversalClient, qname string,
// Retrieves all messages along with their scores stored under `keyFn(qname)` key in redis zset (sorted-set). // Retrieves all messages along with their scores stored under `keyFn(qname)` key in redis zset (sorted-set).
func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient, func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient,
qname string, keyFn func(qname string) string, state string) []base.Z { qname string, keyFn func(qname string) string, state base.TaskState) []base.Z {
tb.Helper() tb.Helper()
zs := r.ZRangeWithScores(keyFn(qname), 0, -1).Val() zs := r.ZRangeWithScores(keyFn(qname), 0, -1).Val()
var res []base.Z var res []base.Z
@ -422,8 +422,8 @@ func getMessagesFromZSetWithScores(tb testing.TB, r redis.UniversalClient,
taskKey := base.TaskKey(qname, taskID) taskKey := base.TaskKey(qname, taskID)
msg := r.HGet(taskKey, "msg").Val() msg := r.HGet(taskKey, "msg").Val()
res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)}) res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)})
if gotState := r.HGet(taskKey, "state").Val(); gotState != state { if gotState := r.HGet(taskKey, "state").Val(); gotState != state.String() {
tb.Errorf("task (id=%q) is in %q state, want %q", taskID, gotState, state) tb.Errorf("task (id=%q) is in %q state, want %v", taskID, gotState, state)
} }
} }
return res return res

View File

@ -36,6 +36,33 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel CancelChannel = "asynq:cancel" // PubSub channel
) )
// TaskState denotes the state of a task.
type TaskState int
const (
TaskStateActive TaskState = iota + 1
TaskStatePending
TaskStateScheduled
TaskStateRetry
TaskStateArchived
)
func (s TaskState) String() string {
switch s {
case TaskStateActive:
return "active"
case TaskStatePending:
return "pending"
case TaskStateScheduled:
return "scheduled"
case TaskStateRetry:
return "retry"
case TaskStateArchived:
return "archived"
}
panic(fmt.Sprintf("internal error: unknown task state %d", s))
}
// ValidateQueueName validates a given qname to be used as a queue name. // ValidateQueueName validates a given qname to be used as a queue name.
// Returns nil if valid, otherwise returns non-nil error. // Returns nil if valid, otherwise returns non-nil error.
func ValidateQueueName(qname string) error { func ValidateQueueName(qname string) error {