2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 19:38:49 +08:00

Merge pull request #21 from hibiken/feature/retrycount

Allow custom retry count
This commit is contained in:
Ken Hibino 2019-12-21 10:05:42 -08:00 committed by GitHub
commit 5de314400d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 175 additions and 26 deletions

View File

@ -12,12 +12,8 @@ TODOs:
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
- [P0] Redis Sentinel support - [P0] Redis Sentinel support
- [P1] Add Support for multiple queues and priority - [P1] Add Support for multiple queues and priority
- [P1] User defined max-retry count
*/ */
// Max retry count by default
const defaultMaxRetry = 25
// Task represents a task to be performed. // Task represents a task to be performed.
type Task struct { type Task struct {
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.

View File

@ -9,6 +9,7 @@ import (
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid" "github.com/rs/xid"
) )
@ -30,6 +31,12 @@ const (
inProgressQ = "asynq:in_progress" // LIST inProgressQ = "asynq:in_progress" // LIST
) )
// scheduledEntry represents an item in redis sorted set (aka ZSET).
type sortedSetEntry struct {
msg *rdb.TaskMessage
score int64
}
func setup(t *testing.T) *redis.Client { func setup(t *testing.T) *redis.Client {
t.Helper() t.Helper()
r := redis.NewClient(&redis.Options{ r := redis.NewClient(&redis.Options{
@ -59,6 +66,16 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.T
return out return out
}) })
var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry {
out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].msg.ID.String() < out[j].msg.ID.String()
})
return out
})
var ignoreIDOpt = cmpopts.IgnoreFields(rdb.TaskMessage{}, "ID")
func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage {
return &rdb.TaskMessage{ return &rdb.TaskMessage{
ID: xid.New(), ID: xid.New(),

View File

@ -3,8 +3,8 @@ package asynq
import ( import (
"time" "time"
"github.com/rs/xid"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
) )
// A Client is responsible for scheduling tasks. // A Client is responsible for scheduling tasks.
@ -23,17 +23,62 @@ func NewClient(cfg *RedisConfig) *Client {
return &Client{r} return &Client{r}
} }
// Option configures the behavior of task processing.
type Option interface{}
// max number of times a task will be retried.
type retryOption int
// MaxRetry returns an option to specify the max number of times
// a task will be retried.
//
// Negative retry count is treated as zero retry.
func MaxRetry(n int) Option {
if n < 0 {
n = 0
}
return retryOption(n)
}
type option struct {
retry int
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
}
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res.retry = int(opt)
default:
// ignore unexpected option
}
}
return res
}
const (
// Max retry count by default
defaultMaxRetry = 25
)
// Process registers a task to be processed at the specified time. // Process registers a task to be processed at the specified time.
// //
// Process returns nil if the task is registered successfully, // Process returns nil if the task is registered successfully,
// otherwise returns non-nil error. // otherwise returns non-nil error.
func (c *Client) Process(task *Task, processAt time.Time) error { //
// opts specifies the behavior of task processing. If there are conflicting
// Option the last one overrides the ones before.
func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error {
opt := composeOptions(opts...)
msg := &rdb.TaskMessage{ msg := &rdb.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: task.Type, Type: task.Type,
Payload: task.Payload, Payload: task.Payload,
Queue: "default", Queue: "default",
Retry: defaultMaxRetry, Retry: opt.retry,
} }
return c.enqueue(msg, processAt) return c.enqueue(msg, processAt)
} }

View File

@ -1,32 +1,111 @@
package asynq package asynq
import ( import (
"github.com/hibiken/asynq/internal/rdb"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestClient(t *testing.T) { func TestClient(t *testing.T) {
r := setup(t) r := setup(t)
client := &Client{rdb.NewRDB(r)} client := &Client{rdb.NewRDB(r)}
task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}
tests := []struct { tests := []struct {
task *Task desc string
processAt time.Time task *Task
wantQueueSize int64 processAt time.Time
wantScheduledSize int64 opts []Option
wantEnqueued []*rdb.TaskMessage
wantScheduled []sortedSetEntry
}{ }{
{ {
task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, desc: "Process task immediately",
processAt: time.Now(), task: task,
wantQueueSize: 1, processAt: time.Now(),
wantScheduledSize: 0, opts: []Option{},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: defaultMaxRetry,
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
}, },
{ {
task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, desc: "Schedule task to be processed in the future",
processAt: time.Now().Add(2 * time.Hour), task: task,
wantQueueSize: 0, processAt: time.Now().Add(2 * time.Hour),
wantScheduledSize: 1, opts: []Option{},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []sortedSetEntry{
{
msg: &rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: defaultMaxRetry,
Queue: "default",
},
score: time.Now().Add(2 * time.Hour).Unix(),
},
},
},
{
desc: "Process task immediately with a custom retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(3),
},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 3,
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Negative retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 0, // Retry count should be set to zero
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Conflicting options",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(2),
MaxRetry(10),
},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 10, // Last option takes precedence
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
}, },
} }
@ -36,18 +115,30 @@ func TestClient(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err := client.Process(tc.task, tc.processAt) err := client.Process(tc.task, tc.processAt, tc.opts...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
} }
if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize { gotEnqueuedRaw := r.LRange(defaultQ, 0, -1).Val()
t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize) gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, defaultQ, diff)
} }
if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize { gotScheduledRaw := r.ZRangeWithScores(scheduledQ, 0, -1).Val()
t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize) var gotScheduled []sortedSetEntry
for _, z := range gotScheduledRaw {
gotScheduled = append(gotScheduled, sortedSetEntry{
msg: mustUnmarshal(t, z.Member.(string)),
score: int64(z.Score),
})
}
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, scheduledQ, diff)
} }
} }
} }

View File

@ -21,7 +21,7 @@ var rootCmd = &cobra.Command{
Short: "A monitoring tool for asynq queues", Short: "A monitoring tool for asynq queues",
Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package. Long: `Asynqmon is a CLI tool to inspect and monitor queues managed by asynq package.
Asynqmon has a few subcommands to query and mutate the current state of the queues. Asynqmon has a few commands to query and mutate the current state of the queues.
Monitoring commands such as "stats" and "ls" can be used in conjunction with the Monitoring commands such as "stats" and "ls" can be used in conjunction with the
"watch" command to continuously run the command at a certain interval. "watch" command to continuously run the command at a certain interval.