From 50b6034bf9c78cf5be212dcca89124b6f99d6c6e Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 7 Aug 2020 05:36:54 -0700 Subject: [PATCH] Move unique key generator function to base --- client.go | 40 +++-------------------- client_test.go | 66 -------------------------------------- internal/base/base.go | 30 +++++++++++++++++ internal/base/base_test.go | 57 ++++++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 101 deletions(-) diff --git a/client.go b/client.go index 6ac7a72..7bbe371 100644 --- a/client.go +++ b/client.go @@ -7,7 +7,6 @@ package asynq import ( "errors" "fmt" - "sort" "strings" "sync" "time" @@ -142,39 +141,6 @@ func composeOptions(opts ...Option) option { return res } -// uniqueKey computes the redis key used for the given task. -// It returns an empty string if ttl is zero. -func uniqueKey(t *Task, ttl time.Duration, qname string) string { - if ttl == 0 { - return "" - } - return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname) -} - -func serializePayload(payload map[string]interface{}) string { - if payload == nil { - return "nil" - } - type entry struct { - k string - v interface{} - } - var es []entry - for k, v := range payload { - es = append(es, entry{k, v}) - } - // sort entries by key - sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k }) - var b strings.Builder - for _, e := range es { - if b.Len() > 0 { - b.WriteString(",") - } - b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v)) - } - return b.String() -} - const ( // Default max retry count used if nothing is specified. defaultMaxRetry = 25 @@ -286,6 +252,10 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er // If neither deadline nor timeout are set, use default timeout. timeout = defaultTimeout } + var uniqueKey string + if opt.uniqueTTL > 0 { + uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data) + } msg := &base.TaskMessage{ ID: uuid.New(), Type: task.Type, @@ -294,7 +264,7 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er Retry: opt.retry, Deadline: deadline.Unix(), Timeout: int64(timeout.Seconds()), - UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue), + UniqueKey: uniqueKey, } var err error now := time.Now() diff --git a/client_test.go b/client_test.go index 18d27a0..b02dbf0 100644 --- a/client_test.go +++ b/client_test.go @@ -561,72 +561,6 @@ func TestClientDefaultOptions(t *testing.T) { } } -func TestUniqueKey(t *testing.T) { - tests := []struct { - desc string - task *Task - ttl time.Duration - qname string - want string - }{ - { - "with zero TTL", - NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}), - 0, - "default", - "", - }, - { - "with primitive types", - NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}), - 10 * time.Minute, - "default", - "email:send:a=123,b=hello,c=true:default", - }, - { - "with unsorted keys", - NewTask("email:send", map[string]interface{}{"b": "hello", "c": true, "a": 123}), - 10 * time.Minute, - "default", - "email:send:a=123,b=hello,c=true:default", - }, - { - "with composite types", - NewTask("email:send", - map[string]interface{}{ - "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, - "names": []string{"bob", "mike", "rob"}}), - 10 * time.Minute, - "default", - "email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]:default", - }, - { - "with complex types", - NewTask("email:send", - map[string]interface{}{ - "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), - "duration": time.Hour}), - 10 * time.Minute, - "default", - "email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC:default", - }, - { - "with nil payload", - NewTask("reindex", nil), - 10 * time.Minute, - "default", - "reindex:nil:default", - }, - } - - for _, tc := range tests { - got := uniqueKey(tc.task, tc.ttl, tc.qname) - if got != tc.want { - t.Errorf("%s: uniqueKey(%v, %v, %q) = %q, want %q", tc.desc, tc.task, tc.ttl, tc.qname, got, tc.want) - } - } -} - func TestEnqueueUnique(t *testing.T) { r := setup(t) c := NewClient(RedisClientOpt{ diff --git a/internal/base/base.go b/internal/base/base.go index 3f92f87..2c0ad81 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -9,6 +9,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "strings" "sync" "time" @@ -90,6 +91,35 @@ func WorkersKey(hostname string, pid int, sid string) string { return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) } +// UniqueKey returns a redis key with the given type, payload, and queue name. +func UniqueKey(qname, tasktype string, payload map[string]interface{}) string { + return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload)) +} + +func serializePayload(payload map[string]interface{}) string { + if payload == nil { + return "nil" + } + type entry struct { + k string + v interface{} + } + var es []entry + for k, v := range payload { + es = append(es, entry{k, v}) + } + // sort entries by key + sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k }) + var b strings.Builder + for _, e := range es { + if b.Len() > 0 { + b.WriteString(",") + } + b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v)) + } + return b.String() +} + // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written to redis. type TaskMessage struct { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 395dd1e..728d2e3 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -212,6 +212,63 @@ func TestWorkersKey(t *testing.T) { } } +func TestUniqueKey(t *testing.T) { + tests := []struct { + desc string + qname string + tasktype string + payload map[string]interface{} + want string + }{ + { + "with primitive types", + "default", + "email:send", + map[string]interface{}{"a": 123, "b": "hello", "c": true}, + "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + }, + { + "with unsorted keys", + "default", + "email:send", + map[string]interface{}{"b": "hello", "c": true, "a": 123}, + "asynq:{default}:unique:email:send:a=123,b=hello,c=true", + }, + { + "with composite types", + "default", + "email:send", + map[string]interface{}{ + "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, + "names": []string{"bob", "mike", "rob"}}, + "asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]", + }, + { + "with complex types", + "default", + "email:send", + map[string]interface{}{ + "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), + "duration": time.Hour}, + "asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC", + }, + { + "with nil payload", + "default", + "reindex", + nil, + "asynq:{default}:unique:reindex:nil", + }, + } + + for _, tc := range tests { + got := UniqueKey(tc.qname, tc.tasktype, tc.payload) + if got != tc.want { + t.Errorf("%s: UniqueKey(%q, %q, %v) = %q, want %q", tc.desc, tc.qname, tc.tasktype, tc.payload, got, tc.want) + } + } +} + func TestMessageEncoding(t *testing.T) { id := uuid.New() tests := []struct {