diff --git a/client.go b/client.go index 5f61313..0e8454f 100644 --- a/client.go +++ b/client.go @@ -17,7 +17,7 @@ func NewClient(opt *RedisOpt) *Client { } // Process enqueues the task to be performed at a given time. -func (c *Client) Process(task *Task, executeAt time.Time) error { +func (c *Client) Process(task *Task, processAt time.Time) error { msg := &taskMessage{ ID: uuid.New(), Type: task.Type, @@ -25,13 +25,13 @@ func (c *Client) Process(task *Task, executeAt time.Time) error { Queue: "default", Retry: defaultMaxRetry, } - return c.enqueue(msg, executeAt) + return c.enqueue(msg, processAt) } // enqueue pushes a given task to the specified queue. -func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { - if time.Now().After(executeAt) { +func (c *Client) enqueue(msg *taskMessage, processAt time.Time) error { + if time.Now().After(processAt) { return c.rdb.enqueue(msg) } - return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) + return c.rdb.schedule(scheduled, processAt, msg) } diff --git a/rdb.go b/rdb.go index 430e2c0..c84ffd0 100644 --- a/rdb.go +++ b/rdb.go @@ -93,16 +93,17 @@ func (r *rdb) lrem(key string, msg *taskMessage) error { return nil } -// zadd adds the taskMessage to the specified zset (sorted set) with the given score. -func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error { +// schedule adds the task to the zset to be processd at the specified time. +func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not encode task into JSON: %v", err) } - err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err() + score := float64(processAt.Unix()) + err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err() if err != nil { return fmt.Errorf("command ZADD %s %.1f %s failed: %v", - zset, zscore, string(bytes), err) + zset, score, string(bytes), err) } return nil } diff --git a/rdb_test.go b/rdb_test.go index b723851..0ed8af5 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -2,6 +2,7 @@ package asynq import ( "encoding/json" + "fmt" "math/rand" "sort" "testing" @@ -265,3 +266,48 @@ func TestForward(t *testing.T) { } } } + +func TestSchedule(t *testing.T) { + r := setup(t) + tests := []struct { + msg *taskMessage + processAt time.Time + zset string + }{ + { + randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}), + time.Now().Add(15 * time.Minute), + scheduled, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + + err := r.schedule(tc.zset, tc.processAt, tc.msg) + if err != nil { + t.Error(err) + continue + } + + res, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result() + if err != nil { + t.Error(err) + continue + } + + desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg) + if len(res) != 1 { + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), tc.zset) + continue + } + + if res[0].Score != float64(tc.processAt.Unix()) { + t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix())) + continue + } + } +} diff --git a/retry.go b/retry.go index 4319b1d..6e93c07 100644 --- a/retry.go +++ b/retry.go @@ -20,7 +20,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) { fmt.Printf("[DEBUG] Retrying the task in %v\n", retryAt.Sub(time.Now())) msg.Retried++ msg.ErrorMsg = err.Error() - if err := rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { + if err := rdb.schedule(retry, retryAt, msg); err != nil { // TODO(hibiken): Not sure how to handle this error log.Printf("[ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) return