mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-20 07:40:19 +08:00
Merge 69f4dd7eb072da5ceb5d95552d4239363fc5bbbb into 489e21920b92ae6acfc19c54de91166e56817620
This commit is contained in:
commit
1b0e7159a8
30
client.go
30
client.go
@ -55,6 +55,7 @@ const (
|
|||||||
TimeoutOpt
|
TimeoutOpt
|
||||||
DeadlineOpt
|
DeadlineOpt
|
||||||
UniqueOpt
|
UniqueOpt
|
||||||
|
UniqueKeyOpt
|
||||||
ProcessAtOpt
|
ProcessAtOpt
|
||||||
ProcessInOpt
|
ProcessInOpt
|
||||||
TaskIDOpt
|
TaskIDOpt
|
||||||
@ -82,6 +83,7 @@ type (
|
|||||||
timeoutOption time.Duration
|
timeoutOption time.Duration
|
||||||
deadlineOption time.Time
|
deadlineOption time.Time
|
||||||
uniqueOption time.Duration
|
uniqueOption time.Duration
|
||||||
|
uniqueKeyOption string
|
||||||
processAtOption time.Time
|
processAtOption time.Time
|
||||||
processInOption time.Duration
|
processInOption time.Duration
|
||||||
retentionOption time.Duration
|
retentionOption time.Duration
|
||||||
@ -160,10 +162,11 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
|
|||||||
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
|
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
|
||||||
// TTL duration must be greater than or equal to 1 second.
|
// TTL duration must be greater than or equal to 1 second.
|
||||||
//
|
//
|
||||||
// Uniqueness of a task is based on the following properties:
|
// By default, the uniqueness of a task is based on the following properties:
|
||||||
// - Task Type
|
// - Task Type
|
||||||
// - Task Payload
|
// - Task Payload
|
||||||
// - Queue Name
|
// - Queue Name
|
||||||
|
// UniqueKey can be used to specify a custom string for calculating uniqueness, instead of task payload.
|
||||||
func Unique(ttl time.Duration) Option {
|
func Unique(ttl time.Duration) Option {
|
||||||
return uniqueOption(ttl)
|
return uniqueOption(ttl)
|
||||||
}
|
}
|
||||||
@ -172,6 +175,24 @@ func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", t
|
|||||||
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
|
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
|
||||||
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
|
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
|
||||||
|
|
||||||
|
// UniqueKey returns an option to define the custom uniqueness of a task.
|
||||||
|
// If uniqueKey is not empty, the uniqueness of a task is based on the following properties:
|
||||||
|
// - Task Type
|
||||||
|
// - UniqueKey
|
||||||
|
// - Queue Name
|
||||||
|
// Otherwise, task payload will be used, see Unique.
|
||||||
|
//
|
||||||
|
// UniqueKey should be used together with Unique.
|
||||||
|
func UniqueKey(uniqueKey string) Option {
|
||||||
|
return uniqueKeyOption(uniqueKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (uniqueKey uniqueKeyOption) String() string {
|
||||||
|
return fmt.Sprintf("UniqueKey(%q)", string(uniqueKey))
|
||||||
|
}
|
||||||
|
func (uniqueKey uniqueKeyOption) Type() OptionType { return UniqueKeyOpt }
|
||||||
|
func (uniqueKey uniqueKeyOption) Value() interface{} { return string(uniqueKey) }
|
||||||
|
|
||||||
// ProcessAt returns an option to specify when to process the given task.
|
// ProcessAt returns an option to specify when to process the given task.
|
||||||
//
|
//
|
||||||
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
|
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
|
||||||
@ -234,6 +255,7 @@ type option struct {
|
|||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
deadline time.Time
|
deadline time.Time
|
||||||
uniqueTTL time.Duration
|
uniqueTTL time.Duration
|
||||||
|
uniqueKey string
|
||||||
processAt time.Time
|
processAt time.Time
|
||||||
retention time.Duration
|
retention time.Duration
|
||||||
group string
|
group string
|
||||||
@ -278,6 +300,8 @@ func composeOptions(opts ...Option) (option, error) {
|
|||||||
return option{}, errors.New("Unique TTL cannot be less than 1s")
|
return option{}, errors.New("Unique TTL cannot be less than 1s")
|
||||||
}
|
}
|
||||||
res.uniqueTTL = ttl
|
res.uniqueTTL = ttl
|
||||||
|
case uniqueKeyOption:
|
||||||
|
res.uniqueKey = string(opt)
|
||||||
case processAtOption:
|
case processAtOption:
|
||||||
res.processAt = time.Time(opt)
|
res.processAt = time.Time(opt)
|
||||||
case processInOption:
|
case processInOption:
|
||||||
@ -379,8 +403,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
|||||||
}
|
}
|
||||||
var uniqueKey string
|
var uniqueKey string
|
||||||
if opt.uniqueTTL > 0 {
|
if opt.uniqueTTL > 0 {
|
||||||
|
if opt.uniqueKey != "" {
|
||||||
|
uniqueKey = base.CustomUniqueKey(opt.queue, task.Type(), opt.uniqueKey)
|
||||||
|
} else {
|
||||||
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
|
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
|
||||||
}
|
}
|
||||||
|
}
|
||||||
msg := &base.TaskMessage{
|
msg := &base.TaskMessage{
|
||||||
ID: opt.taskID,
|
ID: opt.taskID,
|
||||||
Type: task.Type(),
|
Type: task.Type(),
|
||||||
|
@ -1191,3 +1191,46 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientEnqueueUniqueWithUniqueKeyOption(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
c := NewClient(getRedisConnOpt(t))
|
||||||
|
defer c.Close()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
task *Task
|
||||||
|
ttl time.Duration
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
|
||||||
|
time.Hour,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
|
|
||||||
|
// Enqueue the task first. It should succeed.
|
||||||
|
_, err := c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
gotTTL := r.TTL(context.Background(), base.CustomUniqueKey(base.DefaultQueueName, tc.task.Type(), "custom_unique_key")).Val()
|
||||||
|
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
|
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enqueue the task again. It should fail.
|
||||||
|
_, err = c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("Enqueueing %+v did not return an error", tc.task)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !errors.Is(err, ErrDuplicateTask) {
|
||||||
|
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -960,6 +960,12 @@ func parseOption(s string) (Option, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return Unique(d), nil
|
return Unique(d), nil
|
||||||
|
case "UniqueKey":
|
||||||
|
key, err := strconv.Unquote(arg)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return UniqueKey(key), nil
|
||||||
case "ProcessAt":
|
case "ProcessAt":
|
||||||
t, err := time.Parse(time.UnixDate, arg)
|
t, err := time.Parse(time.UnixDate, arg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -205,6 +205,12 @@ func UniqueKey(qname, tasktype string, payload []byte) string {
|
|||||||
return QueueKeyPrefix(qname) + "unique:" + tasktype + ":" + hex.EncodeToString(checksum[:])
|
return QueueKeyPrefix(qname) + "unique:" + tasktype + ":" + hex.EncodeToString(checksum[:])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CustomUniqueKey returns a redis key with the given type, custom key, and queue name.
|
||||||
|
func CustomUniqueKey(qname, tasktype string, customKey string) string {
|
||||||
|
checksum := md5.Sum([]byte(customKey))
|
||||||
|
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
|
||||||
|
}
|
||||||
|
|
||||||
// GroupKeyPrefix returns a prefix for group key.
|
// GroupKeyPrefix returns a prefix for group key.
|
||||||
func GroupKeyPrefix(qname string) string {
|
func GroupKeyPrefix(qname string) string {
|
||||||
return QueueKeyPrefix(qname) + "g:"
|
return QueueKeyPrefix(qname) + "g:"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user