2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.Enqueue

This commit is contained in:
Ken Hibino 2021-02-21 06:15:43 -08:00
parent 16d8fa4b91
commit 2bef3319c7
2 changed files with 29 additions and 6 deletions

View File

@ -299,13 +299,25 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, items []b
// GetPendingMessages returns all pending messages in the given queue. // GetPendingMessages returns all pending messages in the given queue.
func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getListMessages(tb, r, base.PendingKey(qname)) ids := r.LRange(base.PendingKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
data := r.Get(base.TaskKey(qname, id)).Val()
msgs = append(msgs, MustUnmarshal(tb, data))
}
return msgs
} }
// GetActiveMessages returns all active messages in the given queue. // GetActiveMessages returns all active messages in the given queue.
func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getListMessages(tb, r, base.ActiveKey(qname)) ids := r.LRange(base.ActiveKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
data := r.Get(base.TaskKey(qname, id)).Val()
msgs = append(msgs, MustUnmarshal(tb, data))
}
return msgs
} }
// GetScheduledMessages returns all scheduled task messages in the given queue. // GetScheduledMessages returns all scheduled task messages in the given queue.

View File

@ -50,7 +50,17 @@ func (r *RDB) Ping() error {
return r.client.Ping().Err() return r.client.Ping().Err()
} }
// Enqueue inserts the given task to the tail of the queue. // KEYS[1] -> asynq:{qname}:t:<task_id>
// KEYS[2] -> asynq:{qname}:pending
// ARGV[1] -> task message data
// ARGV[2] -> task ID
var enqueueCmd = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
// Enqueue adds the given task to the pending list of the queue.
func (r *RDB) Enqueue(msg *base.TaskMessage) error { func (r *RDB) Enqueue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg) encoded, err := base.EncodeMessage(msg)
if err != nil { if err != nil {
@ -59,12 +69,13 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err return err
} }
key := base.PendingKey(msg.Queue) keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.PendingKey(msg.Queue)}
return r.client.LPush(key, encoded).Err() args := []interface{}{encoded, msg.ID.String()}
return enqueueCmd.Run(r.client, keys, args...).Err()
} }
// KEYS[1] -> unique key // KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>} // KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID // ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL // ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data // ARGV[3] -> task message data