diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 5c3e3d1..935bc1f 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -299,13 +299,25 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b // GetPendingMessages returns all pending messages in the given queue. func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.PendingKey(qname)) + ids := r.LRange(base.PendingKey(qname), 0, -1).Val() + var msgs []*base.TaskMessage + for _, id := range ids { + data := r.Get(base.TaskKey(qname, id)).Val() + msgs = append(msgs, MustUnmarshal(tb, data)) + } + return msgs } // GetActiveMessages returns all active messages in the given queue. func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.ActiveKey(qname)) + ids := r.LRange(base.ActiveKey(qname), 0, -1).Val() + var msgs []*base.TaskMessage + for _, id := range ids { + data := r.Get(base.TaskKey(qname, id)).Val() + msgs = append(msgs, MustUnmarshal(tb, data)) + } + return msgs } // GetScheduledMessages returns all scheduled task messages in the given queue. diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 650fa13..28cc574 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -50,7 +50,17 @@ func (r *RDB) Ping() error { return r.client.Ping().Err() } -// Enqueue inserts the given task to the tail of the queue. +// KEYS[1] -> asynq:{qname}:t: +// KEYS[2] -> asynq:{qname}:pending +// ARGV[1] -> task message data +// ARGV[2] -> task ID +var enqueueCmd = redis.NewScript(` +redis.call("SET", KEYS[1], ARGV[1]) +redis.call("LPUSH", KEYS[2], ARGV[2]) +return 1 +`) + +// Enqueue adds the given task to the pending list of the queue. func (r *RDB) Enqueue(msg *base.TaskMessage) error { encoded, err := base.EncodeMessage(msg) if err != nil { @@ -59,12 +69,13 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - key := base.PendingKey(msg.Queue) - return r.client.LPush(key, encoded).Err() + keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.PendingKey(msg.Queue)} + args := []interface{}{encoded, msg.ID.String()} + return enqueueCmd.Run(r.client, keys, args...).Err() } // KEYS[1] -> unique key -// KEYS[2] -> asynq:{} +// KEYS[2] -> asynq:{}:pending // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> task message data