From c33dd447ac75fd7f9a2fb1e155d46f36eccb28f0 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 18 Mar 2020 06:49:39 -0700 Subject: [PATCH] Allow client to enqueue a task with unique option Changes: - Added Unique option for clients - Require go v.13 or above (to use new errors wrapping functions) - Fixed adding queue key to all-queues set (asynq:queues) when scheduling. --- .travis.yml | 4 +- CHANGELOG.md | 4 + README.md | 2 +- client.go | 106 +++++++++++++++++--- client_test.go | 209 +++++++++++++++++++++++++++++++++++++++ internal/base/base.go | 5 + internal/rdb/rdb.go | 113 ++++++++++++++++++++- internal/rdb/rdb_test.go | 126 ++++++++++++++++++++++- 8 files changed, 542 insertions(+), 27 deletions(-) diff --git a/.travis.yml b/.travis.yml index b044218..9d47394 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,9 +2,7 @@ language: go go_import_path: github.com/hibiken/asynq git: depth: 1 -env: - - GO111MODULE=on # go modules are the default -go: [1.12.x, 1.13.x, 1.14.x] +go: [1.13.x, 1.14.x] script: - go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... services: diff --git a/CHANGELOG.md b/CHANGELOG.md index dc65560..497c33a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- `Unique` option was added to allow client to enqueue a task only if it's unique within a certain time period. + ## [0.6.2] - 2020-03-15 ### Added diff --git a/README.md b/README.md index 8e29800..a1f7a32 100644 --- a/README.md +++ b/README.md @@ -205,7 +205,7 @@ go get -u github.com/hibiken/asynq/tools/asynqmon | Dependency | Version | | -------------------------- | ------- | | [Redis](https://redis.io/) | v2.8+ | -| [Go](https://golang.org/) | v1.12+ | +| [Go](https://golang.org/) | v1.13+ | ## Contributing diff --git a/client.go b/client.go index b315d0e..79cf38c 100644 --- a/client.go +++ b/client.go @@ -5,6 +5,9 @@ package asynq import ( + "errors" + "fmt" + "sort" "strings" "time" @@ -38,6 +41,7 @@ type ( queueOption string timeoutOption time.Duration deadlineOption time.Time + uniqueOption time.Duration ) // MaxRetry returns an option to specify the max number of times @@ -70,11 +74,30 @@ func Deadline(t time.Time) Option { return deadlineOption(t) } +// Unique returns an option to enqueue a task only if the given task is unique. +// Task enqueued with this option is guaranteed to be unique within the given ttl. +// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued. +// ErrDuplicateTask error is returned when enqueueing a duplicate task. +// +// Uniqueness of a task is based on the following properties: +// - Task Type +// - Task Payload +// - Queue Name +func Unique(ttl time.Duration) Option { + return uniqueOption(ttl) +} + +// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. +// +// ErrDuplicateTask error only applies to tasks enqueued with a Unique option. +var ErrDuplicateTask = errors.New("task already exists") + type option struct { - retry int - queue string - timeout time.Duration - deadline time.Time + retry int + queue string + timeout time.Duration + deadline time.Time + uniqueTTL time.Duration } func composeOptions(opts ...Option) option { @@ -94,6 +117,8 @@ func composeOptions(opts ...Option) option { res.timeout = time.Duration(opt) case deadlineOption: res.deadline = time.Time(opt) + case uniqueOption: + res.uniqueTTL = time.Duration(opt) default: // ignore unexpected option } @@ -101,6 +126,39 @@ func composeOptions(opts ...Option) option { return res } +// uniqueKey computes the redis key used for the given task. +// It returns an empty string if ttl is zero. +func uniqueKey(t *Task, ttl time.Duration, qname string) string { + if ttl == 0 { + return "" + } + return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname) +} + +func serializePayload(payload map[string]interface{}) string { + if payload == nil { + return "nil" + } + type entry struct { + k string + v interface{} + } + var es []entry + for k, v := range payload { + es = append(es, entry{k, v}) + } + // sort entries by key + sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k }) + var b strings.Builder + for _, e := range es { + if b.Len() > 0 { + b.WriteString(",") + } + b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v)) + } + return b.String() +} + const ( // Max retry count by default defaultMaxRetry = 25 @@ -115,15 +173,25 @@ const ( func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { opt := composeOptions(opts...) msg := &base.TaskMessage{ - ID: xid.New(), - Type: task.Type, - Payload: task.Payload.data, - Queue: opt.queue, - Retry: opt.retry, - Timeout: opt.timeout.String(), - Deadline: opt.deadline.Format(time.RFC3339), + ID: xid.New(), + Type: task.Type, + Payload: task.Payload.data, + Queue: opt.queue, + Retry: opt.retry, + Timeout: opt.timeout.String(), + Deadline: opt.deadline.Format(time.RFC3339), + UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue), } - return c.enqueue(msg, t) + var err error + if time.Now().After(t) { + err = c.enqueue(msg, opt.uniqueTTL) + } else { + err = c.schedule(msg, t, opt.uniqueTTL) + } + if err == rdb.ErrDuplicateTask { + return fmt.Errorf("%w", ErrDuplicateTask) + } + return err } // Enqueue enqueues task to be processed immediately. @@ -146,9 +214,17 @@ func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error { return c.EnqueueAt(time.Now().Add(d), task, opts...) } -func (c *Client) enqueue(msg *base.TaskMessage, t time.Time) error { - if time.Now().After(t) { - return c.rdb.Enqueue(msg) +func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { + if uniqueTTL > 0 { + return c.rdb.EnqueueUnique(msg, uniqueTTL) + } + return c.rdb.Enqueue(msg) +} + +func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error { + if uniqueTTL > 0 { + ttl := t.Add(uniqueTTL).Sub(time.Now()) + return c.rdb.ScheduleUnique(msg, t, ttl) } return c.rdb.Schedule(msg, t) } diff --git a/client_test.go b/client_test.go index cad0969..2817b5a 100644 --- a/client_test.go +++ b/client_test.go @@ -5,10 +5,12 @@ package asynq import ( + "errors" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" ) @@ -361,3 +363,210 @@ func TestClientEnqueueIn(t *testing.T) { } } } + +func TestUniqueKey(t *testing.T) { + tests := []struct { + desc string + task *Task + ttl time.Duration + qname string + want string + }{ + { + "with zero TTL", + NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}), + 0, + "default", + "", + }, + { + "with primitive types", + NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}), + 10 * time.Minute, + "default", + "email:send:a=123,b=hello,c=true:default", + }, + { + "with unsorted keys", + NewTask("email:send", map[string]interface{}{"b": "hello", "c": true, "a": 123}), + 10 * time.Minute, + "default", + "email:send:a=123,b=hello,c=true:default", + }, + { + "with composite types", + NewTask("email:send", + map[string]interface{}{ + "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, + "names": []string{"bob", "mike", "rob"}}), + 10 * time.Minute, + "default", + "email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]:default", + }, + { + "with complex types", + NewTask("email:send", + map[string]interface{}{ + "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), + "duration": time.Hour}), + 10 * time.Minute, + "default", + "email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC:default", + }, + { + "with nil payload", + NewTask("reindex", nil), + 10 * time.Minute, + "default", + "reindex:nil:default", + }, + } + + for _, tc := range tests { + got := uniqueKey(tc.task, tc.ttl, tc.qname) + if got != tc.want { + t.Errorf("%s: uniqueKey(%v, %v, %q) = %q, want %q", tc.desc, tc.task, tc.ttl, tc.qname, got, tc.want) + } + } +} + +func TestEnqueueUnique(t *testing.T) { + r := setup(t) + c := NewClient(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + tests := []struct { + task *Task + ttl time.Duration + }{ + { + NewTask("email", map[string]interface{}{"user_id": 123}), + time.Hour, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + // Enqueue the task first. It should succeed. + err := c.Enqueue(tc.task, Unique(tc.ttl)) + if err != nil { + t.Fatal(err) + } + + gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) + continue + } + + // Enqueue the task again. It should fail. + err = c.Enqueue(tc.task, Unique(tc.ttl)) + if err == nil { + t.Errorf("Enqueueing %+v did not return an error", tc.task) + continue + } + if !errors.Is(err, ErrDuplicateTask) { + t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task) + continue + } + } +} + +func TestEnqueueInUnique(t *testing.T) { + r := setup(t) + c := NewClient(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + tests := []struct { + task *Task + d time.Duration + ttl time.Duration + }{ + { + NewTask("reindex", nil), + time.Hour, + 10 * time.Minute, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + // Enqueue the task first. It should succeed. + err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + if err != nil { + t.Fatal(err) + } + + gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second + if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) + continue + } + + // Enqueue the task again. It should fail. + err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl)) + if err == nil { + t.Errorf("Enqueueing %+v did not return an error", tc.task) + continue + } + if !errors.Is(err, ErrDuplicateTask) { + t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task) + continue + } + } +} + +func TestEnqueueAtUnique(t *testing.T) { + r := setup(t) + c := NewClient(RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + }) + + tests := []struct { + task *Task + at time.Time + ttl time.Duration + }{ + { + NewTask("reindex", nil), + time.Now().Add(time.Hour), + 10 * time.Minute, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. + + // Enqueue the task first. It should succeed. + err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + if err != nil { + t.Fatal(err) + } + + gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val() + wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) + if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) + continue + } + + // Enqueue the task again. It should fail. + err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl)) + if err == nil { + t.Errorf("Enqueueing %+v did not return an error", tc.task) + continue + } + if !errors.Is(err, ErrDuplicateTask) { + t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task) + continue + } + } +} diff --git a/internal/base/base.go b/internal/base/base.go index d523bc6..7c4ac00 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -97,6 +97,11 @@ type TaskMessage struct { // // time.Time's zero value means no deadline. Deadline string + + // UniqueKey holds the redis key used for uniqueness lock for this task. + // + // Empty string indicates that no uniqueness lock was used. + UniqueKey string } // ProcessState holds process level information. diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index da10cd1..12255d8 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -22,6 +22,9 @@ var ( // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") + + // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. + ErrDuplicateTask = errors.New("task already exists") ) const statsTTL = 90 * 24 * time.Hour // 90 days @@ -59,6 +62,46 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err() } +// KEYS[1] -> unique key in the form :: +// KEYS[2] -> asynq:queues: +// KEYS[2] -> asynq:queues +// ARGV[1] -> task ID +// ARGV[2] -> uniqueness lock TTL +// ARGV[3] -> task message data +var enqueueUniqueCmd = redis.NewScript(` +local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) +if not ok then + return 0 +end +redis.call("LPUSH", KEYS[2], ARGV[3]) +redis.call("SADD", KEYS[3], KEYS[2]) +return 1 +`) + +// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. +// It returns ErrDuplicateTask if the lock cannot be acquired. +func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { + bytes, err := json.Marshal(msg) + if err != nil { + return err + } + key := base.QueueKey(msg.Queue) + res, err := enqueueUniqueCmd.Run(r.client, + []string{msg.UniqueKey, key, base.AllQueues}, + msg.ID.String(), int(ttl.Seconds()), bytes).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return fmt.Errorf("could not cast %v to int64", res) + } + if n == 0 { + return ErrDuplicateTask + } + return nil +} + // Dequeue queries given queues in order and pops a task message if there is one and returns it. // If all queues are empty, ErrNoProcessableTask error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { @@ -118,8 +161,10 @@ func (r *RDB) dequeue(queues ...string) (data string, err error) { // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:processed: +// KEYS[3] -> unique key in the format :: // ARGV[1] -> base.TaskMessage value // ARGV[2] -> stats expiration timestamp +// ARGV[3] -> task ID // Note: LREM count ZERO means "remove all elements equal to val" var doneCmd = redis.NewScript(` redis.call("LREM", KEYS[1], 0, ARGV[1]) @@ -127,10 +172,14 @@ local n = redis.call("INCR", KEYS[2]) if tonumber(n) == 1 then redis.call("EXPIREAT", KEYS[2], ARGV[2]) end +if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then + redis.call("DEL", KEYS[3]) +end return redis.status_reply("OK") `) // Done removes the task from in-progress queue to mark the task as done. +// It removes a uniqueness lock acquired by the task, if any. func (r *RDB) Done(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { @@ -140,8 +189,8 @@ func (r *RDB) Done(msg *base.TaskMessage) error { processedKey := base.ProcessedKey(now) expireAt := now.Add(statsTTL) return doneCmd.Run(r.client, - []string{base.InProgressQueue, processedKey}, - bytes, expireAt.Unix()).Err() + []string{base.InProgressQueue, processedKey, msg.UniqueKey}, + bytes, expireAt.Unix(), msg.ID.String()).Err() } // KEYS[1] -> asynq:in_progress @@ -164,15 +213,71 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { string(bytes)).Err() } +// KEYS[1] -> asynq:scheduled +// KEYS[2] -> asynq:queues +// ARGV[1] -> score (process_at timestamp) +// ARGV[2] -> task message +// ARGV[3] -> queue key +var scheduleCmd = redis.NewScript(` +redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) +redis.call("SADD", KEYS[2], ARGV[3]) +return 1 +`) + // Schedule adds the task to the backlog queue to be processed in the future. func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { bytes, err := json.Marshal(msg) if err != nil { return err } + qkey := base.QueueKey(msg.Queue) score := float64(processAt.Unix()) - return r.client.ZAdd(base.ScheduledQueue, - &redis.Z{Member: string(bytes), Score: score}).Err() + return scheduleCmd.Run(r.client, + []string{base.ScheduledQueue, base.AllQueues}, + score, bytes, qkey).Err() +} + +// KEYS[1] -> unique key in the format :: +// KEYS[2] -> asynq:scheduled +// KEYS[3] -> asynq:queues +// ARGV[1] -> task ID +// ARGV[2] -> uniqueness lock TTL +// ARGV[3] -> score (process_at timestamp) +// ARGV[4] -> task message +// ARGV[5] -> queue key +var scheduleUniqueCmd = redis.NewScript(` +local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) +if not ok then + return 0 +end +redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4]) +redis.call("SADD", KEYS[3], ARGV[5]) +return 1 +`) + +// Schedule adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. +// It returns ErrDuplicateTask if the lock cannot be acquired. +func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error { + bytes, err := json.Marshal(msg) + if err != nil { + return err + } + qkey := base.QueueKey(msg.Queue) + score := float64(processAt.Unix()) + res, err := scheduleUniqueCmd.Run(r.client, + []string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues}, + msg.ID.String(), int(ttl.Seconds()), score, bytes, qkey).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return fmt.Errorf("could not cast %v to int64", res) + } + if n == 0 { + return ErrDuplicateTask + } + return nil } // KEYS[1] -> asynq:in_progress diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 842449f..dc5cba5 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -16,6 +16,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/rs/xid" ) // TODO(hibiken): Get Redis address and db number from ENV variables. @@ -69,6 +70,48 @@ func TestEnqueue(t *testing.T) { } } +func TestEnqueueUnique(t *testing.T) { + r := setup(t) + m1 := base.TaskMessage{ + ID: xid.New(), + Type: "email", + Payload: map[string]interface{}{"user_id": 123}, + Queue: base.DefaultQueueName, + UniqueKey: "email:user_id=123:default", + } + + tests := []struct { + msg *base.TaskMessage + ttl time.Duration // uniqueness ttl + }{ + {&m1, time.Minute}, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case. + + err := r.EnqueueUnique(tc.msg, tc.ttl) + if err != nil { + t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil", + tc.msg, tc.ttl, err) + continue + } + + got := r.EnqueueUnique(tc.msg, tc.ttl) + if got != ErrDuplicateTask { + t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v", + tc.msg, tc.ttl, got, ErrDuplicateTask) + continue + } + + gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() + if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) + continue + } + } +} + func TestDequeue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) @@ -188,6 +231,13 @@ func TestDone(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("export_csv", nil) + t3 := &base.TaskMessage{ + ID: xid.New(), + Type: "reindex", + Payload: nil, + UniqueKey: "reindex:nil:default", + Queue: "default", + } tests := []struct { inProgress []*base.TaskMessage // initial state of the in-progress list @@ -204,11 +254,25 @@ func TestDone(t *testing.T) { target: t1, wantInProgress: []*base.TaskMessage{}, }, + { + inProgress: []*base.TaskMessage{t1, t2, t3}, + target: t3, + wantInProgress: []*base.TaskMessage{t1, t2}, + }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedInProgressQueue(t, r.client, tc.inProgress) + for _, msg := range tc.inProgress { + // Set uniqueness lock if unique key is present. + if len(msg.UniqueKey) > 0 { + err := r.client.SetNX(msg.UniqueKey, msg.ID.String(), time.Minute).Err() + if err != nil { + t.Fatal(err) + } + } + } err := r.Done(tc.target) if err != nil { @@ -232,6 +296,10 @@ func TestDone(t *testing.T) { if gotTTL > statsTTL { t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) } + + if len(tc.target.UniqueKey) > 0 && r.client.Exists(tc.target.UniqueKey).Val() != 0 { + t.Errorf("Uniqueness lock %q still exists", tc.target.UniqueKey) + } } } @@ -344,6 +412,58 @@ func TestSchedule(t *testing.T) { } } +func TestScheduleUnique(t *testing.T) { + r := setup(t) + m1 := base.TaskMessage{ + ID: xid.New(), + Type: "email", + Payload: map[string]interface{}{"user_id": 123}, + Queue: base.DefaultQueueName, + UniqueKey: "email:user_id=123:default", + } + + tests := []struct { + msg *base.TaskMessage + processAt time.Time + ttl time.Duration // uniqueness lock ttl + }{ + {&m1, time.Now().Add(15 * time.Minute), time.Minute}, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + + desc := fmt.Sprintf("(*RDB).ScheduleUnique(%v, %v, %v)", tc.msg, tc.processAt, tc.ttl) + err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl) + if err != nil { + t.Errorf("Frist task: %s = %v, want nil", desc, err) + continue + } + + gotScheduled := h.GetScheduledEntries(t, r.client) + if len(gotScheduled) != 1 { + t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) + continue + } + if int64(gotScheduled[0].Score) != tc.processAt.Unix() { + t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) + continue + } + + got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl) + if got != ErrDuplicateTask { + t.Errorf("Second task: %s = %v, want %v", + desc, got, ErrDuplicateTask) + } + + gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() + if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { + t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) + continue + } + } +} + func TestRetry(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) @@ -784,8 +904,7 @@ func TestWriteProcessState(t *testing.T) { } // Check ProcessInfo TTL was set correctly gotTTL := r.client.TTL(pkey).Val() - timeCmpOpt := cmpopts.EquateApproxTime(time.Second) - if !cmp.Equal(ttl, gotTTL, timeCmpOpt) { + if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) } // Check ProcessInfo key was added to the set correctly @@ -858,8 +977,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) { } // Check ProcessInfo TTL was set correctly gotTTL := r.client.TTL(pkey).Val() - timeCmpOpt := cmpopts.EquateApproxTime(time.Second) - if !cmp.Equal(ttl, gotTTL, timeCmpOpt) { + if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) } // Check ProcessInfo key was added to the set correctly