From ca78b92078d931752a31461d8f6c223d16941127 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 6 Jan 2020 06:53:40 -0800 Subject: [PATCH] Add Queue option to allow user to specify queue from client Added base.QueueKey method to get redis key for given queue name. Changed asynqtest.GetEnqueuedMessages to optionally take queue name. --- client.go | 21 ++++++- client_test.go | 104 +++++++++++++++++++++++--------- internal/asynqtest/asynqtest.go | 12 +++- internal/base/base.go | 24 +++++--- internal/base/base_test.go | 16 +++++ internal/rdb/rdb.go | 2 +- 6 files changed, 136 insertions(+), 43 deletions(-) diff --git a/client.go b/client.go index 64e92e9..1f228d1 100644 --- a/client.go +++ b/client.go @@ -5,6 +5,7 @@ package asynq import ( + "strings" "time" "github.com/go-redis/redis/v7" @@ -32,8 +33,11 @@ func NewClient(r *redis.Client) *Client { // Option specifies the processing behavior for the associated task. type Option interface{} -// max number of times a task will be retried. -type retryOption int +// Internal option representations. +type ( + retryOption int + queueOption string +) // MaxRetry returns an option to specify the max number of times // a task will be retried. @@ -46,18 +50,29 @@ func MaxRetry(n int) Option { return retryOption(n) } +// Queue returns an option to specify which queue to enqueue this task into. +// +// Queue name is case-insensitive and the lowercased version is used. +func Queue(qname string) Option { + return queueOption(strings.ToLower(qname)) +} + type option struct { retry int + queue string } func composeOptions(opts ...Option) option { res := option{ retry: defaultMaxRetry, + queue: base.DefaultQueueName, } for _, opt := range opts { switch opt := opt.(type) { case retryOption: res.retry = int(opt) + case queueOption: + res.queue = string(opt) default: // ignore unexpected option } @@ -83,7 +98,7 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error ID: xid.New(), Type: task.Type, Payload: task.Payload.data, - Queue: "default", + Queue: opt.queue, Retry: opt.retry, } return c.enqueue(msg, processAt) diff --git a/client_test.go b/client_test.go index 32aebc6..eb51b67 100644 --- a/client_test.go +++ b/client_test.go @@ -24,7 +24,7 @@ func TestClient(t *testing.T) { task *Task processAt time.Time opts []Option - wantEnqueued []*base.TaskMessage + wantEnqueued map[string][]*base.TaskMessage wantScheduled []h.ZSetEntry }{ { @@ -32,12 +32,14 @@ func TestClient(t *testing.T) { task: task, processAt: time.Now(), opts: []Option{}, - wantEnqueued: []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: defaultMaxRetry, - Queue: "default", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "default", + }, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -67,12 +69,14 @@ func TestClient(t *testing.T) { opts: []Option{ MaxRetry(3), }, - wantEnqueued: []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 3, - Queue: "default", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 3, + Queue: "default", + }, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -84,12 +88,14 @@ func TestClient(t *testing.T) { opts: []Option{ MaxRetry(-2), }, - wantEnqueued: []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 0, // Retry count should be set to zero - Queue: "default", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 0, // Retry count should be set to zero + Queue: "default", + }, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -102,12 +108,52 @@ func TestClient(t *testing.T) { MaxRetry(2), MaxRetry(10), }, - wantEnqueued: []*base.TaskMessage{ - &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload.data, - Retry: 10, // Last option takes precedence - Queue: "default", + wantEnqueued: map[string][]*base.TaskMessage{ + "default": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: 10, // Last option takes precedence + Queue: "default", + }, + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "With queue option", + task: task, + processAt: time.Now(), + opts: []Option{ + Queue("custom"), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "custom": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "custom", + }, + }, + }, + wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil + }, + { + desc: "Queue option should be case-insensitive", + task: task, + processAt: time.Now(), + opts: []Option{ + Queue("HIGH"), + }, + wantEnqueued: map[string][]*base.TaskMessage{ + "high": []*base.TaskMessage{ + &base.TaskMessage{ + Type: task.Type, + Payload: task.Payload.data, + Retry: defaultMaxRetry, + Queue: "high", + }, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -123,9 +169,11 @@ func TestClient(t *testing.T) { continue } - gotEnqueued := h.GetEnqueuedMessages(t, r) - if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.IgnoreIDOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.DefaultQueue, diff) + for qname, want := range tc.wantEnqueued { + gotEnqueued := h.GetEnqueuedMessages(t, r, qname) + if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" { + t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff) + } } gotScheduled := h.GetScheduledEntries(t, r) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 3a39c68..148637d 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -156,10 +156,16 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry } } -// GetEnqueuedMessages returns all task messages in the default queue. -func GetEnqueuedMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { +// GetEnqueuedMessages returns all task messages in the specified queue. +// +// If queue name option is not passed, it defaults to the default queue. +func GetEnqueuedMessages(tb testing.TB, r *redis.Client, queueOpt ...string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.DefaultQueue) + queue := base.DefaultQueue + if len(queueOpt) > 0 { + queue = base.QueueKey(queueOpt[0]) + } + return getListMessages(tb, r, queue) } // GetInProgressMessages returns all task messages in the in-progress queue. diff --git a/internal/base/base.go b/internal/base/base.go index b1862c8..3ff3ede 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -11,18 +11,26 @@ import ( "github.com/rs/xid" ) +// DefaultQueueName is the queue name used if none are specified by user. +const DefaultQueueName = "default" + // Redis keys const ( - processedPrefix = "asynq:processed:" // STRING - asynq:processed: - failurePrefix = "asynq:failure:" // STRING - asynq:failure: - QueuePrefix = "asynq:queues:" // LIST - asynq:queues: - DefaultQueue = QueuePrefix + "default" // LIST - ScheduledQueue = "asynq:scheduled" // ZSET - RetryQueue = "asynq:retry" // ZSET - DeadQueue = "asynq:dead" // ZSET - InProgressQueue = "asynq:in_progress" // LIST + processedPrefix = "asynq:processed:" // STRING - asynq:processed: + failurePrefix = "asynq:failure:" // STRING - asynq:failure: + queuePrefix = "asynq:queues:" // LIST - asynq:queues: + DefaultQueue = queuePrefix + DefaultQueueName // LIST + ScheduledQueue = "asynq:scheduled" // ZSET + RetryQueue = "asynq:retry" // ZSET + DeadQueue = "asynq:dead" // ZSET + InProgressQueue = "asynq:in_progress" // LIST ) +// QueueKey returns a redis key string for the given queue name. +func QueueKey(qname string) string { + return queuePrefix + qname +} + // ProcessedKey returns a redis key string for procesed count // for the given day. func ProcessedKey(t time.Time) string { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index a48f10e..52624c4 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -9,6 +9,22 @@ import ( "time" ) +func TestQueueKey(t *testing.T) { + tests := []struct { + qname string + want string + }{ + {"custom", "asynq:queues:custom"}, + } + + for _, tc := range tests { + got := QueueKey(tc.qname) + if got != tc.want { + t.Errorf("QueueKey(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + func TestProcessedKey(t *testing.T) { tests := []struct { input time.Time diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 8b41053..56bdc94 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -46,7 +46,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { if err != nil { return err } - qname := base.QueuePrefix + msg.Queue + qname := base.QueueKey(msg.Queue) return r.client.LPush(qname, string(bytes)).Err() }