2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Change rdb Dequeue signature

This commit is contained in:
Ken Hibino 2019-12-04 06:25:58 -08:00
parent d4e442d04f
commit 39f177dabf
3 changed files with 25 additions and 27 deletions

View File

@ -1,3 +1,4 @@
// Package rdb encapsulates the interactions with redis.
package rdb package rdb
import ( import (
@ -22,9 +23,10 @@ const (
InProgress = "asynq:in_progress" // SET InProgress = "asynq:in_progress" // SET
) )
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
// RDB encapsulates the interactions with redis server. // RDB is a client interface to query and mutate task queues.
type RDB struct { type RDB struct {
client *redis.Client client *redis.Client
} }
@ -34,29 +36,25 @@ func NewRDB(client *redis.Client) *RDB {
return &RDB{client} return &RDB{client}
} }
// TaskMessage is an internal representation of a task with additional metadata fields. // TaskMessage is the internal representation of a task with additional metadata fields.
// This data gets written in redis. // Serialized data of this type gets written in redis.
type TaskMessage struct { type TaskMessage struct {
//-------- Task fields -------- //-------- Task fields --------
// Type represents the kind of task.
Type string Type string
// Payload holds data needed to process the task.
Payload map[string]interface{} Payload map[string]interface{}
//-------- metadata fields -------- //-------- Metadata fields --------
// ID is a unique identifier for each task
// unique identifier for each task
ID uuid.UUID ID uuid.UUID
// Queue is a name this message should be enqueued to
// queue name this message should be enqueued to
Queue string Queue string
// Retry is the max number of retry for this task.
// max number of retry for this task.
Retry int Retry int
// Retried is the number of times we've retried this task so far
// number of times we've retried so far
Retried int Retried int
// ErrorMsg holds the error message from the last failure
// error message from the last failure
ErrorMsg string ErrorMsg string
} }
@ -71,7 +69,7 @@ type Stats struct {
} }
// EnqueuedTask is a task in a queue and is ready to be processed. // EnqueuedTask is a task in a queue and is ready to be processed.
// This is read only and used for inspection purpose. // Note: This is read only and used for monitoring purpose.
type EnqueuedTask struct { type EnqueuedTask struct {
ID uuid.UUID ID uuid.UUID
Type string Type string
@ -79,7 +77,7 @@ type EnqueuedTask struct {
} }
// InProgressTask is a task that's currently being processed. // InProgressTask is a task that's currently being processed.
// This is read only and used for inspection purpose. // Note: This is read only and used for monitoring purpose.
type InProgressTask struct { type InProgressTask struct {
ID uuid.UUID ID uuid.UUID
Type string Type string
@ -87,7 +85,7 @@ type InProgressTask struct {
} }
// ScheduledTask is a task that's scheduled to be processed in the future. // ScheduledTask is a task that's scheduled to be processed in the future.
// This is read only and used for inspection purpose. // Note: This is read only and used for monitoring purpose.
type ScheduledTask struct { type ScheduledTask struct {
ID uuid.UUID ID uuid.UUID
Type string Type string
@ -96,7 +94,7 @@ type ScheduledTask struct {
} }
// RetryTask is a task that's in retry queue because worker failed to process the task. // RetryTask is a task that's in retry queue because worker failed to process the task.
// This is read only and used for inspection purpose. // Note: This is read only and used for monitoring purpose.
type RetryTask struct { type RetryTask struct {
ID uuid.UUID ID uuid.UUID
Type string Type string
@ -109,7 +107,7 @@ type RetryTask struct {
} }
// DeadTask is a task in that has exhausted all retries. // DeadTask is a task in that has exhausted all retries.
// This is read only and used for inspection purpose. // Note: This is read only and used for monitoring purpose.
type DeadTask struct { type DeadTask struct {
ID uuid.UUID ID uuid.UUID
Type string Type string
@ -144,20 +142,20 @@ func (r *RDB) Enqueue(msg *TaskMessage) error {
// Dequeue blocks until there is a task available to be processed, // Dequeue blocks until there is a task available to be processed,
// once a task is available, it adds the task to "in progress" list // once a task is available, it adds the task to "in progress" list
// and returns the task. // and returns the task.
func (r *RDB) Dequeue(qname string, timeout time.Duration) (*TaskMessage, error) { func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) {
data, err := r.client.BRPopLPush(qname, InProgress, timeout).Result() data, err := r.client.BRPopLPush(DefaultQueue, InProgress, timeout).Result()
if err == redis.Nil { if err == redis.Nil {
return nil, ErrDequeueTimeout return nil, ErrDequeueTimeout
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, InProgress, timeout, err) return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", DefaultQueue, InProgress, timeout, err)
} }
var msg TaskMessage var msg TaskMessage
err = json.Unmarshal([]byte(data), &msg) err = json.Unmarshal([]byte(data), &msg)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
} }
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname) fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, DefaultQueue)
return &msg, nil return &msg, nil
} }

View File

@ -142,7 +142,7 @@ func TestDequeue(t *testing.T) {
for _, m := range tc.queued { for _, m := range tc.queued {
r.Enqueue(m) r.Enqueue(m)
} }
got, err := r.Dequeue(DefaultQueue, time.Second) got, err := r.Dequeue(time.Second)
if !cmp.Equal(got, tc.want) || err != tc.err { if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v", t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
DefaultQueue, got, err, tc.want, tc.err) DefaultQueue, got, err, tc.want, tc.err)

View File

@ -70,7 +70,7 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to // exec pulls a task out of the queue and starts a worker goroutine to
// process the task. // process the task.
func (p *processor) exec() { func (p *processor) exec() {
msg, err := p.rdb.Dequeue(rdb.DefaultQueue, p.dequeueTimeout) msg, err := p.rdb.Dequeue(p.dequeueTimeout)
if err == rdb.ErrDequeueTimeout { if err == rdb.ErrDequeueTimeout {
// timed out, this is a normal behavior. // timed out, this is a normal behavior.
return return