mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Define Schedule and RetryLater method for RDB
This commit is contained in:
parent
985018e1b5
commit
4531e90b9d
@ -35,5 +35,5 @@ func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error {
|
|||||||
if time.Now().After(processAt) {
|
if time.Now().After(processAt) {
|
||||||
return c.rdb.Enqueue(msg)
|
return c.rdb.Enqueue(msg)
|
||||||
}
|
}
|
||||||
return c.rdb.Schedule(rdb.Scheduled, processAt, msg)
|
return c.rdb.Schedule(msg, processAt)
|
||||||
}
|
}
|
||||||
|
@ -173,8 +173,18 @@ func (r *RDB) Done(msg *TaskMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Schedule adds the task to the zset to be processd at the specified time.
|
// Schedule adds the task to the backlog queue to be processed in the future.
|
||||||
func (r *RDB) Schedule(zset string, processAt time.Time, msg *TaskMessage) error {
|
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
||||||
|
return r.schedule(Scheduled, processAt, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RetryLater adds the task to the backlog queue to be retried in the future.
|
||||||
|
func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
|
||||||
|
return r.schedule(Retry, processAt, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// schedule adds the task to the zset to be processd at the specified time.
|
||||||
|
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
|
||||||
bytes, err := json.Marshal(msg)
|
bytes, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
|
@ -144,8 +144,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
got, err := r.Dequeue(time.Second)
|
got, err := r.Dequeue(time.Second)
|
||||||
if !cmp.Equal(got, tc.want) || err != tc.err {
|
if !cmp.Equal(got, tc.want) || err != tc.err {
|
||||||
t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
|
t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v",
|
||||||
DefaultQueue, got, err, tc.want, tc.err)
|
got, err, tc.want, tc.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if l := r.client.LLen(InProgress).Val(); l != tc.inProgress {
|
if l := r.client.LLen(InProgress).Val(); l != tc.inProgress {
|
||||||
@ -306,7 +306,7 @@ func TestMoveAll(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := r.MoveAll(InProgress, DefaultQueue); err != nil {
|
if err := r.MoveAll(InProgress, DefaultQueue); err != nil {
|
||||||
t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err)
|
t.Errorf("(*RDB).MoveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,7 +370,7 @@ func TestForward(t *testing.T) {
|
|||||||
|
|
||||||
err := r.Forward(Scheduled)
|
err := r.Forward(Scheduled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*rdb).forward(%q) = %v, want nil", Scheduled, err)
|
t.Errorf("(*RDB).Forward(%q) = %v, want nil", Scheduled, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
queued := r.client.LRange(DefaultQueue, 0, -1).Val()
|
queued := r.client.LRange(DefaultQueue, 0, -1).Val()
|
||||||
@ -391,12 +391,10 @@ func TestSchedule(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
msg *TaskMessage
|
msg *TaskMessage
|
||||||
processAt time.Time
|
processAt time.Time
|
||||||
zset string
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||||
time.Now().Add(15 * time.Minute),
|
time.Now().Add(15 * time.Minute),
|
||||||
Scheduled,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -406,21 +404,64 @@ func TestSchedule(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.Schedule(tc.zset, tc.processAt, tc.msg)
|
err := r.Schedule(tc.msg, tc.processAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result()
|
res, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg)
|
desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt)
|
||||||
if len(res) != 1 {
|
if len(res) != 1 {
|
||||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), tc.zset)
|
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Scheduled)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if res[0].Score != float64(tc.processAt.Unix()) {
|
||||||
|
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRetryLater(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
tests := []struct {
|
||||||
|
msg *TaskMessage
|
||||||
|
processAt time.Time
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||||
|
time.Now().Add(15 * time.Minute),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.RetryLater(tc.msg, tc.processAt)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := r.client.ZRangeWithScores(Retry, 0, -1).Result()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt)
|
||||||
|
if len(res) != 1 {
|
||||||
|
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Retry)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -67,14 +67,14 @@ func TestPoller(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// initialize scheduled queue
|
// initialize scheduled queue
|
||||||
for _, st := range tc.initScheduled {
|
for _, st := range tc.initScheduled {
|
||||||
err := rdbClient.Schedule(rdb.Scheduled, st.processAt, st.msg)
|
err := rdbClient.Schedule(st.msg, st.processAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// initialize retry queue
|
// initialize retry queue
|
||||||
for _, st := range tc.initRetry {
|
for _, st := range tc.initRetry {
|
||||||
err := rdbClient.Schedule(rdb.Retry, st.processAt, st.msg)
|
err := rdbClient.RetryLater(st.msg, st.processAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
2
retry.go
2
retry.go
@ -21,7 +21,7 @@ func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) {
|
|||||||
retryAt := time.Now().Add(delaySeconds((msg.Retried)))
|
retryAt := time.Now().Add(delaySeconds((msg.Retried)))
|
||||||
log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
|
log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
|
||||||
msg.Retried++
|
msg.Retried++
|
||||||
if err := r.Schedule(rdb.Retry, retryAt, msg); err != nil {
|
if err := r.RetryLater(msg, retryAt); err != nil {
|
||||||
log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
|
log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user