From faa9b6ee22a6d1a0efbda2c6d422dea36234b5a8 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 25 Nov 2019 19:58:24 -0800 Subject: [PATCH] Rename (*rdb).push to enqueue --- client.go | 2 +- rdb.go | 7 ++++--- rdb_test.go | 6 +++--- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/client.go b/client.go index 45c7f79..6a4d9a6 100644 --- a/client.go +++ b/client.go @@ -37,7 +37,7 @@ func (c *Client) Process(task *Task, executeAt time.Time) error { // enqueue pushes a given task to the specified queue. func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { if time.Now().After(executeAt) { - return c.rdb.push(msg) + return c.rdb.enqueue(msg) } return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) } diff --git a/rdb.go b/rdb.go index 556c246..425110a 100644 --- a/rdb.go +++ b/rdb.go @@ -37,8 +37,9 @@ func newRDB(client *redis.Client) *rdb { return &rdb{client} } -// push enqueues the task to queue. -func (r *rdb) push(msg *taskMessage) error { +// enqueue inserts the given task to the end of the queue. +// It also adds the queue name to the "all-queues" list. +func (r *rdb) enqueue(msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not encode task into JSON: %v", err) @@ -130,7 +131,7 @@ func (r *rdb) move(from string, msg *taskMessage) error { return errSerializeTask } if r.client.ZRem(from, string(bytes)).Val() > 0 { - err = r.push(msg) + err = r.enqueue(msg) if err != nil { log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n", msg.Queue, err) diff --git a/rdb_test.go b/rdb_test.go index 8c1dde6..ee3c72f 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -42,7 +42,7 @@ func randomTask(taskType, qname string, payload map[string]interface{}) *taskMes } } -func TestPush(t *testing.T) { +func TestEnqueue(t *testing.T) { r := setup(t) tests := []struct { msg *taskMessage @@ -53,7 +53,7 @@ func TestPush(t *testing.T) { } for _, tc := range tests { - err := r.push(tc.msg) + err := r.enqueue(tc.msg) if err != nil { t.Error(err) } @@ -83,7 +83,7 @@ func TestPush(t *testing.T) { func TestDequeueImmediateReturn(t *testing.T) { r := setup(t) msg := randomTask("export_csv", "csv", nil) - r.push(msg) + r.enqueue(msg) res, err := r.dequeue("asynq:queues:csv", time.Second) if err != nil {