2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Add Retry method to *RDB

(*RDB).Retry method takes a TaskMessage and will atomically moves the
message from in_progress queue to retry queue. Additionally it
increments the Retried counter and assigns the error message to the
message.
This commit is contained in:
Ken Hibino 2019-12-15 16:15:07 -08:00
parent 1857d12cea
commit 1b1662bb12
4 changed files with 109 additions and 0 deletions

View File

@ -4,6 +4,8 @@ import "github.com/go-redis/redis/v7"
/* /*
TODOs: TODOs:
- [P0] Proper OS Signal handling
- [P0] Wait for a certain amount of time for wokers to finish on TERM signal
- [P0] asynqmon kill <taskID>, asynqmon killall <qname> - [P0] asynqmon kill <taskID>, asynqmon killall <qname>
- [P0] Assigning int or any number type to Payload will be converted to float64 in handler - [P0] Assigning int or any number type to Payload will be converted to float64 in handler
- [P0] Redis Memory Usage, Connection info in stats - [P0] Redis Memory Usage, Connection info in stats

View File

@ -45,6 +45,14 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessa
return out return out
}) })
var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry {
out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].msg.ID.String() < out[j].msg.ID.String()
})
return out
})
func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage { func newTaskMessage(taskType string, payload map[string]interface{}) *TaskMessage {
return &TaskMessage{ return &TaskMessage{
ID: xid.New(), ID: xid.New(),

View File

@ -127,6 +127,30 @@ func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
return r.schedule(retryQ, processAt, msg) return r.schedule(retryQ, processAt, msg)
} }
// Retry moves the task from in-progress to retry queue, incrementing retry count
// and assigning error message to the task message.
func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error {
bytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
}
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:retry
// ARGV[1] -> TaskMessage value
// ARGV[2] -> error message
// ARGV[3] -> retry_at UNIX timestamp
script := redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
local msg = cjson.decode(ARGV[1])
msg["Retried"] = msg["Retried"] + 1
msg["ErrorMsg"] = ARGV[2]
redis.call("ZADD", KEYS[2], ARGV[3], cjson.encode(msg))
return redis.status_reply("OK")
`)
_, err = script.Run(r.client, []string{inProgressQ, retryQ}, string(bytes), errMsg, processAt.Unix()).Result()
return err
}
// schedule adds the task to the zset to be processd at the specified time. // schedule adds the task to the zset to be processd at the specified time.
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)

View File

@ -345,3 +345,78 @@ func TestRetryLater(t *testing.T) {
} }
} }
} }
func TestRetry(t *testing.T) {
r := setup(t)
t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
t2 := newTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image.jpg"})
t3 := newTaskMessage("reindex", nil)
t1.Retried = 10
errMsg := "SMTP server is not responding"
t1AfterRetry := &TaskMessage{
ID: t1.ID,
Type: t1.Type,
Payload: t1.Payload,
Queue: t1.Queue,
Retry: t1.Retry,
Retried: t1.Retried + 1,
ErrorMsg: errMsg,
}
now := time.Now()
tests := []struct {
inProgress []*TaskMessage
retry []sortedSetEntry
msg *TaskMessage
processAt time.Time
errMsg string
wantInProgress []*TaskMessage
wantRetry []sortedSetEntry
}{
{
inProgress: []*TaskMessage{t1, t2},
retry: []sortedSetEntry{
{t3, now.Add(time.Minute).Unix()},
},
msg: t1,
processAt: now.Add(5 * time.Minute),
errMsg: errMsg,
wantInProgress: []*TaskMessage{t2},
wantRetry: []sortedSetEntry{
{t1AfterRetry, now.Add(5 * time.Minute).Unix()},
{t3, now.Add(time.Minute).Unix()},
},
},
}
for _, tc := range tests {
flushDB(t, r)
seedInProgressQueue(t, r, tc.inProgress)
seedRetryQueue(t, r, tc.retry)
err := r.Retry(tc.msg, tc.processAt, tc.errMsg)
if err != nil {
t.Errorf("(*RDB).Retry = %v, want nil", err)
continue
}
gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", inProgressQ, diff)
}
gotRetryRaw := r.client.ZRangeWithScores(retryQ, 0, -1).Val()
var gotRetry []sortedSetEntry
for _, z := range gotRetryRaw {
gotRetry = append(gotRetry, sortedSetEntry{
msg: mustUnmarshal(t, z.Member.(string)),
score: int64(z.Score),
})
}
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff)
}
}
}