diff --git a/client.go b/client.go index 1061393..435769e 100644 --- a/client.go +++ b/client.go @@ -35,5 +35,5 @@ func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error { if time.Now().After(processAt) { return c.rdb.Enqueue(msg) } - return c.rdb.Schedule(rdb.Scheduled, processAt, msg) + return c.rdb.Schedule(msg, processAt) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 48be550..b8c0d5b 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -173,8 +173,18 @@ func (r *RDB) Done(msg *TaskMessage) error { return nil } -// Schedule adds the task to the zset to be processd at the specified time. -func (r *RDB) Schedule(zset string, processAt time.Time, msg *TaskMessage) error { +// Schedule adds the task to the backlog queue to be processed in the future. +func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { + return r.schedule(Scheduled, processAt, msg) +} + +// RetryLater adds the task to the backlog queue to be retried in the future. +func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { + return r.schedule(Retry, processAt, msg) +} + +// schedule adds the task to the zset to be processd at the specified time. +func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 549e4de..cefa518 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -144,8 +144,8 @@ func TestDequeue(t *testing.T) { } got, err := r.Dequeue(time.Second) if !cmp.Equal(got, tc.want) || err != tc.err { - t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v", - DefaultQueue, got, err, tc.want, tc.err) + t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", + got, err, tc.want, tc.err) continue } if l := r.client.LLen(InProgress).Val(); l != tc.inProgress { @@ -306,7 +306,7 @@ func TestMoveAll(t *testing.T) { } if err := r.MoveAll(InProgress, DefaultQueue); err != nil { - t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err) + t.Errorf("(*RDB).MoveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err) continue } @@ -370,7 +370,7 @@ func TestForward(t *testing.T) { err := r.Forward(Scheduled) if err != nil { - t.Errorf("(*rdb).forward(%q) = %v, want nil", Scheduled, err) + t.Errorf("(*RDB).Forward(%q) = %v, want nil", Scheduled, err) continue } queued := r.client.LRange(DefaultQueue, 0, -1).Val() @@ -391,12 +391,10 @@ func TestSchedule(t *testing.T) { tests := []struct { msg *TaskMessage processAt time.Time - zset string }{ { randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}), time.Now().Add(15 * time.Minute), - Scheduled, }, } @@ -406,21 +404,64 @@ func TestSchedule(t *testing.T) { t.Fatal(err) } - err := r.Schedule(tc.zset, tc.processAt, tc.msg) + err := r.Schedule(tc.msg, tc.processAt) if err != nil { t.Error(err) continue } - res, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result() + res, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result() if err != nil { t.Error(err) continue } - desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg) + desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt) if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), tc.zset) + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Scheduled) + continue + } + + if res[0].Score != float64(tc.processAt.Unix()) { + t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) + continue + } + } +} + +func TestRetryLater(t *testing.T) { + r := setup(t) + tests := []struct { + msg *TaskMessage + processAt time.Time + }{ + { + randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}), + time.Now().Add(15 * time.Minute), + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + + err := r.RetryLater(tc.msg, tc.processAt) + if err != nil { + t.Error(err) + continue + } + + res, err := r.client.ZRangeWithScores(Retry, 0, -1).Result() + if err != nil { + t.Error(err) + continue + } + + desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt) + if len(res) != 1 { + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), Retry) continue } diff --git a/poller_test.go b/poller_test.go index 81ab104..4821a7d 100644 --- a/poller_test.go +++ b/poller_test.go @@ -67,14 +67,14 @@ func TestPoller(t *testing.T) { } // initialize scheduled queue for _, st := range tc.initScheduled { - err := rdbClient.Schedule(rdb.Scheduled, st.processAt, st.msg) + err := rdbClient.Schedule(st.msg, st.processAt) if err != nil { t.Fatal(err) } } // initialize retry queue for _, st := range tc.initRetry { - err := rdbClient.Schedule(rdb.Retry, st.processAt, st.msg) + err := rdbClient.RetryLater(st.msg, st.processAt) if err != nil { t.Fatal(err) } diff --git a/retry.go b/retry.go index f7a815a..838d07a 100644 --- a/retry.go +++ b/retry.go @@ -21,7 +21,7 @@ func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) { retryAt := time.Now().Add(delaySeconds((msg.Retried))) log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now())) msg.Retried++ - if err := r.Schedule(rdb.Retry, retryAt, msg); err != nil { + if err := r.RetryLater(msg, retryAt); err != nil { log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err) return }