From 519985f195d2d89ee5e9911c7411faacb7473b28 Mon Sep 17 00:00:00 2001 From: nyako Date: Fri, 24 Jun 2022 14:55:53 +0800 Subject: [PATCH] feat: add custom unique key option --- client.go | 32 ++++++++++++++++++++++++++++++-- client_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++ inspector.go | 6 ++++++ internal/base/base.go | 6 ++++++ 4 files changed, 85 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index 7948036..69aa30d 100644 --- a/client.go +++ b/client.go @@ -44,6 +44,7 @@ const ( TimeoutOpt DeadlineOpt UniqueOpt + UniqueKeyOpt ProcessAtOpt ProcessInOpt TaskIDOpt @@ -71,6 +72,7 @@ type ( timeoutOption time.Duration deadlineOption time.Time uniqueOption time.Duration + uniqueKeyOption string processAtOption time.Time processInOption time.Duration retentionOption time.Duration @@ -149,10 +151,11 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } // ErrDuplicateTask error is returned when enqueueing a duplicate task. // TTL duration must be greater than or equal to 1 second. // -// Uniqueness of a task is based on the following properties: +// By default, the uniqueness of a task is based on the following properties: // - Task Type // - Task Payload // - Queue Name +// UniqueKey can be used to specify a custom string for calculating uniqueness, instead of task payload. func Unique(ttl time.Duration) Option { return uniqueOption(ttl) } @@ -161,6 +164,24 @@ func (ttl uniqueOption) String() string { return fmt.Sprintf("Unique(%v)", t func (ttl uniqueOption) Type() OptionType { return UniqueOpt } func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) } +// UniqueKey returns an option to define the custom uniqueness of a task. +// If uniqueKey is not empty, the uniqueness of a task is based on the following properties: +// - Task Type +// - UniqueKey +// - Queue Name +// Otherwise, task payload will be used, see Unique. +// +// UniqueKey should be used together with Unique. +func UniqueKey(uniqueKey string) Option { + return uniqueKeyOption(uniqueKey) +} + +func (uniqueKey uniqueKeyOption) String() string { + return fmt.Sprintf("UniqueKey(%q)", string(uniqueKey)) +} +func (uniqueKey uniqueKeyOption) Type() OptionType { return UniqueKeyOpt } +func (uniqueKey uniqueKeyOption) Value() interface{} { return string(uniqueKey) } + // ProcessAt returns an option to specify when to process the given task. // // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. @@ -223,6 +244,7 @@ type option struct { timeout time.Duration deadline time.Time uniqueTTL time.Duration + uniqueKey string processAt time.Time retention time.Duration group string @@ -267,6 +289,8 @@ func composeOptions(opts ...Option) (option, error) { return option{}, errors.New("Unique TTL cannot be less than 1s") } res.uniqueTTL = ttl + case uniqueKeyOption: + res.uniqueKey = string(opt) case processAtOption: res.processAt = time.Time(opt) case processInOption: @@ -365,7 +389,11 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) } var uniqueKey string if opt.uniqueTTL > 0 { - uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) + if opt.uniqueKey != "" { + uniqueKey = base.CustomUniqueKey(opt.queue, task.Type(), opt.uniqueKey) + } else { + uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) + } } msg := &base.TaskMessage{ ID: opt.taskID, diff --git a/client_test.go b/client_test.go index da24d13..17ab206 100644 --- a/client_test.go +++ b/client_test.go @@ -1176,3 +1176,46 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } } + +func TestClientEnqueueUniqueWithUniqueKeyOption(t *testing.T) { + r := setup(t) + c := NewClient(getRedisConnOpt(t)) + defer c.Close() + + tests := []struct { + task *Task + ttl time.Duration + }{ + { + NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), + time.Hour, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + // Enqueue the task first. It should succeed. + _, err := c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key")) + if err != nil { + t.Fatal(err) + } + + gotTTL := r.TTL(context.Background(), base.CustomUniqueKey(base.DefaultQueueName, tc.task.Type(), "custom_unique_key")).Val() + if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) + continue + } + + // Enqueue the task again. It should fail. + _, err = c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key")) + if err == nil { + t.Errorf("Enqueueing %+v did not return an error", tc.task) + continue + } + if !errors.Is(err, ErrDuplicateTask) { + t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task) + continue + } + } +} diff --git a/inspector.go b/inspector.go index e583cf6..7b4edbd 100644 --- a/inspector.go +++ b/inspector.go @@ -945,6 +945,12 @@ func parseOption(s string) (Option, error) { return nil, err } return Unique(d), nil + case "UniqueKey": + key, err := strconv.Unquote(arg) + if err != nil { + return nil, err + } + return UniqueKey(key), nil case "ProcessAt": t, err := time.Parse(time.UnixDate, arg) if err != nil { diff --git a/internal/base/base.go b/internal/base/base.go index ec342f8..f06c264 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -205,6 +205,12 @@ func UniqueKey(qname, tasktype string, payload []byte) string { return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) } +// CustomUniqueKey returns a redis key with the given type, custom key, and queue name. +func CustomUniqueKey(qname, tasktype string, customKey string) string { + checksum := md5.Sum([]byte(customKey)) + return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) +} + // GroupKeyPrefix returns a prefix for group key. func GroupKeyPrefix(qname string) string { return fmt.Sprintf("%sg:", QueueKeyPrefix(qname))