2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 19:38:49 +08:00

feat: add custom unique key option

This commit is contained in:
nyako 2022-06-24 14:55:53 +08:00
parent a04ba6411d
commit 519985f195
4 changed files with 85 additions and 2 deletions

View File

@ -44,6 +44,7 @@ const (
TimeoutOpt
DeadlineOpt
UniqueOpt
UniqueKeyOpt
ProcessAtOpt
ProcessInOpt
TaskIDOpt
@ -71,6 +72,7 @@ type (
timeoutOption time.Duration
deadlineOption time.Time
uniqueOption time.Duration
uniqueKeyOption string
processAtOption time.Time
processInOption time.Duration
retentionOption time.Duration
@ -149,10 +151,11 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) }
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
// TTL duration must be greater than or equal to 1 second.
//
// Uniqueness of a task is based on the following properties:
// By default, the uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
// UniqueKey can be used to specify a custom string for calculating uniqueness, instead of task payload.
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
@ -161,6 +164,24 @@ func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", t
func (ttl uniqueOption) Type() OptionType { return UniqueOpt }
func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) }
// UniqueKey returns an option to define the custom uniqueness of a task.
// If uniqueKey is not empty, the uniqueness of a task is based on the following properties:
// - Task Type
// - UniqueKey
// - Queue Name
// Otherwise, task payload will be used, see Unique.
//
// UniqueKey should be used together with Unique.
func UniqueKey(uniqueKey string) Option {
return uniqueKeyOption(uniqueKey)
}
func (uniqueKey uniqueKeyOption) String() string {
return fmt.Sprintf("UniqueKey(%q)", string(uniqueKey))
}
func (uniqueKey uniqueKeyOption) Type() OptionType { return UniqueKeyOpt }
func (uniqueKey uniqueKeyOption) Value() interface{} { return string(uniqueKey) }
// ProcessAt returns an option to specify when to process the given task.
//
// If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others.
@ -223,6 +244,7 @@ type option struct {
timeout time.Duration
deadline time.Time
uniqueTTL time.Duration
uniqueKey string
processAt time.Time
retention time.Duration
group string
@ -267,6 +289,8 @@ func composeOptions(opts ...Option) (option, error) {
return option{}, errors.New("Unique TTL cannot be less than 1s")
}
res.uniqueTTL = ttl
case uniqueKeyOption:
res.uniqueKey = string(opt)
case processAtOption:
res.processAt = time.Time(opt)
case processInOption:
@ -365,8 +389,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
}
var uniqueKey string
if opt.uniqueTTL > 0 {
if opt.uniqueKey != "" {
uniqueKey = base.CustomUniqueKey(opt.queue, task.Type(), opt.uniqueKey)
} else {
uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload())
}
}
msg := &base.TaskMessage{
ID: opt.taskID,
Type: task.Type(),

View File

@ -1176,3 +1176,46 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
}
}
}
func TestClientEnqueueUniqueWithUniqueKeyOption(t *testing.T) {
r := setup(t)
c := NewClient(getRedisConnOpt(t))
defer c.Close()
tests := []struct {
task *Task
ttl time.Duration
}{
{
NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
time.Hour,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
_, err := c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(context.Background(), base.CustomUniqueKey(base.DefaultQueueName, tc.task.Type(), "custom_unique_key")).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
continue
}
// Enqueue the task again. It should fail.
_, err = c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key"))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}

View File

@ -945,6 +945,12 @@ func parseOption(s string) (Option, error) {
return nil, err
}
return Unique(d), nil
case "UniqueKey":
key, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return UniqueKey(key), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {

View File

@ -205,6 +205,12 @@ func UniqueKey(qname, tasktype string, payload []byte) string {
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
}
// CustomUniqueKey returns a redis key with the given type, custom key, and queue name.
func CustomUniqueKey(qname, tasktype string, customKey string) string {
checksum := md5.Sum([]byte(customKey))
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
}
// GroupKeyPrefix returns a prefix for group key.
func GroupKeyPrefix(qname string) string {
return fmt.Sprintf("%sg:", QueueKeyPrefix(qname))