2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-08-19 15:08:55 +08:00

Update Client to add task to group if Group option is specified

This commit is contained in:
Ken Hibino
2022-03-05 06:41:44 -08:00
parent 8b582899ad
commit f17c157b0f
5 changed files with 196 additions and 12 deletions

View File

@@ -434,6 +434,14 @@ func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) [
return getMessagesFromZSetWithScores(tb, r, qname, base.CompletedKey, base.TaskStateCompleted)
}
// GetGroupEntries returns all scheduled messages and its score in the given queue.
// It also asserts the state field of the task.
func GetGroupEntries(tb testing.TB, r redis.UniversalClient, qname, groupKey string) []base.Z {
tb.Helper()
return getMessagesFromZSetWithScores(tb, r, qname,
func(qname string) string { return base.GroupKey(qname, groupKey) }, base.TaskStateAggregating)
}
// Retrieves all messages stored under `keyFn(qname)` key in redis list.
func getMessagesFromList(tb testing.TB, r redis.UniversalClient, qname string,
keyFn func(qname string) string, state base.TaskState) []*base.TaskMessage {

View File

@@ -50,6 +50,7 @@ const (
TaskStateRetry
TaskStateArchived
TaskStateCompleted
TaskStateAggregating // describes a state where task is waiting in a group to be aggregated
)
func (s TaskState) String() string {
@@ -66,6 +67,8 @@ func (s TaskState) String() string {
return "archived"
case TaskStateCompleted:
return "completed"
case TaskStateAggregating:
return "aggregating"
}
panic(fmt.Sprintf("internal error: unknown task state %d", s))
}
@@ -84,6 +87,8 @@ func TaskStateFromString(s string) (TaskState, error) {
return TaskStateArchived, nil
case "completed":
return TaskStateCompleted, nil
case "aggregating":
return TaskStateAggregating, nil
}
return 0, errors.E(errors.FailedPrecondition, fmt.Sprintf("%q is not supported task state", s))
}