2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 15:22:18 +08:00

Allow client to enqueue a task with unique option

Changes:

- Added Unique option for clients
- Require go v.13 or above (to use new errors wrapping functions)
- Fixed adding queue key to all-queues set (asynq:queues) when scheduling.
This commit is contained in:
Ken Hibino 2020-03-18 06:49:39 -07:00
parent 6df2c3ae2b
commit c33dd447ac
8 changed files with 542 additions and 27 deletions

View File

@ -2,9 +2,7 @@ language: go
go_import_path: github.com/hibiken/asynq go_import_path: github.com/hibiken/asynq
git: git:
depth: 1 depth: 1
env: go: [1.13.x, 1.14.x]
- GO111MODULE=on # go modules are the default
go: [1.12.x, 1.13.x, 1.14.x]
script: script:
- go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... - go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
services: services:

View File

@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- `Unique` option was added to allow client to enqueue a task only if it's unique within a certain time period.
## [0.6.2] - 2020-03-15 ## [0.6.2] - 2020-03-15
### Added ### Added

View File

@ -205,7 +205,7 @@ go get -u github.com/hibiken/asynq/tools/asynqmon
| Dependency | Version | | Dependency | Version |
| -------------------------- | ------- | | -------------------------- | ------- |
| [Redis](https://redis.io/) | v2.8+ | | [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ | | [Go](https://golang.org/) | v1.13+ |
## Contributing ## Contributing

106
client.go
View File

@ -5,6 +5,9 @@
package asynq package asynq
import ( import (
"errors"
"fmt"
"sort"
"strings" "strings"
"time" "time"
@ -38,6 +41,7 @@ type (
queueOption string queueOption string
timeoutOption time.Duration timeoutOption time.Duration
deadlineOption time.Time deadlineOption time.Time
uniqueOption time.Duration
) )
// MaxRetry returns an option to specify the max number of times // MaxRetry returns an option to specify the max number of times
@ -70,11 +74,30 @@ func Deadline(t time.Time) Option {
return deadlineOption(t) return deadlineOption(t)
} }
// Unique returns an option to enqueue a task only if the given task is unique.
// Task enqueued with this option is guaranteed to be unique within the given ttl.
// Once the task gets processed successfully or once the TTL has expired, another task with the same uniqueness may be enqueued.
// ErrDuplicateTask error is returned when enqueueing a duplicate task.
//
// Uniqueness of a task is based on the following properties:
// - Task Type
// - Task Payload
// - Queue Name
func Unique(ttl time.Duration) Option {
return uniqueOption(ttl)
}
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
//
// ErrDuplicateTask error only applies to tasks enqueued with a Unique option.
var ErrDuplicateTask = errors.New("task already exists")
type option struct { type option struct {
retry int retry int
queue string queue string
timeout time.Duration timeout time.Duration
deadline time.Time deadline time.Time
uniqueTTL time.Duration
} }
func composeOptions(opts ...Option) option { func composeOptions(opts ...Option) option {
@ -94,6 +117,8 @@ func composeOptions(opts ...Option) option {
res.timeout = time.Duration(opt) res.timeout = time.Duration(opt)
case deadlineOption: case deadlineOption:
res.deadline = time.Time(opt) res.deadline = time.Time(opt)
case uniqueOption:
res.uniqueTTL = time.Duration(opt)
default: default:
// ignore unexpected option // ignore unexpected option
} }
@ -101,6 +126,39 @@ func composeOptions(opts ...Option) option {
return res return res
} }
// uniqueKey computes the redis key used for the given task.
// It returns an empty string if ttl is zero.
func uniqueKey(t *Task, ttl time.Duration, qname string) string {
if ttl == 0 {
return ""
}
return fmt.Sprintf("%s:%s:%s", t.Type, serializePayload(t.Payload.data), qname)
}
func serializePayload(payload map[string]interface{}) string {
if payload == nil {
return "nil"
}
type entry struct {
k string
v interface{}
}
var es []entry
for k, v := range payload {
es = append(es, entry{k, v})
}
// sort entries by key
sort.Slice(es, func(i, j int) bool { return es[i].k < es[j].k })
var b strings.Builder
for _, e := range es {
if b.Len() > 0 {
b.WriteString(",")
}
b.WriteString(fmt.Sprintf("%s=%v", e.k, e.v))
}
return b.String()
}
const ( const (
// Max retry count by default // Max retry count by default
defaultMaxRetry = 25 defaultMaxRetry = 25
@ -115,15 +173,25 @@ const (
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
opt := composeOptions(opts...) opt := composeOptions(opts...)
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: task.Type, Type: task.Type,
Payload: task.Payload.data, Payload: task.Payload.data,
Queue: opt.queue, Queue: opt.queue,
Retry: opt.retry, Retry: opt.retry,
Timeout: opt.timeout.String(), Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339), Deadline: opt.deadline.Format(time.RFC3339),
UniqueKey: uniqueKey(task, opt.uniqueTTL, opt.queue),
} }
return c.enqueue(msg, t) var err error
if time.Now().After(t) {
err = c.enqueue(msg, opt.uniqueTTL)
} else {
err = c.schedule(msg, t, opt.uniqueTTL)
}
if err == rdb.ErrDuplicateTask {
return fmt.Errorf("%w", ErrDuplicateTask)
}
return err
} }
// Enqueue enqueues task to be processed immediately. // Enqueue enqueues task to be processed immediately.
@ -146,9 +214,17 @@ func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now().Add(d), task, opts...) return c.EnqueueAt(time.Now().Add(d), task, opts...)
} }
func (c *Client) enqueue(msg *base.TaskMessage, t time.Time) error { func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
if time.Now().After(t) { if uniqueTTL > 0 {
return c.rdb.Enqueue(msg) return c.rdb.EnqueueUnique(msg, uniqueTTL)
}
return c.rdb.Enqueue(msg)
}
func (c *Client) schedule(msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
if uniqueTTL > 0 {
ttl := t.Add(uniqueTTL).Sub(time.Now())
return c.rdb.ScheduleUnique(msg, t, ttl)
} }
return c.rdb.Schedule(msg, t) return c.rdb.Schedule(msg, t)
} }

View File

@ -5,10 +5,12 @@
package asynq package asynq
import ( import (
"errors"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
@ -361,3 +363,210 @@ func TestClientEnqueueIn(t *testing.T) {
} }
} }
} }
func TestUniqueKey(t *testing.T) {
tests := []struct {
desc string
task *Task
ttl time.Duration
qname string
want string
}{
{
"with zero TTL",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
0,
"default",
"",
},
{
"with primitive types",
NewTask("email:send", map[string]interface{}{"a": 123, "b": "hello", "c": true}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with unsorted keys",
NewTask("email:send", map[string]interface{}{"b": "hello", "c": true, "a": 123}),
10 * time.Minute,
"default",
"email:send:a=123,b=hello,c=true:default",
},
{
"with composite types",
NewTask("email:send",
map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}),
10 * time.Minute,
"default",
"email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]:default",
},
{
"with complex types",
NewTask("email:send",
map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}),
10 * time.Minute,
"default",
"email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC:default",
},
{
"with nil payload",
NewTask("reindex", nil),
10 * time.Minute,
"default",
"reindex:nil:default",
},
}
for _, tc := range tests {
got := uniqueKey(tc.task, tc.ttl, tc.qname)
if got != tc.want {
t.Errorf("%s: uniqueKey(%v, %v, %q) = %q, want %q", tc.desc, tc.task, tc.ttl, tc.qname, got, tc.want)
}
}
}
func TestEnqueueUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
ttl time.Duration
}{
{
NewTask("email", map[string]interface{}{"user_id": 123}),
time.Hour,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.Enqueue(tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
continue
}
// Enqueue the task again. It should fail.
err = c.Enqueue(tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}
func TestEnqueueInUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
d time.Duration
ttl time.Duration
}{
{
NewTask("reindex", nil),
time.Hour,
10 * time.Minute,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
continue
}
// Enqueue the task again. It should fail.
err = c.EnqueueIn(tc.d, tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}
func TestEnqueueAtUnique(t *testing.T) {
r := setup(t)
c := NewClient(RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
})
tests := []struct {
task *Task
at time.Time
ttl time.Duration
}{
{
NewTask("reindex", nil),
time.Now().Add(time.Hour),
10 * time.Minute,
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
// Enqueue the task first. It should succeed.
err := c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
if err != nil {
t.Fatal(err)
}
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
continue
}
// Enqueue the task again. It should fail.
err = c.EnqueueAt(tc.at, tc.task, Unique(tc.ttl))
if err == nil {
t.Errorf("Enqueueing %+v did not return an error", tc.task)
continue
}
if !errors.Is(err, ErrDuplicateTask) {
t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task)
continue
}
}
}

View File

@ -97,6 +97,11 @@ type TaskMessage struct {
// //
// time.Time's zero value means no deadline. // time.Time's zero value means no deadline.
Deadline string Deadline string
// UniqueKey holds the redis key used for uniqueness lock for this task.
//
// Empty string indicates that no uniqueness lock was used.
UniqueKey string
} }
// ProcessState holds process level information. // ProcessState holds process level information.

View File

@ -22,6 +22,9 @@ var (
// ErrTaskNotFound indicates that a task that matches the given identifier was not found. // ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task") ErrTaskNotFound = errors.New("could not find a task")
// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
ErrDuplicateTask = errors.New("task already exists")
) )
const statsTTL = 90 * 24 * time.Hour // 90 days const statsTTL = 90 * 24 * time.Hour // 90 days
@ -59,6 +62,46 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err() return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err()
} }
// KEYS[1] -> unique key in the form <type>:<payload>:<qname>
// KEYS[2] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[3])
redis.call("SADD", KEYS[3], KEYS[2])
return 1
`)
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
key := base.QueueKey(msg.Queue)
res, err := enqueueUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, key, base.AllQueues},
msg.ID.String(), int(ttl.Seconds()), bytes).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
}
if n == 0 {
return ErrDuplicateTask
}
return nil
}
// Dequeue queries given queues in order and pops a task message if there is one and returns it. // Dequeue queries given queues in order and pops a task message if there is one and returns it.
// If all queues are empty, ErrNoProcessableTask error is returned. // If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
@ -118,8 +161,10 @@ func (r *RDB) dequeue(queues ...string) (data string, err error) {
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:processed:<yyyy-mm-dd> // KEYS[2] -> asynq:processed:<yyyy-mm-dd>
// KEYS[3] -> unique key in the format <type>:<payload>:<qname>
// ARGV[1] -> base.TaskMessage value // ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp // ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val" // Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(` var doneCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) redis.call("LREM", KEYS[1], 0, ARGV[1])
@ -127,10 +172,14 @@ local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[2], ARGV[2]) redis.call("EXPIREAT", KEYS[2], ARGV[2])
end end
if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then
redis.call("DEL", KEYS[3])
end
return redis.status_reply("OK") return redis.status_reply("OK")
`) `)
// Done removes the task from in-progress queue to mark the task as done. // Done removes the task from in-progress queue to mark the task as done.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error { func (r *RDB) Done(msg *base.TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
@ -140,8 +189,8 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
processedKey := base.ProcessedKey(now) processedKey := base.ProcessedKey(now)
expireAt := now.Add(statsTTL) expireAt := now.Add(statsTTL)
return doneCmd.Run(r.client, return doneCmd.Run(r.client,
[]string{base.InProgressQueue, processedKey}, []string{base.InProgressQueue, processedKey, msg.UniqueKey},
bytes, expireAt.Unix()).Err() bytes, expireAt.Unix(), msg.ID.String()).Err()
} }
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
@ -164,15 +213,71 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
string(bytes)).Err() string(bytes)).Err()
} }
// KEYS[1] -> asynq:scheduled
// KEYS[2] -> asynq:queues
// ARGV[1] -> score (process_at timestamp)
// ARGV[2] -> task message
// ARGV[3] -> queue key
var scheduleCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("SADD", KEYS[2], ARGV[3])
return 1
`)
// Schedule adds the task to the backlog queue to be processed in the future. // Schedule adds the task to the backlog queue to be processed in the future.
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return err return err
} }
qkey := base.QueueKey(msg.Queue)
score := float64(processAt.Unix()) score := float64(processAt.Unix())
return r.client.ZAdd(base.ScheduledQueue, return scheduleCmd.Run(r.client,
&redis.Z{Member: string(bytes), Score: score}).Err() []string{base.ScheduledQueue, base.AllQueues},
score, bytes, qkey).Err()
}
// KEYS[1] -> unique key in the format <type>:<payload>:<qname>
// KEYS[2] -> asynq:scheduled
// KEYS[3] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> queue key
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
redis.call("SADD", KEYS[3], ARGV[5])
return 1
`)
// Schedule adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
// It returns ErrDuplicateTask if the lock cannot be acquired.
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
bytes, err := json.Marshal(msg)
if err != nil {
return err
}
qkey := base.QueueKey(msg.Queue)
score := float64(processAt.Unix())
res, err := scheduleUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues},
msg.ID.String(), int(ttl.Seconds()), score, bytes, qkey).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
}
if n == 0 {
return ErrDuplicateTask
}
return nil
} }
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress

View File

@ -16,6 +16,7 @@ import (
"github.com/google/go-cmp/cmp/cmpopts" "github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/rs/xid"
) )
// TODO(hibiken): Get Redis address and db number from ENV variables. // TODO(hibiken): Get Redis address and db number from ENV variables.
@ -69,6 +70,48 @@ func TestEnqueue(t *testing.T) {
} }
} }
func TestEnqueueUnique(t *testing.T) {
r := setup(t)
m1 := base.TaskMessage{
ID: xid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": 123},
Queue: base.DefaultQueueName,
UniqueKey: "email:user_id=123:default",
}
tests := []struct {
msg *base.TaskMessage
ttl time.Duration // uniqueness ttl
}{
{&m1, time.Minute},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case.
err := r.EnqueueUnique(tc.msg, tc.ttl)
if err != nil {
t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil",
tc.msg, tc.ttl, err)
continue
}
got := r.EnqueueUnique(tc.msg, tc.ttl)
if got != ErrDuplicateTask {
t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v",
tc.msg, tc.ttl, got, ErrDuplicateTask)
continue
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
}
}
}
func TestDequeue(t *testing.T) { func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
@ -188,6 +231,13 @@ func TestDone(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("export_csv", nil) t2 := h.NewTaskMessage("export_csv", nil)
t3 := &base.TaskMessage{
ID: xid.New(),
Type: "reindex",
Payload: nil,
UniqueKey: "reindex:nil:default",
Queue: "default",
}
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
@ -204,11 +254,25 @@ func TestDone(t *testing.T) {
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
}, },
{
inProgress: []*base.TaskMessage{t1, t2, t3},
target: t3,
wantInProgress: []*base.TaskMessage{t1, t2},
},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedInProgressQueue(t, r.client, tc.inProgress)
for _, msg := range tc.inProgress {
// Set uniqueness lock if unique key is present.
if len(msg.UniqueKey) > 0 {
err := r.client.SetNX(msg.UniqueKey, msg.ID.String(), time.Minute).Err()
if err != nil {
t.Fatal(err)
}
}
}
err := r.Done(tc.target) err := r.Done(tc.target)
if err != nil { if err != nil {
@ -232,6 +296,10 @@ func TestDone(t *testing.T) {
if gotTTL > statsTTL { if gotTTL > statsTTL {
t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL) t.Errorf("TTL %q = %v, want less than or equal to %v", processedKey, gotTTL, statsTTL)
} }
if len(tc.target.UniqueKey) > 0 && r.client.Exists(tc.target.UniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", tc.target.UniqueKey)
}
} }
} }
@ -344,6 +412,58 @@ func TestSchedule(t *testing.T) {
} }
} }
func TestScheduleUnique(t *testing.T) {
r := setup(t)
m1 := base.TaskMessage{
ID: xid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": 123},
Queue: base.DefaultQueueName,
UniqueKey: "email:user_id=123:default",
}
tests := []struct {
msg *base.TaskMessage
processAt time.Time
ttl time.Duration // uniqueness lock ttl
}{
{&m1, time.Now().Add(15 * time.Minute), time.Minute},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
desc := fmt.Sprintf("(*RDB).ScheduleUnique(%v, %v, %v)", tc.msg, tc.processAt, tc.ttl)
err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
if err != nil {
t.Errorf("Frist task: %s = %v, want nil", desc, err)
continue
}
gotScheduled := h.GetScheduledEntries(t, r.client)
if len(gotScheduled) != 1 {
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue)
continue
}
if int64(gotScheduled[0].Score) != tc.processAt.Unix() {
t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix())
continue
}
got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
if got != ErrDuplicateTask {
t.Errorf("Second task: %s = %v, want %v",
desc, got, ErrDuplicateTask)
}
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
}
}
}
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "Hola!"})
@ -784,8 +904,7 @@ func TestWriteProcessState(t *testing.T) {
} }
// Check ProcessInfo TTL was set correctly // Check ProcessInfo TTL was set correctly
gotTTL := r.client.TTL(pkey).Val() gotTTL := r.client.TTL(pkey).Val()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
if !cmp.Equal(ttl, gotTTL, timeCmpOpt) {
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
} }
// Check ProcessInfo key was added to the set correctly // Check ProcessInfo key was added to the set correctly
@ -858,8 +977,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
} }
// Check ProcessInfo TTL was set correctly // Check ProcessInfo TTL was set correctly
gotTTL := r.client.TTL(pkey).Val() gotTTL := r.client.TTL(pkey).Val()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second) if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
if !cmp.Equal(ttl, gotTTL, timeCmpOpt) {
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl) t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
} }
// Check ProcessInfo key was added to the set correctly // Check ProcessInfo key was added to the set correctly