2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Allow user to define a max retry count for a task

This commit is contained in:
Ken Hibino 2019-12-21 07:42:32 -08:00
parent fc66aaa47f
commit 4229073a24
4 changed files with 153 additions and 25 deletions

View File

@ -12,12 +12,8 @@ TODOs:
- [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment - [P0] Go docs + CONTRIBUTION.md + Github issue template + License comment
- [P0] Redis Sentinel support - [P0] Redis Sentinel support
- [P1] Add Support for multiple queues and priority - [P1] Add Support for multiple queues and priority
- [P1] User defined max-retry count
*/ */
// Max retry count by default
const defaultMaxRetry = 25
// Task represents a task to be performed. // Task represents a task to be performed.
type Task struct { type Task struct {
// Type indicates the kind of the task to be performed. // Type indicates the kind of the task to be performed.

View File

@ -9,6 +9,7 @@ import (
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid" "github.com/rs/xid"
) )
@ -30,6 +31,12 @@ const (
inProgressQ = "asynq:in_progress" // LIST inProgressQ = "asynq:in_progress" // LIST
) )
// scheduledEntry represents an item in redis sorted set (aka ZSET).
type sortedSetEntry struct {
msg *rdb.TaskMessage
score int64
}
func setup(t *testing.T) *redis.Client { func setup(t *testing.T) *redis.Client {
t.Helper() t.Helper()
r := redis.NewClient(&redis.Options{ r := redis.NewClient(&redis.Options{
@ -59,6 +66,16 @@ var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.T
return out return out
}) })
var sortZSetEntryOpt = cmp.Transformer("SortZSetEntry", func(in []sortedSetEntry) []sortedSetEntry {
out := append([]sortedSetEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].msg.ID.String() < out[j].msg.ID.String()
})
return out
})
var ignoreIDOpt = cmpopts.IgnoreFields(rdb.TaskMessage{}, "ID")
func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage {
return &rdb.TaskMessage{ return &rdb.TaskMessage{
ID: xid.New(), ID: xid.New(),

View File

@ -3,8 +3,8 @@ package asynq
import ( import (
"time" "time"
"github.com/rs/xid"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
) )
// A Client is responsible for scheduling tasks. // A Client is responsible for scheduling tasks.
@ -23,17 +23,59 @@ func NewClient(cfg *RedisConfig) *Client {
return &Client{r} return &Client{r}
} }
// Option configures the behavior of task processing.
type Option interface{}
// max number of times a task will be retried.
type retryOption int
// MaxRetry returns an option to specify the max number of times
// a task will be retried.
//
// Negative retry count is treated as zero retry.
func MaxRetry(n int) Option {
if n < 0 {
n = 0
}
return retryOption(n)
}
type option struct {
retry int
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
}
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res.retry = int(opt)
default:
// ignore unexpected option
}
}
return res
}
const (
// Max retry count by default
defaultMaxRetry = 25
)
// Process registers a task to be processed at the specified time. // Process registers a task to be processed at the specified time.
// //
// Process returns nil if the task is registered successfully, // Process returns nil if the task is registered successfully,
// otherwise returns non-nil error. // otherwise returns non-nil error.
func (c *Client) Process(task *Task, processAt time.Time) error { func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error {
opt := composeOptions(opts...)
msg := &rdb.TaskMessage{ msg := &rdb.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: task.Type, Type: task.Type,
Payload: task.Payload, Payload: task.Payload,
Queue: "default", Queue: "default",
Retry: defaultMaxRetry, Retry: opt.retry,
} }
return c.enqueue(msg, processAt) return c.enqueue(msg, processAt)
} }

View File

@ -1,32 +1,93 @@
package asynq package asynq
import ( import (
"github.com/hibiken/asynq/internal/rdb"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestClient(t *testing.T) { func TestClient(t *testing.T) {
r := setup(t) r := setup(t)
client := &Client{rdb.NewRDB(r)} client := &Client{rdb.NewRDB(r)}
task := &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}
tests := []struct { tests := []struct {
desc string
task *Task task *Task
processAt time.Time processAt time.Time
wantQueueSize int64 opts []Option
wantScheduledSize int64 wantEnqueued []*rdb.TaskMessage
wantScheduled []sortedSetEntry
}{ }{
{ {
task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, desc: "Process task immediately",
task: task,
processAt: time.Now(), processAt: time.Now(),
wantQueueSize: 1, opts: []Option{},
wantScheduledSize: 0, wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: defaultMaxRetry,
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
}, },
{ {
task: &Task{Type: "send_email", Payload: map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}}, desc: "Schedule task to be processed in the future",
task: task,
processAt: time.Now().Add(2 * time.Hour), processAt: time.Now().Add(2 * time.Hour),
wantQueueSize: 0, opts: []Option{},
wantScheduledSize: 1, wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []sortedSetEntry{
{
msg: &rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: defaultMaxRetry,
Queue: "default",
},
score: time.Now().Add(2 * time.Hour).Unix(),
},
},
},
{
desc: "Process task immediately with a custom retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(3),
},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 3,
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
},
{
desc: "Negative retry count",
task: task,
processAt: time.Now(),
opts: []Option{
MaxRetry(-2),
},
wantEnqueued: []*rdb.TaskMessage{
&rdb.TaskMessage{
Type: task.Type,
Payload: task.Payload,
Retry: 0, // Retry count should be set to zero
Queue: "default",
},
},
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
}, },
} }
@ -36,18 +97,30 @@ func TestClient(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err := client.Process(tc.task, tc.processAt) err := client.Process(tc.task, tc.processAt, tc.opts...)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
} }
if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize { gotEnqueuedRaw := r.LRange(defaultQ, 0, -1).Val()
t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize) gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, ignoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, defaultQ, diff)
} }
if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize { gotScheduledRaw := r.ZRangeWithScores(scheduledQ, 0, -1).Val()
t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize) var gotScheduled []sortedSetEntry
for _, z := range gotScheduledRaw {
gotScheduled = append(gotScheduled, sortedSetEntry{
msg: mustUnmarshal(t, z.Member.(string)),
score: int64(z.Score),
})
}
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, cmpOpt, ignoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, scheduledQ, diff)
} }
} }
} }