From 39f177dabff40c12a83de1fa01cd458d2042fd76 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 4 Dec 2019 06:25:58 -0800 Subject: [PATCH] Change rdb Dequeue signature --- internal/rdb/rdb.go | 48 +++++++++++++++++++--------------------- internal/rdb/rdb_test.go | 2 +- processor.go | 2 +- 3 files changed, 25 insertions(+), 27 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 3a310f1..5bf1f2f 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -1,3 +1,4 @@ +// Package rdb encapsulates the interactions with redis. package rdb import ( @@ -22,9 +23,10 @@ const ( InProgress = "asynq:in_progress" // SET ) +// ErrDequeueTimeout indicates that the blocking dequeue operation timed out. var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") -// RDB encapsulates the interactions with redis server. +// RDB is a client interface to query and mutate task queues. type RDB struct { client *redis.Client } @@ -34,29 +36,25 @@ func NewRDB(client *redis.Client) *RDB { return &RDB{client} } -// TaskMessage is an internal representation of a task with additional metadata fields. -// This data gets written in redis. +// TaskMessage is the internal representation of a task with additional metadata fields. +// Serialized data of this type gets written in redis. type TaskMessage struct { //-------- Task fields -------- - - Type string + // Type represents the kind of task. + Type string + // Payload holds data needed to process the task. Payload map[string]interface{} - //-------- metadata fields -------- - - // unique identifier for each task + //-------- Metadata fields -------- + // ID is a unique identifier for each task ID uuid.UUID - - // queue name this message should be enqueued to + // Queue is a name this message should be enqueued to Queue string - - // max number of retry for this task. + // Retry is the max number of retry for this task. Retry int - - // number of times we've retried so far + // Retried is the number of times we've retried this task so far Retried int - - // error message from the last failure + // ErrorMsg holds the error message from the last failure ErrorMsg string } @@ -71,7 +69,7 @@ type Stats struct { } // EnqueuedTask is a task in a queue and is ready to be processed. -// This is read only and used for inspection purpose. +// Note: This is read only and used for monitoring purpose. type EnqueuedTask struct { ID uuid.UUID Type string @@ -79,7 +77,7 @@ type EnqueuedTask struct { } // InProgressTask is a task that's currently being processed. -// This is read only and used for inspection purpose. +// Note: This is read only and used for monitoring purpose. type InProgressTask struct { ID uuid.UUID Type string @@ -87,7 +85,7 @@ type InProgressTask struct { } // ScheduledTask is a task that's scheduled to be processed in the future. -// This is read only and used for inspection purpose. +// Note: This is read only and used for monitoring purpose. type ScheduledTask struct { ID uuid.UUID Type string @@ -96,7 +94,7 @@ type ScheduledTask struct { } // RetryTask is a task that's in retry queue because worker failed to process the task. -// This is read only and used for inspection purpose. +// Note: This is read only and used for monitoring purpose. type RetryTask struct { ID uuid.UUID Type string @@ -109,7 +107,7 @@ type RetryTask struct { } // DeadTask is a task in that has exhausted all retries. -// This is read only and used for inspection purpose. +// Note: This is read only and used for monitoring purpose. type DeadTask struct { ID uuid.UUID Type string @@ -144,20 +142,20 @@ func (r *RDB) Enqueue(msg *TaskMessage) error { // Dequeue blocks until there is a task available to be processed, // once a task is available, it adds the task to "in progress" list // and returns the task. -func (r *RDB) Dequeue(qname string, timeout time.Duration) (*TaskMessage, error) { - data, err := r.client.BRPopLPush(qname, InProgress, timeout).Result() +func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) { + data, err := r.client.BRPopLPush(DefaultQueue, InProgress, timeout).Result() if err == redis.Nil { return nil, ErrDequeueTimeout } if err != nil { - return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, InProgress, timeout, err) + return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", DefaultQueue, InProgress, timeout, err) } var msg TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) } - fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname) + fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, DefaultQueue) return &msg, nil } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 11d6425..cdfc7b6 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -142,7 +142,7 @@ func TestDequeue(t *testing.T) { for _, m := range tc.queued { r.Enqueue(m) } - got, err := r.Dequeue(DefaultQueue, time.Second) + got, err := r.Dequeue(time.Second) if !cmp.Equal(got, tc.want) || err != tc.err { t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v", DefaultQueue, got, err, tc.want, tc.err) diff --git a/processor.go b/processor.go index b539b80..eae667e 100644 --- a/processor.go +++ b/processor.go @@ -70,7 +70,7 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - msg, err := p.rdb.Dequeue(rdb.DefaultQueue, p.dequeueTimeout) + msg, err := p.rdb.Dequeue(p.dequeueTimeout) if err == rdb.ErrDequeueTimeout { // timed out, this is a normal behavior. return