diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 9ee829d..fc3fbd7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -118,12 +118,16 @@ func (r *RDB) Done(msg *TaskMessage) error { // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error { - return r.schedule(scheduledQ, processAt, msg) -} - -// RetryLater adds the task to the backlog queue to be retried in the future. -func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error { - return r.schedule(retryQ, processAt, msg) + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + } + score := float64(processAt.Unix()) + err = r.client.ZAdd(scheduledQ, &redis.Z{Member: string(bytes), Score: score}).Err() + if err != nil { + return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", scheduledQ, score, string(bytes), err) + } + return nil } // Retry moves the task from in-progress to retry queue, incrementing retry count @@ -150,20 +154,6 @@ func (r *RDB) Retry(msg *TaskMessage, processAt time.Time, errMsg string) error return err } -// schedule adds the task to the zset to be processd at the specified time. -func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error { - bytes, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", msg, err) - } - score := float64(processAt.Unix()) - err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err() - if err != nil { - return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err) - } - return nil -} - // Kill sends the task to "dead" queue from in-progress queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 7717b7a..b6f7089 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -357,40 +357,6 @@ func TestSchedule(t *testing.T) { } } -func TestRetryLater(t *testing.T) { - r := setup(t) - tests := []struct { - msg *TaskMessage - processAt time.Time - }{ - { - newTaskMessage("send_email", map[string]interface{}{"subject": "hello"}), - time.Now().Add(15 * time.Minute), - }, - } - - for _, tc := range tests { - flushDB(t, r) // clean up db before each test case - - desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt) - err := r.RetryLater(tc.msg, tc.processAt) - if err != nil { - t.Errorf("%s = %v, want nil", desc, err) - continue - } - - res := r.client.ZRangeWithScores(retryQ, 0, -1).Val() - if len(res) != 1 { - t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ) - continue - } - if res[0].Score != float64(tc.processAt.Unix()) { - t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) - continue - } - } -} - func TestRetry(t *testing.T) { r := setup(t) t1 := newTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})