2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 23:06:12 +08:00

Update Enqueue and Schedule commands in rdb

This commit is contained in:
Ken Hibino
2020-08-07 06:31:02 -07:00
parent 87124993f8
commit faeeb2c820
3 changed files with 85 additions and 97 deletions

View File

@@ -50,27 +50,22 @@ func (r *RDB) Ping() error {
return r.client.Ping().Err()
}
// KEYS[1] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// ARGV[1] -> task message data
var enqueueCmd = redis.NewScript(`
redis.call("LPUSH", KEYS[1], ARGV[1])
redis.call("SADD", KEYS[2], KEYS[1])
return 1`)
// Enqueue inserts the given task to the tail of the queue.
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
err := r.client.SAdd(base.AllQueues, msg.Queue).Err()
if err != nil {
return err
}
key := base.QueueKey(msg.Queue)
return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, encoded).Err()
return r.client.LPush(key, encoded).Err()
}
// KEYS[1] -> unique key in the form <type>:<payload>:<qname>
// KEYS[2] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
@@ -80,7 +75,6 @@ if not ok then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[3])
redis.call("SADD", KEYS[3], KEYS[2])
return 1
`)
@@ -91,9 +85,12 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
if err != nil {
return err
}
key := base.QueueKey(msg.Queue)
err := r.client.SAdd(base.AllQueues, msg.Queue).Err()
if err != nil {
return err
}
res, err := enqueueUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, key, base.AllQueues},
[]string{msg.UniqueKey, base.QueueKey(msg.Queue)},
msg.ID.String(), int(ttl.Seconds()), encoded).Result()
if err != nil {
return err
@@ -194,7 +191,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:deadlines
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key in the format <type>:<payload>:<qname>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
@@ -257,45 +254,32 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
encoded).Err()
}
// KEYS[1] -> asynq:scheduled
// KEYS[2] -> asynq:queues
// ARGV[1] -> score (process_at timestamp)
// ARGV[2] -> task message
// ARGV[3] -> queue key
var scheduleCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("SADD", KEYS[2], ARGV[3])
return 1
`)
// Schedule adds the task to the backlog queue to be processed in the future.
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
qkey := base.QueueKey(msg.Queue)
err := r.client.SAdd(base.AllQueues, msg.Queue).Err()
if err != nil {
return err
}
score := float64(processAt.Unix())
return scheduleCmd.Run(r.client,
[]string{base.ScheduledQueue, base.AllQueues},
score, encoded, qkey).Err()
return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err()
}
// KEYS[1] -> unique key in the format <type>:<payload>:<qname>
// KEYS[2] -> asynq:scheduled
// KEYS[3] -> asynq:queues
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:scheduled
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> queue key
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
redis.call("SADD", KEYS[3], ARGV[5])
return 1
`)
@@ -306,11 +290,14 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
if err != nil {
return err
}
qkey := base.QueueKey(msg.Queue)
err := r.client.SAdd(base.AllQueues, msg.Queue).Err()
if err != nil {
return err
}
score := float64(processAt.Unix())
res, err := scheduleUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues},
msg.ID.String(), int(ttl.Seconds()), score, encoded, qkey).Result()
[]string{msg.UniqueKey, base.ScheduledKey(msg.Queue)},
msg.ID.String(), int(ttl.Seconds()), score, encoded).Result()
if err != nil {
return err
}