diff --git a/internal/base/base.go b/internal/base/base.go index 6123dd1..997e1d2 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -7,6 +7,8 @@ package base import ( "context" + "crypto/md5" + "encoding/hex" "fmt" "sync" "time" @@ -170,9 +172,12 @@ func SchedulerHistoryKey(entryID string) string { } // UniqueKey returns a redis key with the given type, payload, and queue name. -// FIXME: We probably need to generate a hash of payload to make this key unique func UniqueKey(qname, tasktype string, payload []byte) string { - return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, string(payload)) + if payload == nil { + return fmt.Sprintf("%sunique:%s:", QueueKeyPrefix(qname), tasktype) + } + checksum := md5.Sum(payload) + return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) } // TaskMessage is the internal representation of a task with additional metadata fields. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index ebd15ce..9df725e 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -6,6 +6,8 @@ package base import ( "context" + "crypto/md5" + "encoding/hex" "encoding/json" "fmt" "sync" @@ -276,6 +278,19 @@ func toBytes(m map[string]interface{}) []byte { } func TestUniqueKey(t *testing.T) { + payload1 := toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}) + payload2 := toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}) + payload3 := toBytes(map[string]interface{}{ + "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, + "names": []string{"bob", "mike", "rob"}}) + payload4 := toBytes(map[string]interface{}{ + "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), + "duration": time.Hour}) + + checksum := func(data []byte) string { + sum := md5.Sum(data) + return hex.EncodeToString(sum[:]) + } tests := []struct { desc string qname string @@ -287,41 +302,29 @@ func TestUniqueKey(t *testing.T) { "with primitive types", "default", "email:send", - toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}), - fmt.Sprintf("asynq:{default}:unique:email:send:%s", - string(toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}))), + payload1, + fmt.Sprintf("asynq:{default}:unique:email:send:%s", checksum(payload1)), }, { "with unsorted keys", "default", "email:send", - toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}), - fmt.Sprintf("asynq:{default}:unique:email:send:%s", - string(toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}))), + payload2, + fmt.Sprintf("asynq:{default}:unique:email:send:%s", checksum(payload2)), }, { "with composite types", "default", "email:send", - toBytes(map[string]interface{}{ - "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, - "names": []string{"bob", "mike", "rob"}}), - fmt.Sprintf("asynq:{default}:unique:email:send:%s", - string(toBytes(map[string]interface{}{ - "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, - "names": []string{"bob", "mike", "rob"}}))), + payload3, + fmt.Sprintf("asynq:{default}:unique:email:send:%s", checksum(payload3)), }, { "with complex types", "default", "email:send", - toBytes(map[string]interface{}{ - "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), - "duration": time.Hour}), - fmt.Sprintf("asynq:{default}:unique:email:send:%s", - string(toBytes(map[string]interface{}{ - "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), - "duration": time.Hour}))), + payload4, + fmt.Sprintf("asynq:{default}:unique:email:send:%s", checksum(payload4)), }, { "with nil payload",