diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 35d5ad3..8e4cdb2 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -314,7 +314,8 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { } // KEYS[1] -> unique key -// KEYS[2] -> asynq:{}:scheduled +// KEYS[2] -> asynq:{}:t: +// KEYS[3] -> asynq:{}:scheduled // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> score (process_at timestamp) @@ -324,7 +325,8 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4]) +redis.call("SET", KEYS[2], ARGV[4]) +redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) return 1 `) @@ -338,10 +340,9 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - score := float64(processAt.Unix()) - res, err := scheduleUniqueCmd.Run(r.client, - []string{msg.UniqueKey, base.ScheduledKey(msg.Queue)}, - msg.ID.String(), int(ttl.Seconds()), score, encoded).Result() + keys := []string{msg.UniqueKey, base.TaskKey(msg.Queue, msg.ID.String()), base.ScheduledKey(msg.Queue)} + argv := []interface{}{msg.ID.String(), int(ttl.Seconds()), processAt.Unix(), encoded} + res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { return err }