diff --git a/benchmark_test.go b/benchmark_test.go index 09617dd..580bab7 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -12,8 +12,7 @@ import ( "time" ) -// Simple E2E Benchmark testing with no scheduled tasks and -// no retries. +// Simple E2E Benchmark testing with no scheduled tasks and retries. func BenchmarkEndToEndSimple(b *testing.B) { const count = 100000 for n := 0; n < b.N; n++ { @@ -101,3 +100,58 @@ func BenchmarkEndToEnd(b *testing.B) { b.StartTimer() // end teardown } } + +// Simple E2E Benchmark testing with no scheduled tasks and retries with multiple queues. +func BenchmarkEndToEndMultipleQueues(b *testing.B) { + // number of tasks to create for each queue + const ( + highCount = 20000 + defaultCount = 20000 + lowCount = 20000 + ) + for n := 0; n < b.N; n++ { + b.StopTimer() // begin setup + setup(b) + redis := &RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + } + client := NewClient(redis) + bg := NewBackground(redis, &Config{ + Concurrency: 10, + Queues: map[string]uint{ + "high": 6, + "default": 3, + "low": 1, + }, + }) + // Create a bunch of tasks + for i := 0; i < highCount; i++ { + t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now(), Queue("high")) + } + for i := 0; i < defaultCount; i++ { + t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now()) + } + for i := 0; i < lowCount; i++ { + t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) + client.Schedule(t, time.Now(), Queue("low")) + } + + var wg sync.WaitGroup + wg.Add(highCount + defaultCount + lowCount) + handler := func(t *Task) error { + wg.Done() + return nil + } + b.StartTimer() // end setup + + bg.start(HandlerFunc(handler)) + wg.Wait() + + b.StopTimer() // begin teardown + bg.stop() + b.StartTimer() // end teardown + } +} diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 07ed9d3..578b636 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -86,49 +86,48 @@ type DeadTask struct { Queue string } +// KEYS[1] -> asynq:queues +// KEYS[2] -> asynq:in_progress +// KEYS[3] -> asynq:scheduled +// KEYS[4] -> asynq:retry +// KEYS[5] -> asynq:dead +// KEYS[6] -> asynq:processed: +// KEYS[7] -> asynq:failure: +var currentStatsCmd = redis.NewScript(` +local res = {} +local queues = redis.call("SMEMBERS", KEYS[1]) +for _, qkey in ipairs(queues) do + table.insert(res, qkey) + table.insert(res, redis.call("LLEN", qkey)) +end +table.insert(res, KEYS[2]) +table.insert(res, redis.call("LLEN", KEYS[2])) +table.insert(res, KEYS[3]) +table.insert(res, redis.call("ZCARD", KEYS[3])) +table.insert(res, KEYS[4]) +table.insert(res, redis.call("ZCARD", KEYS[4])) +table.insert(res, KEYS[5]) +table.insert(res, redis.call("ZCARD", KEYS[5])) +local pcount = 0 +local p = redis.call("GET", KEYS[6]) +if p then + pcount = tonumber(p) +end +table.insert(res, "processed") +table.insert(res, pcount) +local fcount = 0 +local f = redis.call("GET", KEYS[7]) +if f then + fcount = tonumber(f) +end +table.insert(res, "failed") +table.insert(res, fcount) +return res`) + // CurrentStats returns a current state of the queues. func (r *RDB) CurrentStats() (*Stats, error) { - // KEYS[1] -> asynq:queues - // KEYS[2] -> asynq:in_progress - // KEYS[3] -> asynq:scheduled - // KEYS[4] -> asynq:retry - // KEYS[5] -> asynq:dead - // KEYS[6] -> asynq:processed: - // KEYS[7] -> asynq:failure: - script := redis.NewScript(` - local res = {} - local queues = redis.call("SMEMBERS", KEYS[1]) - for _, qkey in ipairs(queues) do - table.insert(res, qkey) - table.insert(res, redis.call("LLEN", qkey)) - end - table.insert(res, KEYS[2]) - table.insert(res, redis.call("LLEN", KEYS[2])) - table.insert(res, KEYS[3]) - table.insert(res, redis.call("ZCARD", KEYS[3])) - table.insert(res, KEYS[4]) - table.insert(res, redis.call("ZCARD", KEYS[4])) - table.insert(res, KEYS[5]) - table.insert(res, redis.call("ZCARD", KEYS[5])) - local pcount = 0 - local p = redis.call("GET", KEYS[6]) - if p then - pcount = tonumber(p) - end - table.insert(res, "processed") - table.insert(res, pcount) - local fcount = 0 - local f = redis.call("GET", KEYS[7]) - if f then - fcount = tonumber(f) - end - table.insert(res, "failed") - table.insert(res, fcount) - return res - `) - now := time.Now() - res, err := script.Run(r.client, []string{ + res, err := currentStatsCmd.Run(r.client, []string{ base.AllQueues, base.InProgressQueue, base.ScheduledQueue, @@ -173,6 +172,17 @@ func (r *RDB) CurrentStats() (*Stats, error) { return stats, nil } +var historicalStatsCmd = redis.NewScript(` +local res = {} +for _, key in ipairs(KEYS) do + local n = redis.call("GET", key) + if not n then + n = 0 + end + table.insert(res, tonumber(n)) +end +return res`) + // HistoricalStats returns a list of stats from the last n days. func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { if n < 1 { @@ -188,18 +198,7 @@ func (r *RDB) HistoricalStats(n int) ([]*DailyStats, error) { keys = append(keys, base.ProcessedKey(ts)) keys = append(keys, base.FailureKey(ts)) } - script := redis.NewScript(` - local res = {} - for _, key in ipairs(KEYS) do - local n = redis.call("GET", key) - if not n then - n = 0 - end - table.insert(res, tonumber(n)) - end - return res - `) - res, err := script.Run(r.client, keys, len(keys)).Result() + res, err := historicalStatsCmd.Run(r.client, keys, len(keys)).Result() if err != nil { return nil, err } @@ -475,21 +474,21 @@ func (r *RDB) EnqueueAllDeadTasks() (int64, error) { return r.removeAndEnqueueAll(base.DeadQueue) } -func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { - script := redis.NewScript(` - local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) - for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("ZREM", KEYS[1], msg) - local qkey = ARGV[3] .. decoded["Queue"] - redis.call("LPUSH", qkey, msg) - return 1 - end +var removeAndEnqueueCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if decoded["ID"] == ARGV[2] then + local qkey = ARGV[3] .. decoded["Queue"] + redis.call("LPUSH", qkey, msg) + redis.call("ZREM", KEYS[1], msg) + return 1 end - return 0 - `) - res, err := script.Run(r.client, []string{zset}, score, id, base.QueuePrefix).Result() +end +return 0`) + +func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { + res, err := removeAndEnqueueCmd.Run(r.client, []string{zset}, score, id, base.QueuePrefix).Result() if err != nil { return 0, err } @@ -500,18 +499,18 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { return n, nil } +var removeAndEnqueueAllCmd = redis.NewScript(` +local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + local qkey = ARGV[1] .. decoded["Queue"] + redis.call("LPUSH", qkey, msg) + redis.call("ZREM", KEYS[1], msg) +end +return table.getn(msgs)`) + func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { - script := redis.NewScript(` - local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) - for _, msg in ipairs(msgs) do - redis.call("ZREM", KEYS[1], msg) - local decoded = cjson.decode(msg) - local qkey = ARGV[1] .. decoded["Queue"] - redis.call("LPUSH", qkey, msg) - end - return table.getn(msgs) - `) - res, err := script.Run(r.client, []string{zset}, base.QueuePrefix).Result() + res, err := removeAndEnqueueAllCmd.Run(r.client, []string{zset}, base.QueuePrefix).Result() if err != nil { return 0, err } @@ -562,31 +561,31 @@ func (r *RDB) KillAllScheduledTasks() (int64, error) { return r.removeAndKillAll(base.ScheduledQueue) } -func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { - // KEYS[1] -> ZSET to move task from (e.g., retry queue) - // KEYS[2] -> asynq:dead - // ARGV[1] -> score of the task to kill - // ARGV[2] -> id of the task to kill - // ARGV[3] -> current timestamp - // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) - // ARGV[5] -> max number of tasks in dead queue (e.g., 100) - script := redis.NewScript(` - local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) - for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("ZREM", KEYS[1], msg) - redis.call("ZADD", KEYS[2], ARGV[3], msg) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) - return 1 - end +// KEYS[1] -> ZSET to move task from (e.g., retry queue) +// KEYS[2] -> asynq:dead +// ARGV[1] -> score of the task to kill +// ARGV[2] -> id of the task to kill +// ARGV[3] -> current timestamp +// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[5] -> max number of tasks in dead queue (e.g., 100) +var removeAndKillCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if decoded["ID"] == ARGV[2] then + redis.call("ZREM", KEYS[1], msg) + redis.call("ZADD", KEYS[2], ARGV[3], msg) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) + return 1 end - return 0 - `) +end +return 0`) + +func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - res, err := script.Run(r.client, + res, err := removeAndKillCmd.Run(r.client, []string{zset, base.DeadQueue}, score, id, now.Unix(), limit, maxDeadTasks).Result() if err != nil { @@ -599,25 +598,25 @@ func (r *RDB) removeAndKill(zset, id string, score float64) (int64, error) { return n, nil } +// KEYS[1] -> ZSET to move task from (e.g., retry queue) +// KEYS[2] -> asynq:dead +// ARGV[1] -> current timestamp +// ARGV[2] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[3] -> max number of tasks in dead queue (e.g., 100) +var removeAndKillAllCmd = redis.NewScript(` +local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + redis.call("ZADD", KEYS[2], ARGV[1], msg) + redis.call("ZREM", KEYS[1], msg) + redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) + redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) +end +return table.getn(msgs)`) + func (r *RDB) removeAndKillAll(zset string) (int64, error) { - // KEYS[1] -> ZSET to move task from (e.g., retry queue) - // KEYS[2] -> asynq:dead - // ARGV[1] -> current timestamp - // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) - // ARGV[3] -> max number of tasks in dead queue (e.g., 100) - script := redis.NewScript(` - local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) - for _, msg in ipairs(msgs) do - redis.call("ZREM", KEYS[1], msg) - redis.call("ZADD", KEYS[2], ARGV[1], msg) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3]) - end - return table.getn(msgs) - `) now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - res, err := script.Run(r.client, []string{zset, base.DeadQueue}, + res, err := removeAndKillAllCmd.Run(r.client, []string{zset, base.DeadQueue}, now.Unix(), limit, maxDeadTasks).Result() if err != nil { return 0, err @@ -650,19 +649,19 @@ func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error { return r.deleteTask(base.ScheduledQueue, id.String(), float64(score)) } -func (r *RDB) deleteTask(zset, id string, score float64) error { - script := redis.NewScript(` - local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) - for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("ZREM", KEYS[1], msg) - return 1 - end +var deleteTaskCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if decoded["ID"] == ARGV[2] then + redis.call("ZREM", KEYS[1], msg) + return 1 end - return 0 - `) - res, err := script.Run(r.client, []string{zset}, score, id).Result() +end +return 0`) + +func (r *RDB) deleteTask(zset, id string, score float64) error { + res, err := deleteTaskCmd.Run(r.client, []string{zset}, score, id).Result() if err != nil { return err } @@ -709,6 +708,27 @@ func (e *ErrQueueNotEmpty) Error() string { return fmt.Sprintf("queue %q is not empty", e.qname) } +// Skip checking whether queue is empty before removing. +var removeQueueForceCmd = redis.NewScript(` +local n = redis.call("SREM", KEYS[1], KEYS[2]) +if n == 0 then + return redis.error_reply("LIST NOT FOUND") +end +redis.call("DEL", KEYS[2]) +return redis.status_reply("OK")`) + +// Checks whether queue is empty before removing. +var removeQueueCmd = redis.NewScript(` +local l = redis.call("LLEN", KEYS[2]) if l > 0 then + return redis.error_reply("LIST NOT EMPTY") +end +local n = redis.call("SREM", KEYS[1], KEYS[2]) +if n == 0 then + return redis.error_reply("LIST NOT FOUND") +end +redis.call("DEL", KEYS[2]) +return redis.status_reply("OK")`) + // RemoveQueue removes the specified queue. // // If force is set to true, it will remove the queue regardless @@ -718,27 +738,9 @@ func (e *ErrQueueNotEmpty) Error() string { func (r *RDB) RemoveQueue(qname string, force bool) error { var script *redis.Script if force { - script = redis.NewScript(` - local n = redis.call("SREM", KEYS[1], KEYS[2]) - if n == 0 then - return redis.error_reply("LIST NOT FOUND") - end - redis.call("DEL", KEYS[2]) - return redis.status_reply("OK") - `) + script = removeQueueForceCmd } else { - script = redis.NewScript(` - local l = redis.call("LLEN", KEYS[2]) - if l > 0 then - return redis.error_reply("LIST NOT EMPTY") - end - local n = redis.call("SREM", KEYS[1], KEYS[2]) - if n == 0 then - return redis.error_reply("LIST NOT FOUND") - end - redis.call("DEL", KEYS[2]) - return redis.status_reply("OK") - `) + script = removeQueueCmd } err := script.Run(r.client, []string{base.AllQueues, base.QueueKey(qname)}, @@ -756,23 +758,23 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { return nil } +// Note: Script also removes stale keys. +var listProcessesCmd = redis.NewScript(` +local res = {} +local now = tonumber(ARGV[1]) +local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") +for _, key in ipairs(keys) do + local ps = redis.call("GET", key) + if ps then + table.insert(res, ps) + end +end +redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) +return res`) + // ListProcesses returns the list of process statuses. func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) { - // Note: Script also removes stale keys. - script := redis.NewScript(` - local res = {} - local now = tonumber(ARGV[1]) - local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") - for _, key in ipairs(keys) do - local ps = redis.call("GET", key) - if ps then - table.insert(res, ps) - end - end - redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) - return res - `) - res, err := script.Run(r.client, + res, err := listProcessesCmd.Run(r.client, []string{base.AllProcesses}, time.Now().UTC().Unix()).Result() if err != nil { return nil, err diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ae05a55..b8e1259 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -41,6 +41,14 @@ func (r *RDB) Close() error { return r.client.Close() } +// KEYS[1] -> asynq:queues: +// KEYS[2] -> asynq:queues +// ARGV[1] -> task message data +var enqueueCmd = redis.NewScript(` +redis.call("LPUSH", KEYS[1], ARGV[1]) +redis.call("SADD", KEYS[2], KEYS[1]) +return 1`) + // Enqueue inserts the given task to the tail of the queue. func (r *RDB) Enqueue(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) @@ -48,23 +56,18 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return err } key := base.QueueKey(msg.Queue) - script := redis.NewScript(` - redis.call("LPUSH", KEYS[1], ARGV[1]) - redis.call("SADD", KEYS[2], KEYS[1]) - return 1 - `) - return script.Run(r.client, []string{key, base.AllQueues}, string(bytes)).Err() + return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err() } -// Dequeue queries given queues in order and pops a task message if there -// is one and returns it. If all queues are empty, ErrNoProcessableTask -// error is returned. +// Dequeue queries given queues in order and pops a task message if there is one and returns it. +// If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { var data string var err error if len(qnames) == 1 { data, err = r.dequeueSingle(base.QueueKey(qnames[0])) } else { + // TODO(hibiken): Take keys are argument and don't compute every time var keys []string for _, q := range qnames { keys = append(keys, base.QueueKey(q)) @@ -90,72 +93,74 @@ func (r *RDB) dequeueSingle(queue string) (data string, err error) { return r.client.BRPopLPush(queue, base.InProgressQueue, time.Second).Result() } +// KEYS[1] -> asynq:in_progress +// ARGV -> List of queues to query in order +var dequeueCmd = redis.NewScript(` +local res +for _, qkey in ipairs(ARGV) do + res = redis.call("RPOPLPUSH", qkey, KEYS[1]) + if res then + return res + end +end +return res`) + func (r *RDB) dequeue(queues ...string) (data string, err error) { var args []interface{} for _, qkey := range queues { args = append(args, qkey) } - script := redis.NewScript(` - local res - for _, qkey in ipairs(ARGV) do - res = redis.call("RPOPLPUSH", qkey, KEYS[1]) - if res then - return res - end - end - return res - `) - res, err := script.Run(r.client, []string{base.InProgressQueue}, args...).Result() + res, err := dequeueCmd.Run(r.client, []string{base.InProgressQueue}, args...).Result() if err != nil { return "", err } return cast.ToStringE(res) } +// KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:processed: +// ARGV[1] -> base.TaskMessage value +// ARGV[2] -> stats expiration timestamp +// Note: LREM count ZERO means "remove all elements equal to val" +var doneCmd = redis.NewScript(` +redis.call("LREM", KEYS[1], 0, ARGV[1]) +local n = redis.call("INCR", KEYS[2]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[2], ARGV[2]) +end +return redis.status_reply("OK") +`) + // Done removes the task from in-progress queue to mark the task as done. func (r *RDB) Done(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return err } - // Note: LREM count ZERO means "remove all elements equal to val" - // KEYS[1] -> asynq:in_progress - // KEYS[2] -> asynq:processed: - // ARGV[1] -> base.TaskMessage value - // ARGV[2] -> stats expiration timestamp - script := redis.NewScript(` - redis.call("LREM", KEYS[1], 0, ARGV[1]) - local n = redis.call("INCR", KEYS[2]) - if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[2], ARGV[2]) - end - return redis.status_reply("OK") - `) now := time.Now() processedKey := base.ProcessedKey(now) expireAt := now.Add(statsTTL) - return script.Run(r.client, + return doneCmd.Run(r.client, []string{base.InProgressQueue, processedKey}, - string(bytes), expireAt.Unix()).Err() + bytes, expireAt.Unix()).Err() } -// Requeue moves the task from in-progress queue to the default -// queue. +// KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:queues: +// ARGV[1] -> base.TaskMessage value +// Note: Use RPUSH to push to the head of the queue. +var requeueCmd = redis.NewScript(` +redis.call("LREM", KEYS[1], 0, ARGV[1]) +redis.call("RPUSH", KEYS[2], ARGV[1]) +return redis.status_reply("OK")`) + +// Requeue moves the task from in-progress queue to the specified queue. func (r *RDB) Requeue(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return err } - // Note: Use RPUSH to push to the head of the queue. - // KEYS[1] -> asynq:in_progress - // KEYS[2] -> asynq:queues:default - // ARGV[1] -> base.TaskMessage value - script := redis.NewScript(` - redis.call("LREM", KEYS[1], 0, ARGV[1]) - redis.call("RPUSH", KEYS[2], ARGV[1]) - return redis.status_reply("OK") - `) - return script.Run(r.client, + return requeueCmd.Run(r.client, []string{base.InProgressQueue, base.QueueKey(msg.Queue)}, string(bytes)).Err() } @@ -171,6 +176,27 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { &redis.Z{Member: string(bytes), Score: score}).Err() } +// KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:retry +// KEYS[3] -> asynq:processed: +// KEYS[4] -> asynq:failure: +// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue +// ARGV[2] -> base.TaskMessage value to add to Retry queue +// ARGV[3] -> retry_at UNIX timestamp +// ARGV[4] -> stats expiration timestamp +var retryCmd = redis.NewScript(` +redis.call("LREM", KEYS[1], 0, ARGV[1]) +redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[4]) +end +local m = redis.call("INCR", KEYS[4]) +if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[4], ARGV[4]) +end +return redis.status_reply("OK")`) + // Retry moves the task from in-progress to retry queue, incrementing retry count // and assigning error message to the task message. func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { @@ -185,32 +211,11 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e if err != nil { return err } - // KEYS[1] -> asynq:in_progress - // KEYS[2] -> asynq:retry - // KEYS[3] -> asynq:processed: - // KEYS[4] -> asynq:failure: - // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue - // ARGV[2] -> base.TaskMessage value to add to Retry queue - // ARGV[3] -> retry_at UNIX timestamp - // ARGV[4] -> stats expiration timestamp - script := redis.NewScript(` - redis.call("LREM", KEYS[1], 0, ARGV[1]) - redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) - local n = redis.call("INCR", KEYS[3]) - if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[3], ARGV[4]) - end - local m = redis.call("INCR", KEYS[4]) - if tonumber(m) == 1 then - redis.call("EXPIREAT", KEYS[4], ARGV[4]) - end - return redis.status_reply("OK") - `) now := time.Now() processedKey := base.ProcessedKey(now) failureKey := base.FailureKey(now) expireAt := now.Add(statsTTL) - return script.Run(r.client, + return retryCmd.Run(r.client, []string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey}, string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Err() } @@ -220,6 +225,31 @@ const ( deadExpirationInDays = 90 ) +// KEYS[1] -> asynq:in_progress +// KEYS[2] -> asynq:dead +// KEYS[3] -> asynq:processed: +// KEYS[4] -> asynq.failure: +// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue +// ARGV[2] -> base.TaskMessage value to add to Dead queue +// ARGV[3] -> died_at UNIX timestamp +// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) +// ARGV[5] -> max number of tasks in dead queue (e.g., 100) +// ARGV[6] -> stats expiration timestamp +var killCmd = redis.NewScript(` +redis.call("LREM", KEYS[1], 0, ARGV[1]) +redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) +redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) +redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) +local n = redis.call("INCR", KEYS[3]) +if tonumber(n) == 1 then + redis.call("EXPIREAT", KEYS[3], ARGV[6]) +end +local m = redis.call("INCR", KEYS[4]) +if tonumber(m) == 1 then + redis.call("EXPIREAT", KEYS[4], ARGV[6]) +end +return redis.status_reply("OK")`) + // Kill sends the task to "dead" queue from in-progress queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. @@ -239,50 +269,27 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { processedKey := base.ProcessedKey(now) failureKey := base.FailureKey(now) expireAt := now.Add(statsTTL) - // KEYS[1] -> asynq:in_progress - // KEYS[2] -> asynq:dead - // KEYS[3] -> asynq:processed: - // KEYS[4] -> asynq.failure: - // ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue - // ARGV[2] -> base.TaskMessage value to add to Dead queue - // ARGV[3] -> died_at UNIX timestamp - // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) - // ARGV[5] -> max number of tasks in dead queue (e.g., 100) - // ARGV[6] -> stats expiration timestamp - script := redis.NewScript(` - redis.call("LREM", KEYS[1], 0, ARGV[1]) - redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) - redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) - redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) - local n = redis.call("INCR", KEYS[3]) - if tonumber(n) == 1 then - redis.call("EXPIREAT", KEYS[3], ARGV[6]) - end - local m = redis.call("INCR", KEYS[4]) - if tonumber(m) == 1 then - redis.call("EXPIREAT", KEYS[4], ARGV[6]) - end - return redis.status_reply("OK") - `) - return script.Run(r.client, + return killCmd.Run(r.client, []string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey}, string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } -// RestoreUnfinished moves all tasks from in-progress list to the queue +// KEYS[1] -> asynq:in_progress +// ARGV[1] -> queue prefix +var requeueAllCmd = redis.NewScript(` +local msgs = redis.call("LRANGE", KEYS[1], 0, -1) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + local qkey = ARGV[1] .. decoded["Queue"] + redis.call("RPUSH", qkey, msg) + redis.call("LREM", KEYS[1], 0, msg) +end +return table.getn(msgs)`) + +// RequeueAll moves all tasks from in-progress list to the queue // and reports the number of tasks restored. -func (r *RDB) RestoreUnfinished() (int64, error) { - script := redis.NewScript(` - local msgs = redis.call("LRANGE", KEYS[1], 0, -1) - for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - local qkey = ARGV[1] .. decoded["Queue"] - redis.call("LREM", KEYS[1], 0, msg) - redis.call("RPUSH", qkey, msg) - end - return table.getn(msgs) - `) - res, err := script.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result() +func (r *RDB) RequeueAll() (int64, error) { + res, err := requeueAllCmd.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result() if err != nil { return 0, err } @@ -313,40 +320,55 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error { return nil } +// KEYS[1] -> source queue (e.g. scheduled or retry queue) +// ARGV[1] -> current unix time +// ARGV[2] -> queue prefix +var forwardCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + local qkey = ARGV[2] .. decoded["Queue"] + redis.call("LPUSH", qkey, msg) + redis.call("ZREM", KEYS[1], msg) +end +return msgs`) + // forward moves all tasks with a score less than the current unix time // from the src zset. func (r *RDB) forward(src string) error { - script := redis.NewScript(` - local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) - for _, msg in ipairs(msgs) do - redis.call("ZREM", KEYS[1], msg) - local decoded = cjson.decode(msg) - local qkey = ARGV[2] .. decoded["Queue"] - redis.call("LPUSH", qkey, msg) - end - return msgs - `) now := float64(time.Now().Unix()) - return script.Run(r.client, + return forwardCmd.Run(r.client, []string{src}, now, base.QueuePrefix).Err() } +// KEYS[1] -> source queue (e.g. scheduled or retry queue) +// KEYS[2] -> destination queue +var forwardSingleCmd = redis.NewScript(` +local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) +for _, msg in ipairs(msgs) do + redis.call("LPUSH", KEYS[2], msg) + redis.call("ZREM", KEYS[1], msg) +end +return msgs`) + // forwardSingle moves all tasks with a score less than the current unix time // from the src zset to dst list. func (r *RDB) forwardSingle(src, dst string) error { - script := redis.NewScript(` - local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) - for _, msg in ipairs(msgs) do - redis.call("ZREM", KEYS[1], msg) - redis.call("LPUSH", KEYS[2], msg) - end - return msgs - `) now := float64(time.Now().Unix()) - return script.Run(r.client, + return forwardSingleCmd.Run(r.client, []string{src, dst}, now).Err() } +// KEYS[1] -> asynq:ps +// KEYS[2] -> asynq:ps: +// ARGV[1] -> expiration time +// ARGV[2] -> TTL in seconds +// ARGV[3] -> process info +var writeProcessInfoCmd = redis.NewScript(` +redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) +redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3]) +return redis.status_reply("OK")`) + // WriteProcessInfo writes process information to redis with expiration // set to the value ttl. func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error { @@ -358,17 +380,7 @@ func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error { // ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996 exp := time.Now().Add(ttl).UTC() key := base.ProcessInfoKey(ps.Host, ps.PID) - // KEYS[1] -> asynq:ps - // KEYS[2] -> asynq:ps: - // ARGV[1] -> expiration time - // ARGV[2] -> TTL in seconds - // ARGV[3] -> process info - script := redis.NewScript(` - redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) - redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3]) - return redis.status_reply("OK") - `) - return script.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err() + return writeProcessInfoCmd.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err() } // ReadProcessInfo reads process information stored in redis. @@ -386,13 +398,15 @@ func (r *RDB) ReadProcessInfo(host string, pid int) (*base.ProcessInfo, error) { return &pinfo, nil } +// KEYS[1] -> asynq:ps +// KEYS[2] -> asynq:ps: +var clearProcessInfoCmd = redis.NewScript(` +redis.call("ZREM", KEYS[1], KEYS[2]) +redis.call("DEL", KEYS[2]) +return redis.status_reply("OK")`) + // ClearProcessInfo deletes process information from redis. func (r *RDB) ClearProcessInfo(ps *base.ProcessInfo) error { key := base.ProcessInfoKey(ps.Host, ps.PID) - script := redis.NewScript(` - redis.call("ZREM", KEYS[1], KEYS[2]) - redis.call("DEL", KEYS[2]) - return redis.status_reply("OK") - `) - return script.Run(r.client, []string{base.AllProcesses, key}).Err() + return clearProcessInfoCmd.Run(r.client, []string{base.AllProcesses, key}).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index d478608..8b8d52e 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -541,7 +541,7 @@ func TestKill(t *testing.T) { } } -func TestRestoreUnfinished(t *testing.T) { +func TestRequeueAll(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("export_csv", nil) @@ -613,9 +613,9 @@ func TestRestoreUnfinished(t *testing.T) { h.SeedEnqueuedQueue(t, r.client, msgs, qname) } - got, err := r.RestoreUnfinished() + got, err := r.RequeueAll() if got != tc.want || err != nil { - t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want) + t.Errorf("(*RDB).RequeueAll() = %v %v, want %v nil", got, err, tc.want) continue } diff --git a/processor.go b/processor.go index 32b63a3..738128c 100644 --- a/processor.go +++ b/processor.go @@ -191,7 +191,7 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - n, err := p.rdb.RestoreUnfinished() + n, err := p.rdb.RequeueAll() if err != nil { logger.error("Could not restore unfinished tasks: %v", err) }