2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Rename (*rdb).push to enqueue

This commit is contained in:
Ken Hibino 2019-11-25 19:58:24 -08:00
parent 737de898eb
commit faa9b6ee22
3 changed files with 8 additions and 7 deletions

View File

@ -37,7 +37,7 @@ func (c *Client) Process(task *Task, executeAt time.Time) error {
// enqueue pushes a given task to the specified queue. // enqueue pushes a given task to the specified queue.
func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error {
if time.Now().After(executeAt) { if time.Now().After(executeAt) {
return c.rdb.push(msg) return c.rdb.enqueue(msg)
} }
return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg)
} }

7
rdb.go
View File

@ -37,8 +37,9 @@ func newRDB(client *redis.Client) *rdb {
return &rdb{client} return &rdb{client}
} }
// push enqueues the task to queue. // enqueue inserts the given task to the end of the queue.
func (r *rdb) push(msg *taskMessage) error { // It also adds the queue name to the "all-queues" list.
func (r *rdb) enqueue(msg *taskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not encode task into JSON: %v", err)
@ -130,7 +131,7 @@ func (r *rdb) move(from string, msg *taskMessage) error {
return errSerializeTask return errSerializeTask
} }
if r.client.ZRem(from, string(bytes)).Val() > 0 { if r.client.ZRem(from, string(bytes)).Val() > 0 {
err = r.push(msg) err = r.enqueue(msg)
if err != nil { if err != nil {
log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n", log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n",
msg.Queue, err) msg.Queue, err)

View File

@ -42,7 +42,7 @@ func randomTask(taskType, qname string, payload map[string]interface{}) *taskMes
} }
} }
func TestPush(t *testing.T) { func TestEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
msg *taskMessage msg *taskMessage
@ -53,7 +53,7 @@ func TestPush(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
err := r.push(tc.msg) err := r.enqueue(tc.msg)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }
@ -83,7 +83,7 @@ func TestPush(t *testing.T) {
func TestDequeueImmediateReturn(t *testing.T) { func TestDequeueImmediateReturn(t *testing.T) {
r := setup(t) r := setup(t)
msg := randomTask("export_csv", "csv", nil) msg := randomTask("export_csv", "csv", nil)
r.push(msg) r.enqueue(msg)
res, err := r.dequeue("asynq:queues:csv", time.Second) res, err := r.dequeue("asynq:queues:csv", time.Second)
if err != nil { if err != nil {