2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

WIP: Track task state under task key

This commit is contained in:
Ken Hibino 2021-03-23 16:39:15 -07:00
parent 24e3d2e273
commit e9636c7c19
3 changed files with 69 additions and 23 deletions

View File

@ -218,11 +218,11 @@ func main() {
// Options passed at enqueue time override default ones. // Options passed at enqueue time override default ones.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) taskID, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatal("could not enqueue task: %v", err)
} }
fmt.Printf("Enqueued Result: %+v\n", res) log.Printf("Enqueued task: %s", taskID)
} }
``` ```

View File

@ -216,6 +216,11 @@ const (
TaskStateArchived TaskStateArchived
) )
func (i *Inspector) GetTaskInfo(taskID string) (TaskInfo, error) {
// TODO: implement this
return nil, nil
}
// PendingTask is a task in a queue and is ready to be processed. // PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct { type PendingTask struct {
*asynq.Task *asynq.Task

View File

@ -54,8 +54,14 @@ func (r *RDB) Ping() error {
// ARGV[2] -> task ID // ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline) // ARGV[4] -> task deadline in unix time (0 if no deadline)
// ARGV[5] -> current time in unix time
var enqueueCmd = redis.NewScript(` var enqueueCmd = redis.NewScript(`
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4]) redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"timeout", ARGV[3],
"deadline", ARGV[4],
"state", "PENDING",
"process_at", ARGV[5])
redis.call("LPUSH", KEYS[2], ARGV[2]) redis.call("LPUSH", KEYS[2], ARGV[2])
return 1 return 1
`) `)
@ -78,6 +84,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
msg.ID.String(), msg.ID.String(),
msg.Timeout, msg.Timeout,
msg.Deadline, msg.Deadline,
time.Now().Unix(),
} }
return enqueueCmd.Run(r.client, keys, argv...).Err() return enqueueCmd.Run(r.client, keys, argv...).Err()
} }
@ -90,12 +97,18 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
// ARGV[3] -> task message data // ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline) // ARGV[5] -> task deadline in unix time (0 if no deadline)
// ARGV[6] -> current time in unix time
var enqueueUniqueCmd = redis.NewScript(` var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then if not ok then
return 0 return 0
end end
redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) redis.call("HSET", KEYS[2],
"msg", ARGV[3],
"timeout", ARGV[4],
"deadline", ARGV[5],
"state", "PENDING",
"process_at", ARGV[6])
redis.call("LPUSH", KEYS[3], ARGV[1]) redis.call("LPUSH", KEYS[3], ARGV[1])
return 1 return 1
`) `)
@ -121,6 +134,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
encoded, encoded,
msg.Timeout, msg.Timeout,
msg.Deadline, msg.Deadline,
time.Now().UTC().Unix(),
} }
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result() res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
if err != nil { if err != nil {
@ -181,6 +195,7 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
else else
return redis.error_reply("asynq internal error: both timeout and deadline are not set") return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end end
redis.call("HSET", key, "state", "ACTIVE")
redis.call("ZADD", KEYS[4], score, id) redis.call("ZADD", KEYS[4], score, id)
return {msg, score} return {msg, score}
end end
@ -325,7 +340,12 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
// ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline) // ARGV[5] -> task deadline in unix time (0 if no deadline)
var scheduleCmd = redis.NewScript(` var scheduleCmd = redis.NewScript(`
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5]) redis.call("HSET", KEYS[1],
"msg", ARGV[1],
"timeout", ARGV[4],
"deadline", ARGV[5],
"state", "SCHEDULED",
"process_at", ARGV[2])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1 return 1
`) `)
@ -367,7 +387,12 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then if not ok then
return 0 return 0
end end
redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6]) redis.call("HSET", KEYS[2],
"msg", ARGV[4],
"timeout", ARGV[5],
"deadline", ARGV[6],
"state", "SCHEDULED",
"process_at", ARGV[3])
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
return 1 return 1
`) `)
@ -427,7 +452,10 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND") return redis.error_reply("NOT FOUND")
end end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("HSET", KEYS[1], "msg", ARGV[2]) redis.call("HSET", KEYS[1],
"msg", ARGV[2],
"state", "RETRY",
"process_at", ARGV[3])
local n = redis.call("INCR", KEYS[5]) local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[4]) redis.call("EXPIREAT", KEYS[5], ARGV[4])
@ -494,7 +522,11 @@ end
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("HSET", KEYS[1], "msg", ARGV[2]) redis.call("HSET", KEYS[1],
"msg", ARGV[2],
"state", "ARCHIVED",
"process_at", -1,
"last_failed_at", ARGV[3])
local n = redis.call("INCR", KEYS[5]) local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[6]) redis.call("EXPIREAT", KEYS[5], ARGV[6])
@ -540,10 +572,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
// and move any tasks that are ready to be processed to the pending set. // and move any tasks that are ready to be processed to the pending set.
func (r *RDB) ForwardIfReady(qnames ...string) error { func (r *RDB) ForwardIfReady(qnames ...string) error {
for _, qname := range qnames { for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil { if err := r.forwardAll(qname); err != nil {
return err
}
if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil {
return err return err
} }
} }
@ -553,10 +582,13 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
// KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry}) // KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})
// KEYS[2] -> destination queue (e.g. asynq:{<qname>}) // KEYS[2] -> destination queue (e.g. asynq:{<qname>})
// ARGV[1] -> current unix time // ARGV[1] -> current unix time
// ARGV[2] -> task key prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short. // Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, id in ipairs(ids) do for _, id in ipairs(ids) do
local key = ARGV[2] .. id
redis.call("HSET", key, "state", "PENDING")
redis.call("LPUSH", KEYS[2], id) redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id) redis.call("ZREM", KEYS[1], id)
end end
@ -564,25 +596,34 @@ return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time // forward moves tasks with a score less than the current unix time
// from the src zset to the dst list. It returns the number of tasks moved. // from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src, dst string) (int, error) { func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
now := float64(time.Now().Unix()) keys := []string{src, dst}
res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result() argv := []interface{}{
time.Now().Unix(),
taskKeyPrefix,
}
res, err := forwardCmd.Run(r.client, keys, argv...).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
return cast.ToInt(res), nil return cast.ToInt(res), nil
} }
// forwardAll moves tasks with a score less than the current unix time from the src zset, // forwardAll moves tasks in scheduled and retry state to pending if the task is ready to processed.
// until there's no more tasks. func (r *RDB) forwardAll(qname string) (err error) {
func (r *RDB) forwardAll(src, dst string) (err error) { srcs := []string{base.ScheduledKey(qname), base.RetryKey(qname)}
dst := base.PendingKey(qname)
taskKeyPrefix := base.TaskKeyPrefix(qname)
for _, src := range srcs {
n := 1 n := 1
for n != 0 { for n != 0 {
n, err = r.forward(src, dst) n, err = r.forward(src, dst, taskKeyPrefix)
if err != nil { if err != nil {
return err return err
} }
} }
}
return nil return nil
} }