2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Remove (*RDB).RetryLater in favor of Retry method

This commit is contained in:
Ken Hibino 2019-12-15 17:18:43 -08:00
parent d84e8c0ff2
commit 442b33a6d2
2 changed files with 10 additions and 54 deletions

View File

@ -118,12 +118,16 @@ func (r *RDB) Done(msg *TaskMessage) error {
// Schedule adds the task to the backlog queue to be processed in the future. // Schedule adds the task to the backlog queue to be processed in the future.
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
return r.schedule(scheduledQ, processAt, msg) bytes, err := json.Marshal(msg)
} if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
// RetryLater adds the task to the backlog queue to be retried in the future. }
func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { score := float64(processAt.Unix())
return r.schedule(retryQ, processAt, msg) err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err()
if err != nil {
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err)
}
return nil
} }
// Retry moves the task from in-progress to retry queue, incrementing retry count // Retry moves the task from in-progress to retry queue, incrementing retry count
@ -150,20 +154,6 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
return err return err
} }
// schedule adds the task to the zset to be processd at the specified time.
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
bytes, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
}
score := float64(processAt.Unix())
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
if err != nil {
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
}
return nil
}
// Kill sends the task to "dead" queue from in-progress queue, assigning // Kill sends the task to "dead" queue from in-progress queue, assigning
// the error message to the task. // the error message to the task.
// It also trims the set by timestamp and set size. // It also trims the set by timestamp and set size.

View File

@ -357,40 +357,6 @@ func TestSchedule(t *testing.T) {
} }
} }
func TestRetryLater(t *testing.T) {
r := setup(t)
tests := []struct {
msg *TaskMessage
processAt time.Time
}{
{
newTaskMessage("send_email", map[string]interface{}{"subject": "hello"}),
time.Now().Add(15 * time.Minute),
},
}
for _, tc := range tests {
flushDB(t, r) // clean up db before each test case
desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt)
err := r.RetryLater(tc.msg, tc.processAt)
if err != nil {
t.Errorf("%s = %v, want nil", desc, err)
continue
}
res := r.client.ZRangeWithScores(retryQ, 0, -1).Val()
if len(res) != 1 {
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ)
continue
}
if res[0].Score != float64(tc.processAt.Unix()) {
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
continue
}
}
}
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})