2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Fix: Do not use lua cjson library to encode task to json

Go and Lua json libraries encodes json differently (e.g. order of
key/value) and caused a bug when removing tasks that was previously
encoded by Lua json library and redis was receiving a string generated
by Go json library.
This commit is contained in:
Ken Hibino 2019-12-15 20:05:56 -08:00
parent 442b33a6d2
commit e5686894d3

View File

@ -133,24 +133,29 @@ func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
// Retry moves the task from in-progress to retry queue, incrementing retry count // Retry moves the task from in-progress to retry queue, incrementing retry count
// and assigning error message to the task message. // and assigning error message to the task message.
func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error { func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error {
bytes, err := json.Marshal(msg) bytesToRemove, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
modified := *msg
modified.Retried++
modified.ErrorMsg = errMsg
bytesToAdd, err := json.Marshal(&modified)
if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
}
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:retry // KEYS[2] -> asynq:retry
// ARGV[1] -> TaskMessage value // ARGV[1] -> TaskMessage value to remove from InProgress queue
// ARGV[2] -> error message // ARGV[2] -> TaskMessage value to add to Retry queue
// ARGV[3] -> retry_at UNIX timestamp // ARGV[3] -> retry_at UNIX timestamp
script := redis.NewScript(` script := redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("LREM", KEYS[1], 0, ARGV[1])
local msg = cjson.decode(ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
msg["Retried"] = msg["Retried"] + 1
msg["ErrorMsg"] = ARGV[2]
redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg))
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
_, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result() _, err = script.Run(r.client, []string{inProgressQ, retryQ},
string(bytesToRemove), string(bytesToAdd), processAt.Unix()).Result()
return err return err
} }
@ -160,30 +165,34 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
func (r *RDB) Kill(msg *TaskMessage, errMsg string) error { func (r *RDB) Kill(msg *TaskMessage, errMsg string) error {
const maxDeadTask = 10 const maxDeadTask = 10
const deadExpirationInDays = 90 const deadExpirationInDays = 90
bytes, err := json.Marshal(msg) bytesToRemove, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
modified := *msg
modified.ErrorMsg = errMsg
bytesToAdd, err := json.Marshal(&modified)
if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
}
now := time.Now() now := time.Now()
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:dead // KEYS[2] -> asynq:dead
// ARGV[1] -> TaskMessage value // ARGV[1] -> TaskMessage value to remove from InProgress queue
// ARGV[2] -> error message // ARGV[2] -> TaskMessage value to add to Dead queue
// ARGV[3] -> died_at UNIX timestamp // ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago) // ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in dead queue (e.g., 100) // ARGV[5] -> max number of tasks in dead queue (e.g., 100)
script := redis.NewScript(` script := redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("LREM", KEYS[1], 0, ARGV[1])
local msg = cjson.decode(ARGV[1]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
msg["ErrorMsg"] = ARGV[2]
redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg))
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
_, err = script.Run(r.client, []string{inProgressQ, deadQ}, _, err = script.Run(r.client, []string{inProgressQ, deadQ},
string(bytes), errMsg, now.Unix(), limit, maxDeadTask).Result() string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTask).Result()
return err return err
} }