2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Add Queue option to allow user to specify queue from client

Added base.QueueKey method to get redis key for given queue name.
Changed asynqtest.GetEnqueuedMessages to optionally take queue name.
This commit is contained in:
Ken Hibino 2020-01-06 06:53:40 -08:00
parent 29ad70c36a
commit ca78b92078
6 changed files with 136 additions and 43 deletions

View File

@ -5,6 +5,7 @@
package asynq
import (
"strings"
"time"
"github.com/go-redis/redis/v7"
@ -32,8 +33,11 @@ func NewClient(r *redis.Client) *Client {
// Option specifies the processing behavior for the associated task.
type Option interface{}
// max number of times a task will be retried.
type retryOption int
// Internal option representations.
type (
retryOption int
queueOption string
)
// MaxRetry returns an option to specify the max number of times
// a task will be retried.
@ -46,18 +50,29 @@ func MaxRetry(n int) Option {
return retryOption(n)
}
// Queue returns an option to specify which queue to enqueue this task into.
//
// Queue name is case-insensitive and the lowercased version is used.
func Queue(qname string) Option {
return queueOption(strings.ToLower(qname))
}
type option struct {
retry int
queue string
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
}
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res.retry = int(opt)
case queueOption:
res.queue = string(opt)
default:
// ignore unexpected option
}
@ -83,7 +98,7 @@ func (c *Client) Schedule(task *Task, processAt time.Time, opts ...Option) error
ID: xid.New(),
Type: task.Type,
Payload: task.Payload.data,
Queue: "default",
Queue: opt.queue,
Retry: opt.retry,
}
return c.enqueue(msg, processAt)

View File

@ -24,7 +24,7 @@ func TestClient(t *testing.T) {
task *Task
processAt time.Time
opts []Option
wantEnqueued []*base.TaskMessage
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry
}{
{
@ -32,12 +32,14 @@ func TestClient(t *testing.T) {
task: task,
processAt: time.Now(),
opts: []Option{},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@ -67,12 +69,14 @@ func TestClient(t *testing.T) {
opts: []Option{
MaxRetry(3),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@ -84,12 +88,14 @@ func TestClient(t *testing.T) {
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@ -102,12 +108,52 @@ func TestClient(t *testing.T) {
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "With queue option",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Queue option should be case-insensitive",
task: task,
processAt: time.Now(),
opts: []Option{
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
},
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
@ -123,9 +169,11 @@ func TestClient(t *testing.T) {
continue
}
gotEnqueued := h.GetEnqueuedMessages(t, r)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.DefaultQueue, diff)
for qname, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
if diff := cmp.Diff(want, gotEnqueued, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
}
}
gotScheduled := h.GetScheduledEntries(t, r)

View File

@ -156,10 +156,16 @@ func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry
}
}
// GetEnqueuedMessages returns all task messages in the default queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
// GetEnqueuedMessages returns all task messages in the specified queue.
//
// If queue name option is not passed, it defaults to the default queue.
func GetEnqueuedMessages(tb testing.TB, r *redis.Client, queueOpt ...string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.DefaultQueue)
queue := base.DefaultQueue
if len(queueOpt) > 0 {
queue = base.QueueKey(queueOpt[0])
}
return getListMessages(tb, r, queue)
}
// GetInProgressMessages returns all task messages in the in-progress queue.

View File

@ -11,18 +11,26 @@ import (
"github.com/rs/xid"
)
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
// Redis keys
const (
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
DefaultQueue = QueuePrefix + "default" // LIST
ScheduledQueue = "asynq:scheduled" // ZSET
RetryQueue = "asynq:retry" // ZSET
DeadQueue = "asynq:dead" // ZSET
InProgressQueue = "asynq:in_progress" // LIST
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
DefaultQueue = queuePrefix + DefaultQueueName // LIST
ScheduledQueue = "asynq:scheduled" // ZSET
RetryQueue = "asynq:retry" // ZSET
DeadQueue = "asynq:dead" // ZSET
InProgressQueue = "asynq:in_progress" // LIST
)
// QueueKey returns a redis key string for the given queue name.
func QueueKey(qname string) string {
return queuePrefix + qname
}
// ProcessedKey returns a redis key string for procesed count
// for the given day.
func ProcessedKey(t time.Time) string {

View File

@ -9,6 +9,22 @@ import (
"time"
)
func TestQueueKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"custom", "asynq:queues:custom"},
}
for _, tc := range tests {
got := QueueKey(tc.qname)
if got != tc.want {
t.Errorf("QueueKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}
func TestProcessedKey(t *testing.T) {
tests := []struct {
input time.Time

View File

@ -46,7 +46,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err != nil {
return err
}
qname := base.QueuePrefix + msg.Queue
qname := base.QueueKey(msg.Queue)
return r.client.LPush(qname, string(bytes)).Err()
}