diff --git a/README.md b/README.md index 805d1e8..c31f4c4 100644 --- a/README.md +++ b/README.md @@ -218,11 +218,11 @@ func main() { // Options passed at enqueue time override default ones. // --------------------------------------------------------------------------- - res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) + taskID, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) } - fmt.Printf("Enqueued Result: %+v\n", res) + log.Printf("Enqueued task: %s", taskID) } ``` diff --git a/inspeq/inspector.go b/inspeq/inspector.go index d1ad262..d637d22 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -216,6 +216,11 @@ const ( TaskStateArchived ) +func (i *Inspector) GetTaskInfo(taskID string) (TaskInfo, error) { + // TODO: implement this + return nil, nil +} + // PendingTask is a task in a queue and is ready to be processed. type PendingTask struct { *asynq.Task diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 0e89673..f054145 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -54,8 +54,14 @@ func (r *RDB) Ping() error { // ARGV[2] -> task ID // ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task deadline in unix time (0 if no deadline) +// ARGV[5] -> current time in unix time var enqueueCmd = redis.NewScript(` -redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4]) +redis.call("HSET", KEYS[1], + "msg", ARGV[1], + "timeout", ARGV[3], + "deadline", ARGV[4], + "state", "PENDING", + "process_at", ARGV[5]) redis.call("LPUSH", KEYS[2], ARGV[2]) return 1 `) @@ -78,6 +84,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { msg.ID.String(), msg.Timeout, msg.Deadline, + time.Now().Unix(), } return enqueueCmd.Run(r.client, keys, argv...).Err() } @@ -90,12 +97,18 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { // ARGV[3] -> task message data // ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[5] -> task deadline in unix time (0 if no deadline) +// ARGV[6] -> current time in unix time var enqueueUniqueCmd = redis.NewScript(` local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) +redis.call("HSET", KEYS[2], + "msg", ARGV[3], + "timeout", ARGV[4], + "deadline", ARGV[5], + "state", "PENDING", + "process_at", ARGV[6]) redis.call("LPUSH", KEYS[3], ARGV[1]) return 1 `) @@ -121,6 +134,7 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { encoded, msg.Timeout, msg.Deadline, + time.Now().UTC().Unix(), } res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { @@ -181,6 +195,7 @@ if redis.call("EXISTS", KEYS[2]) == 0 then else return redis.error_reply("asynq internal error: both timeout and deadline are not set") end + redis.call("HSET", key, "state", "ACTIVE") redis.call("ZADD", KEYS[4], score, id) return {msg, score} end @@ -325,7 +340,12 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { // ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[5] -> task deadline in unix time (0 if no deadline) var scheduleCmd = redis.NewScript(` -redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5]) +redis.call("HSET", KEYS[1], + "msg", ARGV[1], + "timeout", ARGV[4], + "deadline", ARGV[5], + "state", "SCHEDULED", + "process_at", ARGV[2]) redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) return 1 `) @@ -367,7 +387,12 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then return 0 end -redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6]) +redis.call("HSET", KEYS[2], + "msg", ARGV[4], + "timeout", ARGV[5], + "deadline", ARGV[6], + "state", "SCHEDULED", + "process_at", ARGV[3]) redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) return 1 `) @@ -427,7 +452,10 @@ if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) -redis.call("HSET", KEYS[1], "msg", ARGV[2]) +redis.call("HSET", KEYS[1], + "msg", ARGV[2], + "state", "RETRY", + "process_at", ARGV[3]) local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[4]) @@ -494,7 +522,11 @@ end redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1]) redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4]) redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5]) -redis.call("HSET", KEYS[1], "msg", ARGV[2]) +redis.call("HSET", KEYS[1], + "msg", ARGV[2], + "state", "ARCHIVED", + "process_at", -1, + "last_failed_at", ARGV[3]) local n = redis.call("INCR", KEYS[5]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[5], ARGV[6]) @@ -540,10 +572,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { // and move any tasks that are ready to be processed to the pending set. func (r *RDB) ForwardIfReady(qnames ...string) error { for _, qname := range qnames { - if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil { - return err - } - if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil { + if err := r.forwardAll(qname); err != nil { return err } } @@ -553,10 +582,13 @@ func (r *RDB) ForwardIfReady(qnames ...string) error { // KEYS[1] -> source queue (e.g. asynq:{:scheduled or asynq:{}:retry}) // KEYS[2] -> destination queue (e.g. asynq:{}) // ARGV[1] -> current unix time +// ARGV[2] -> task key prefix // Note: Script moves tasks up to 100 at a time to keep the runtime of script short. var forwardCmd = redis.NewScript(` local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) for _, id in ipairs(ids) do + local key = ARGV[2] .. id + redis.call("HSET", key, "state", "PENDING") redis.call("LPUSH", KEYS[2], id) redis.call("ZREM", KEYS[1], id) end @@ -564,23 +596,32 @@ return table.getn(ids)`) // forward moves tasks with a score less than the current unix time // from the src zset to the dst list. It returns the number of tasks moved. -func (r *RDB) forward(src, dst string) (int, error) { - now := float64(time.Now().Unix()) - res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result() +func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) { + keys := []string{src, dst} + argv := []interface{}{ + time.Now().Unix(), + taskKeyPrefix, + } + res, err := forwardCmd.Run(r.client, keys, argv...).Result() if err != nil { return 0, err } return cast.ToInt(res), nil } -// forwardAll moves tasks with a score less than the current unix time from the src zset, -// until there's no more tasks. -func (r *RDB) forwardAll(src, dst string) (err error) { - n := 1 - for n != 0 { - n, err = r.forward(src, dst) - if err != nil { - return err +// forwardAll moves tasks in scheduled and retry state to pending if the task is ready to processed. +func (r *RDB) forwardAll(qname string) (err error) { + srcs := []string{base.ScheduledKey(qname), base.RetryKey(qname)} + dst := base.PendingKey(qname) + taskKeyPrefix := base.TaskKeyPrefix(qname) + + for _, src := range srcs { + n := 1 + for n != 0 { + n, err = r.forward(src, dst, taskKeyPrefix) + if err != nil { + return err + } } } return nil