2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Move unique key generator function to base

This commit is contained in:
Ken Hibino 2020-08-07 05:36:54 -07:00
parent 154113d0d0
commit 50b6034bf9
4 changed files with 92 additions and 101 deletions

View File

@ -7,7 +7,6 @@ package asynq
import ( import (
"errors" "errors"
"fmt" "fmt"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -142,39 +141,6 @@ func composeOptions(opts ...Option) option {
return res return res
} }
// uniqueKey computes the redis key used for the given task.
// It returns an empty string if ttl is zero.
func uniqueKey(t *Task, ttl time.Duration, qname string) string {
if ttl == 0 {
return ""
}
return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname)
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
}
const ( const (
// Default max retry count used if nothing is specified. // Default max retry count used if nothing is specified.
defaultMaxRetry = 25 defaultMaxRetry = 25
@ -286,6 +252,10 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er
// If neither deadline nor timeout are set, use default timeout. // If neither deadline nor timeout are set, use default timeout.
timeout = defaultTimeout timeout = defaultTimeout
} }
var uniqueKey string
if opt.uniqueTTL > 0 {
uniqueKey = base.UniqueKey(opt.queue, task.Type, task.Payload.data)
}
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: task.Type, Type: task.Type,
@ -294,7 +264,7 @@ func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) (*Result, er
Retry: opt.retry, Retry: opt.retry,
Deadline: deadline.Unix(), Deadline: deadline.Unix(),
Timeout: int64(timeout.Seconds()), Timeout: int64(timeout.Seconds()),
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue), UniqueKey: uniqueKey,
} }
var err error var err error
now := time.Now() now := time.Now()

View File

@ -561,72 +561,6 @@ func TestClientDefaultOptions(t *testing.T) {
} }
} }
func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
task *Task
ttl time.Duration
qname string
want string
}{
{
"with zero TTL",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
0,
"default",
"",
},
{
"with primitive types",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with unsorted keys",
NewTask("email:send", map[string]interface{}{"b": "hello", "c": true, "a": 123}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with composite types",
NewTask("email:send",
map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}),
10 * time.Minute,
"default",
"email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]:default",
},
{
"with complex types",
NewTask("email:send",
map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}),
10 * time.Minute,
"default",
"email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC:default",
},
{
"with nil payload",
NewTask("reindex", nil),
10 * time.Minute,
"default",
"reindex:nil:default",
},
}
for _, tc := range tests {
got := uniqueKey(tc.task, tc.ttl, tc.qname)
if got != tc.want {
t.Errorf("%s: uniqueKey(%v, %v, %q) = %q, want %q", tc.desc, tc.task, tc.ttl, tc.qname, got, tc.want)
}
}
}
func TestEnqueueUnique(t *testing.T) { func TestEnqueueUnique(t *testing.T) {
r := setup(t) r := setup(t)
c := NewClient(RedisClientOpt{ c := NewClient(RedisClientOpt{

View File

@ -9,6 +9,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -90,6 +91,35 @@ func WorkersKey(hostname string, pid int, sid string) string {
return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid)
} }
// UniqueKey returns a redis key with the given type, payload, and queue name.
func UniqueKey(qname, tasktype string, payload map[string]interface{}) string {
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload))
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
}
// TaskMessage is the internal representation of a task with additional metadata fields. // TaskMessage is the internal representation of a task with additional metadata fields.
// Serialized data of this type gets written to redis. // Serialized data of this type gets written to redis.
type TaskMessage struct { type TaskMessage struct {

View File

@ -212,6 +212,63 @@ func TestWorkersKey(t *testing.T) {
} }
} }
func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
qname string
tasktype string
payload map[string]interface{}
want string
}{
{
"with primitive types",
"default",
"email:send",
map[string]interface{}{"a": 123, "b": "hello", "c": true},
"asynq:{default}:unique:email:send:a=123,b=hello,c=true",
},
{
"with unsorted keys",
"default",
"email:send",
map[string]interface{}{"b": "hello", "c": true, "a": 123},
"asynq:{default}:unique:email:send:a=123,b=hello,c=true",
},
{
"with composite types",
"default",
"email:send",
map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}},
"asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]",
},
{
"with complex types",
"default",
"email:send",
map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour},
"asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC",
},
{
"with nil payload",
"default",
"reindex",
nil,
"asynq:{default}:unique:reindex:nil",
},
}
for _, tc := range tests {
got := UniqueKey(tc.qname, tc.tasktype, tc.payload)
if got != tc.want {
t.Errorf("%s: UniqueKey(%q, %q, %v) = %q, want %q", tc.desc, tc.qname, tc.tasktype, tc.payload, got, tc.want)
}
}
}
func TestMessageEncoding(t *testing.T) { func TestMessageEncoding(t *testing.T) {
id := uuid.New() id := uuid.New()
tests := []struct { tests := []struct {