2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Extract rdb to internal package

This commit is contained in:
Ken Hibino 2019-12-03 21:01:26 -08:00
parent 593f2b0482
commit d4e442d04f
17 changed files with 488 additions and 971 deletions

View File

@ -1,10 +1,6 @@
package asynq package asynq
import ( import "github.com/go-redis/redis/v7"
"time"
"github.com/google/uuid"
)
/* /*
TODOs: TODOs:
@ -27,32 +23,6 @@ type Task struct {
Payload map[string]interface{} Payload map[string]interface{}
} }
// taskMessage is an internal representation of a task with additional metadata fields.
// This data gets written in redis.
type taskMessage struct {
//-------- Task fields --------
Type string
Payload map[string]interface{}
//-------- metadata fields --------
// unique identifier for each task
ID uuid.UUID
// queue name this message should be enqueued to
Queue string
// max number of retry for this task.
Retry int
// number of times we've retried so far
Retried int
// error message from the last failure
ErrorMsg string
}
// RedisConfig specifies redis configurations. // RedisConfig specifies redis configurations.
type RedisConfig struct { type RedisConfig struct {
Addr string Addr string
@ -62,60 +32,10 @@ type RedisConfig struct {
DB int DB int
} }
// Stats represents a state of queues at a certain time. func newRedisClient(config *RedisConfig) *redis.Client {
type Stats struct { return redis.NewClient(&redis.Options{
Queued int Addr: config.Addr,
InProgress int Password: config.Password,
Scheduled int DB: config.DB,
Retry int })
Dead int
Timestamp time.Time
}
// EnqueuedTask is a task in a queue and is ready to be processed.
// This is read only and used for inspection purpose.
type EnqueuedTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// InProgressTask is a task that's currently being processed.
// This is read only and used for inspection purpose.
type InProgressTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// ScheduledTask is a task that's scheduled to be processed in the future.
// This is read only and used for inspection purpose.
type ScheduledTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
ProcessAt time.Time
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
// This is read only and used for inspection purpose.
type RetryTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
// TODO(hibiken): add LastFailedAt time.Time
ProcessAt time.Time
ErrorMsg string
Retried int
Retry int
}
// DeadTask is a task in that has exhausted all retries.
// This is read only and used for inspection purpose.
type DeadTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
} }

View File

@ -2,29 +2,30 @@ package asynq
import ( import (
"encoding/json" "encoding/json"
"math/rand"
"sort" "sort"
"testing" "testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
) )
// This file defines test helper functions used by // This file defines test helper functions used by
// other test files. // other test files.
func init() { func setup(t *testing.T) *redis.Client {
rand.Seed(time.Now().UnixNano()) t.Helper()
} r := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*taskMessage) []*taskMessage { DB: 2,
out := append([]*taskMessage(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
}) })
return out // Start each test with a clean slate.
}) if err := r.FlushDB().Err(); err != nil {
panic(err)
}
return r
}
var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
out := append([]*Task(nil), in...) // Copy input to avoid mutating it out := append([]*Task(nil), in...) // Copy input to avoid mutating it
@ -34,32 +35,25 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
return out return out
}) })
// setup connects to a redis database and flush all keys var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.TaskMessage {
// before returning an instance of rdb. out := append([]*rdb.TaskMessage(nil), in...) // Copy input to avoid mutating it
func setup(t *testing.T) *rdb { sort.Slice(out, func(i, j int) bool {
t.Helper() return out[i].ID.String() < out[j].ID.String()
r := newRDB(&RedisConfig{
Addr: "localhost:6379",
DB: 15, // use database 15 to separate from other applications
}) })
// Start each test with a clean slate. return out
if err := r.client.FlushDB().Err(); err != nil { })
panic(err)
}
return r
}
func randomTask(taskType, qname string, payload map[string]interface{}) *taskMessage { func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage {
return &taskMessage{ return &rdb.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: taskType, Type: taskType,
Queue: qname, Queue: qname,
Retry: rand.Intn(100), Retry: defaultMaxRetry,
Payload: make(map[string]interface{}), Payload: make(map[string]interface{}),
} }
} }
func mustMarshal(t *testing.T, task *taskMessage) string { func mustMarshal(t *testing.T, task *rdb.TaskMessage) string {
t.Helper() t.Helper()
data, err := json.Marshal(task) data, err := json.Marshal(task)
if err != nil { if err != nil {
@ -68,9 +62,9 @@ func mustMarshal(t *testing.T, task *taskMessage) string {
return string(data) return string(data)
} }
func mustUnmarshal(t *testing.T, data string) *taskMessage { func mustUnmarshal(t *testing.T, data string) *rdb.TaskMessage {
t.Helper() t.Helper()
var task taskMessage var task rdb.TaskMessage
err := json.Unmarshal([]byte(data), &task) err := json.Unmarshal([]byte(data), &task)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -78,7 +72,7 @@ func mustUnmarshal(t *testing.T, data string) *taskMessage {
return &task return &task
} }
func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string { func mustMarshalSlice(t *testing.T, tasks []*rdb.TaskMessage) []string {
t.Helper() t.Helper()
var data []string var data []string
for _, task := range tasks { for _, task := range tasks {
@ -87,9 +81,9 @@ func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string {
return data return data
} }
func mustUnmarshalSlice(t *testing.T, data []string) []*taskMessage { func mustUnmarshalSlice(t *testing.T, data []string) []*rdb.TaskMessage {
t.Helper() t.Helper()
var tasks []*taskMessage var tasks []*rdb.TaskMessage
for _, s := range data { for _, s := range data {
tasks = append(tasks, mustUnmarshal(t, s)) tasks = append(tasks, mustUnmarshal(t, s))
} }

View File

@ -7,6 +7,8 @@ import (
"os/signal" "os/signal"
"sync" "sync"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
) )
// Background is a top-level entity for the background-task processing. // Background is a top-level entity for the background-task processing.
@ -14,18 +16,18 @@ type Background struct {
mu sync.Mutex mu sync.Mutex
running bool running bool
rdb *rdb rdb *rdb.RDB
poller *poller poller *poller
processor *processor processor *processor
} }
// NewBackground returns a new Background instance. // NewBackground returns a new Background instance.
func NewBackground(numWorkers int, config *RedisConfig) *Background { func NewBackground(numWorkers int, config *RedisConfig) *Background {
rdb := newRDB(config) r := rdb.NewRDB(newRedisClient(config))
poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) poller := newPoller(r, 5*time.Second, []string{rdb.Scheduled, rdb.Retry})
processor := newProcessor(rdb, numWorkers, nil) processor := newProcessor(r, numWorkers, nil)
return &Background{ return &Background{
rdb: rdb, rdb: r,
poller: poller, poller: poller,
processor: processor, processor: processor,
} }
@ -95,7 +97,7 @@ func (bg *Background) stop() {
bg.poller.terminate() bg.poller.terminate()
bg.processor.terminate() bg.processor.terminate()
bg.rdb.client.Close() bg.rdb.Close()
bg.processor.handler = nil bg.processor.handler = nil
bg.running = false bg.running = false
} }

View File

@ -4,21 +4,23 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
) )
// Client is an interface for scheduling tasks. // Client is an interface for scheduling tasks.
type Client struct { type Client struct {
rdb *rdb rdb *rdb.RDB
} }
// NewClient creates and returns a new client. // NewClient creates and returns a new client.
func NewClient(config *RedisConfig) *Client { func NewClient(config *RedisConfig) *Client {
return &Client{rdb: newRDB(config)} r := rdb.NewRDB(newRedisClient(config))
return &Client{r}
} }
// Process enqueues the task to be performed at a given time. // Process enqueues the task to be performed at a given time.
func (c *Client) Process(task *Task, processAt time.Time) error { func (c *Client) Process(task *Task, processAt time.Time) error {
msg := &taskMessage{ msg := &rdb.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: task.Type, Type: task.Type,
Payload: task.Payload, Payload: task.Payload,
@ -29,9 +31,9 @@ func (c *Client) Process(task *Task, processAt time.Time) error {
} }
// enqueue pushes a given task to the specified queue. // enqueue pushes a given task to the specified queue.
func (c *Client) enqueue(msg *taskMessage, processAt time.Time) error { func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error {
if time.Now().After(processAt) { if time.Now().After(processAt) {
return c.rdb.enqueue(msg) return c.rdb.Enqueue(msg)
} }
return c.rdb.schedule(scheduled, processAt, msg) return c.rdb.Schedule(rdb.Scheduled, processAt, msg)
} }

View File

@ -1,13 +1,14 @@
package asynq package asynq
import ( import (
"github.com/hibiken/asynq/internal/rdb"
"testing" "testing"
"time" "time"
) )
func TestClient(t *testing.T) { func TestClient(t *testing.T) {
r := setup(t) r := setup(t)
client := &Client{rdb: r} client := &Client{rdb.NewRDB(r)}
tests := []struct { tests := []struct {
task *Task task *Task
@ -31,7 +32,7 @@ func TestClient(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case. // clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -41,12 +42,12 @@ func TestClient(t *testing.T) {
continue continue
} }
if l := r.client.LLen(defaultQueue).Val(); l != tc.wantQueueSize { if l := r.LLen(rdb.DefaultQueue).Val(); l != tc.wantQueueSize {
t.Errorf("%q has length %d, want %d", defaultQueue, l, tc.wantQueueSize) t.Errorf("%q has length %d, want %d", rdb.DefaultQueue, l, tc.wantQueueSize)
} }
if l := r.client.ZCard(scheduled).Val(); l != tc.wantScheduledSize { if l := r.ZCard(rdb.Scheduled).Val(); l != tc.wantScheduledSize {
t.Errorf("%q has length %d, want %d", scheduled, l, tc.wantScheduledSize) t.Errorf("%q has length %d, want %d", rdb.Scheduled, l, tc.wantScheduledSize)
} }
} }
} }

View File

@ -9,19 +9,21 @@ import (
"text/tabwriter" "text/tabwriter"
"time" "time"
"github.com/hibiken/asynq" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
) )
var pollInterval = flag.Duration("interval", 3*time.Second, "polling interval") var pollInterval = flag.Duration("interval", 3*time.Second, "polling interval")
func main() { func main() {
inspector := asynq.NewInspector(&asynq.RedisConfig{ c := redis.NewClient(&redis.Options{
Addr: "localhost:6379", Addr: "localhost:6379",
DB: 2, DB: 2,
}) })
r := rdb.NewClient(c)
for { for {
stats, err := inspector.CurrentStats() stats, err := r.CurrentStats()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -31,7 +33,7 @@ func main() {
} }
} }
func printStats(s *asynq.Stats) { func printStats(s *rdb.Stats) {
format := strings.Repeat("%v\t", 5) + "\n" format := strings.Repeat("%v\t", 5) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead") fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead")

9
go.sum
View File

@ -8,17 +8,26 @@ github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4= go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -1,84 +0,0 @@
package asynq
// Inspector is used to inspect queues.
type Inspector struct {
rdb *rdb
}
// NewInspector returns a new Inspector instance.
func NewInspector(config *RedisConfig) *Inspector {
return &Inspector{
rdb: newRDB(config),
}
}
// CurrentStats returns a current stats of queues.
func (i *Inspector) CurrentStats() (*Stats, error) {
return i.rdb.currentStats()
}
// toTaskSlice converts a taskMessage slice to a Task slice.
func (i *Inspector) toTaskSlice(msgs []*taskMessage) []*Task {
var tasks []*Task
for _, m := range msgs {
tasks = append(tasks, &Task{Type: m.Type, Payload: m.Payload})
}
return tasks
}
// ListEnqueuedTasks returns a list of tasks ready to be processed.
func (i *Inspector) ListEnqueuedTasks() ([]*EnqueuedTask, error) {
// TODO(hibiken): Support pagination.
msgs, err := i.rdb.listEnqueued()
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, m := range msgs {
tasks = append(tasks, &EnqueuedTask{
ID: m.ID,
Type: m.Type,
Payload: m.Payload,
})
}
return tasks, nil
}
// ListInProgressTasks returns a list of tasks that are being processed.
func (i *Inspector) ListInProgressTasks() ([]*InProgressTask, error) {
// TODO(hibiken): Support pagination.
msgs, err := i.rdb.listInProgress()
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, m := range msgs {
tasks = append(tasks, &InProgressTask{
ID: m.ID,
Type: m.Type,
Payload: m.Payload,
})
}
return tasks, nil
}
// ListScheduledTasks returns a list of tasks that are scheduled to
// be processed in the future.
func (i *Inspector) ListScheduledTasks() ([]*ScheduledTask, error) {
// TODO(hibiken): Support pagination.
return i.rdb.listScheduled()
}
// ListRetryTasks returns a list of tasks what will be retried in the
// future.
func (i *Inspector) ListRetryTasks() ([]*RetryTask, error) {
// TODO(hibiken): Support pagination.
return i.rdb.listRetry()
}
// ListDeadTasks returns a list of tasks that have reached its
// maximum retry limit.
func (i *Inspector) ListDeadTasks() ([]*DeadTask, error) {
// TODO(hibiken): Support pagination.
return i.rdb.listDead()
}

View File

@ -1,503 +0,0 @@
package asynq
import (
"sort"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
)
// ---- TODO(hibiken): Remove this once the new version is released (https://github.com/google/go-cmp/issues/166) ----
// EquateApproxTime returns a Comparer options that
// determine two time.Time values to be equal if they
// are within the given time interval of one another.
// Note that if both times have a monotonic clock reading,
// the monotonic time difference will be used.
//
// The zero time is treated specially: it is only considered
// equal to another zero time value.
//
// It will panic if margin is negative.
func EquateApproxTime(margin time.Duration) cmp.Option {
if margin < 0 {
panic("negative duration in EquateApproxTime")
}
return cmp.FilterValues(func(x, y time.Time) bool {
return !x.IsZero() && !y.IsZero()
}, cmp.Comparer(timeApproximator{margin}.compare))
}
type timeApproximator struct {
margin time.Duration
}
func (a timeApproximator) compare(x, y time.Time) bool {
// Avoid subtracting times to avoid overflow when the
// difference is larger than the largest representible duration.
if x.After(y) {
// Ensure x is always before y
x, y = y, x
}
// We're within the margin if x+margin >= y.
// Note: time.Time doesn't have AfterOrEqual method hence the negation.
return !x.Add(a.margin).Before(y)
}
//-----------------------------
func TestCurrentStats(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
t1 := randomTask("send_email", "default", nil)
t2 := randomTask("send_email", "default", nil)
t3 := randomTask("gen_export", "default", nil)
t4 := randomTask("gen_thumbnail", "default", nil)
t5 := randomTask("send_email", "default", nil)
tests := []struct {
queue []*taskMessage
inProgress []*taskMessage
scheduled []*taskMessage
retry []*taskMessage
dead []*taskMessage
want *Stats
}{
{
queue: []*taskMessage{t1},
inProgress: []*taskMessage{t2, t3},
scheduled: []*taskMessage{t4},
retry: []*taskMessage{},
dead: []*taskMessage{t5},
want: &Stats{
Queued: 1,
InProgress: 2,
Scheduled: 1,
Retry: 0,
Dead: 1,
},
},
{
queue: []*taskMessage{},
inProgress: []*taskMessage{},
scheduled: []*taskMessage{t1, t2, t4},
retry: []*taskMessage{t3},
dead: []*taskMessage{t5},
want: &Stats{
Queued: 0,
InProgress: 0,
Scheduled: 3,
Retry: 1,
Dead: 1,
},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, msg := range tc.queue {
err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err()
if err != nil {
t.Fatal(err)
}
}
for _, msg := range tc.inProgress {
err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err()
if err != nil {
t.Fatal(err)
}
}
for _, msg := range tc.scheduled {
err := r.client.ZAdd(scheduled, &redis.Z{
Member: mustMarshal(t, msg),
Score: float64(time.Now().Add(time.Hour).Unix()),
}).Err()
if err != nil {
t.Fatal(err)
}
}
for _, msg := range tc.retry {
err := r.client.ZAdd(retry, &redis.Z{
Member: mustMarshal(t, msg),
Score: float64(time.Now().Add(time.Hour).Unix()),
}).Err()
if err != nil {
t.Fatal(err)
}
}
for _, msg := range tc.dead {
err := r.client.ZAdd(dead, &redis.Z{
Member: mustMarshal(t, msg),
Score: float64(time.Now().Unix()),
}).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.CurrentStats()
if err != nil {
t.Error(err)
continue
}
ignoreOpt := cmpopts.IgnoreFields(*tc.want, "Timestamp")
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("(*Inspector).CurrentStats() = %+v, want %+v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}
func TestListEnqueuedTasks(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
m1 := randomTask("send_email", "default", nil)
m2 := randomTask("send_email", "default", nil)
m3 := randomTask("gen_export", "default", nil)
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload}
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload}
t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload}
tests := []struct {
queued []*taskMessage
want []*EnqueuedTask
}{
{
queued: []*taskMessage{m1, m2, m3},
want: []*EnqueuedTask{t1, t2, t3},
},
{
queued: []*taskMessage{},
want: []*EnqueuedTask{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, msg := range tc.queued {
err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.ListEnqueuedTasks()
if err != nil {
t.Error(err)
continue
}
sortOpt := cmp.Transformer("SortEnqueuedTasks", func(in []*EnqueuedTask) []*EnqueuedTask {
out := append([]*EnqueuedTask(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("(*Inspector).ListEnqueuedTasks = %v, want %v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}
func TestListInProgressTasks(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
m1 := randomTask("send_email", "default", nil)
m2 := randomTask("send_email", "default", nil)
m3 := randomTask("gen_export", "default", nil)
t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload}
t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload}
t3 := &InProgressTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload}
tests := []struct {
inProgress []*taskMessage
want []*InProgressTask
}{
{
inProgress: []*taskMessage{m1, m2, m3},
want: []*InProgressTask{t1, t2, t3},
},
{
inProgress: []*taskMessage{},
want: []*InProgressTask{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, msg := range tc.inProgress {
err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.ListInProgressTasks()
if err != nil {
t.Error(err)
continue
}
sortOpt := cmp.Transformer("SortInProgressTasks", func(in []*InProgressTask) []*InProgressTask {
out := append([]*InProgressTask(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("(*Inspector).ListInProgressTasks = %v, want %v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}
func TestListScheduledTasks(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
m1 := randomTask("send_email", "default", nil)
m2 := randomTask("send_email", "default", nil)
m3 := randomTask("gen_export", "default", nil)
t1 := time.Now().Add(5 * time.Minute)
t2 := time.Now().Add(time.Hour)
t3 := time.Now().Add(24 * time.Hour)
s1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: t1}
s2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: t2}
s3 := &ScheduledTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, ProcessAt: t3}
type scheduledMsg struct {
msg *taskMessage
processAt time.Time
}
tests := []struct {
scheduled []scheduledMsg
want []*ScheduledTask
}{
{
scheduled: []scheduledMsg{
{m1, t1},
{m2, t2},
{m3, t3},
},
want: []*ScheduledTask{s1, s2, s3},
},
{
scheduled: []scheduledMsg{},
want: []*ScheduledTask{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, s := range tc.scheduled {
err := r.client.ZAdd(scheduled, &redis.Z{Member: mustMarshal(t, s.msg), Score: float64(s.processAt.Unix())}).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.ListScheduledTasks()
if err != nil {
t.Error(err)
continue
}
sortOpt := cmp.Transformer("SortScheduledTasks", func(in []*ScheduledTask) []*ScheduledTask {
out := append([]*ScheduledTask(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
timeCmpOpt := EquateApproxTime(time.Second)
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("(*Inspector).ListScheduledTasks = %v, want %v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}
func TestListRetryTasks(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
m1 := &taskMessage{
ID: uuid.New(),
Type: "send_email",
Payload: map[string]interface{}{"to": "customer@example.com"},
ErrorMsg: "couldn't send email",
Retry: 25,
Retried: 10,
}
m2 := &taskMessage{
ID: uuid.New(),
Type: "gen_thumbnail",
Payload: map[string]interface{}{"src": "some/path/to/img/file"},
ErrorMsg: "couldn't find a file",
Retry: 20,
Retried: 3,
}
t1 := time.Now().Add(time.Hour)
t2 := time.Now().Add(24 * time.Hour)
r1 := &RetryTask{
ID: m1.ID,
Type: m1.Type,
Payload: m1.Payload,
ProcessAt: t1,
ErrorMsg: m1.ErrorMsg,
Retry: m1.Retry,
Retried: m1.Retried,
}
r2 := &RetryTask{
ID: m2.ID,
Type: m2.Type,
Payload: m2.Payload,
ProcessAt: t2,
ErrorMsg: m2.ErrorMsg,
Retry: m2.Retry,
Retried: m2.Retried,
}
type retryEntry struct {
msg *taskMessage
processAt time.Time
}
tests := []struct {
retry []retryEntry
want []*RetryTask
}{
{
retry: []retryEntry{
{m1, t1},
{m2, t2},
},
want: []*RetryTask{r1, r2},
},
{
retry: []retryEntry{},
want: []*RetryTask{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, e := range tc.retry {
err := r.client.ZAdd(retry, &redis.Z{
Member: mustMarshal(t, e.msg),
Score: float64(e.processAt.Unix())}).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.ListRetryTasks()
if err != nil {
t.Error(err)
continue
}
sortOpt := cmp.Transformer("SortRetryTasks", func(in []*RetryTask) []*RetryTask {
out := append([]*RetryTask(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
timeCmpOpt := EquateApproxTime(time.Second)
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("(*Inspector).ListRetryTasks = %v, want %v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}
func TestListDeadTasks(t *testing.T) {
r := setup(t)
inspector := &Inspector{r}
m1 := &taskMessage{ID: uuid.New(), Type: "send_email", Payload: map[string]interface{}{"to": "customer@example.com"}, ErrorMsg: "couldn't send email"}
m2 := &taskMessage{ID: uuid.New(), Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "path/to/img/file"}, ErrorMsg: "couldn't find file"}
t1 := time.Now().Add(-5 * time.Second)
t2 := time.Now().Add(-24 * time.Hour)
d1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ErrorMsg: m1.ErrorMsg, LastFailedAt: t1}
d2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ErrorMsg: m2.ErrorMsg, LastFailedAt: t2}
type deadEntry struct {
msg *taskMessage
lastFailedAt time.Time
}
tests := []struct {
dead []deadEntry
want []*DeadTask
}{
{
dead: []deadEntry{
{m1, t1},
{m2, t2},
},
want: []*DeadTask{d1, d2},
},
{
dead: []deadEntry{},
want: []*DeadTask{},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err)
}
for _, d := range tc.dead {
err := r.client.ZAdd(dead, &redis.Z{
Member: mustMarshal(t, d.msg),
Score: float64(d.lastFailedAt.Unix())}).Err()
if err != nil {
t.Fatal(err)
}
}
got, err := inspector.ListDeadTasks()
if err != nil {
t.Errorf("(*Inspector).ListDeadTask = %v, %v; want %v, nil", got, err, tc.want)
continue
}
sortOpt := cmp.Transformer("SortDeadTasks", func(in []*DeadTask) []*DeadTask {
out := append([]*DeadTask(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
timeCmpOpt := EquateApproxTime(time.Second)
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
t.Errorf("(*Inspector).ListDeadTasks = %v, want %v; (-want, +got)\n%s",
got, tc.want, diff)
continue
}
}
}

View File

@ -1,4 +1,4 @@
package asynq package rdb
import ( import (
"encoding/json" "encoding/json"
@ -8,38 +8,124 @@ import (
"time" "time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/uuid"
) )
// Redis keys // Redis keys
const ( const (
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
defaultQueue = queuePrefix + "default" // LIST
allQueues = "asynq:queues" // SET allQueues = "asynq:queues" // SET
scheduled = "asynq:scheduled" // ZSET queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
retry = "asynq:retry" // ZSET DefaultQueue = queuePrefix + "default" // LIST
dead = "asynq:dead" // ZSET Scheduled = "asynq:scheduled" // ZSET
inProgress = "asynq:in_progress" // SET Retry = "asynq:retry" // ZSET
Dead = "asynq:dead" // ZSET
InProgress = "asynq:in_progress" // SET
) )
var errDequeueTimeout = errors.New("blocking dequeue operation timed out") var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
// rdb encapsulates the interactions with redis server. // RDB encapsulates the interactions with redis server.
type rdb struct { type RDB struct {
client *redis.Client client *redis.Client
} }
func newRDB(config *RedisConfig) *rdb { // NewRDB returns a new instance of RDB.
client := redis.NewClient(&redis.Options{ func NewRDB(client *redis.Client) *RDB {
Addr: config.Addr, return &RDB{client}
Password: config.Password,
DB: config.DB,
})
return &rdb{client}
} }
// enqueue inserts the given task to the end of the queue. // TaskMessage is an internal representation of a task with additional metadata fields.
// This data gets written in redis.
type TaskMessage struct {
//-------- Task fields --------
Type string
Payload map[string]interface{}
//-------- metadata fields --------
// unique identifier for each task
ID uuid.UUID
// queue name this message should be enqueued to
Queue string
// max number of retry for this task.
Retry int
// number of times we've retried so far
Retried int
// error message from the last failure
ErrorMsg string
}
// Stats represents a state of queues at a certain time.
type Stats struct {
Queued int
InProgress int
Scheduled int
Retry int
Dead int
Timestamp time.Time
}
// EnqueuedTask is a task in a queue and is ready to be processed.
// This is read only and used for inspection purpose.
type EnqueuedTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// InProgressTask is a task that's currently being processed.
// This is read only and used for inspection purpose.
type InProgressTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// ScheduledTask is a task that's scheduled to be processed in the future.
// This is read only and used for inspection purpose.
type ScheduledTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
ProcessAt time.Time
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
// This is read only and used for inspection purpose.
type RetryTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
// TODO(hibiken): add LastFailedAt time.Time
ProcessAt time.Time
ErrorMsg string
Retried int
Retry int
}
// DeadTask is a task in that has exhausted all retries.
// This is read only and used for inspection purpose.
type DeadTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
}
// Close closes the connection with redis server.
func (r *RDB) Close() error {
return r.client.Close()
}
// Enqueue inserts the given task to the end of the queue.
// It also adds the queue name to the "all-queues" list. // It also adds the queue name to the "all-queues" list.
func (r *rdb) enqueue(msg *taskMessage) error { func (r *RDB) Enqueue(msg *TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
@ -55,18 +141,18 @@ func (r *rdb) enqueue(msg *taskMessage) error {
return nil return nil
} }
// dequeue blocks until there is a task available to be processed, // Dequeue blocks until there is a task available to be processed,
// once a task is available, it adds the task to "in progress" list // once a task is available, it adds the task to "in progress" list
// and returns the task. // and returns the task.
func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) { func (r *RDB) Dequeue(qname string, timeout time.Duration) (*TaskMessage, error) {
data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result() data, err := r.client.BRPopLPush(qname, InProgress, timeout).Result()
if err == redis.Nil { if err == redis.Nil {
return nil, errDequeueTimeout return nil, ErrDequeueTimeout
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, inProgress, timeout, err) return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, InProgress, timeout, err)
} }
var msg taskMessage var msg TaskMessage
err = json.Unmarshal([]byte(data), &msg) err = json.Unmarshal([]byte(data), &msg)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
@ -75,8 +161,8 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error)
return &msg, nil return &msg, nil
} }
// remove deletes all elements equal to msg from a redis list with the given key. // Remove deletes all elements equal to msg from a redis list with the given key.
func (r *rdb) remove(key string, msg *taskMessage) error { func (r *RDB) Remove(key string, msg *TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
@ -89,8 +175,8 @@ func (r *rdb) remove(key string, msg *taskMessage) error {
return nil return nil
} }
// schedule adds the task to the zset to be processd at the specified time. // Schedule adds the task to the zset to be processd at the specified time.
func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error { func (r *RDB) Schedule(zset string, processAt time.Time, msg *TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
@ -106,25 +192,25 @@ func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error
const maxDeadTask = 100 const maxDeadTask = 100
const deadExpirationInDays = 90 const deadExpirationInDays = 90
// kill sends the taskMessage to "dead" set. // Kill sends the taskMessage to "dead" set.
// It also trims the sorted set by timestamp and set size. // It also trims the sorted set by timestamp and set size.
func (r *rdb) kill(msg *taskMessage) error { func (r *RDB) Kill(msg *TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
now := time.Now() now := time.Now()
pipe := r.client.Pipeline() pipe := r.client.Pipeline()
pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) pipe.ZAdd(Dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit))) pipe.ZRemRangeByScore(Dead, "-inf", strconv.Itoa(int(limit)))
pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100 pipe.ZRemRangeByRank(Dead, 0, -maxDeadTask) // trim the set to 100
_, err = pipe.Exec() _, err = pipe.Exec()
return err return err
} }
// moveAll moves all tasks from src list to dst list. // MoveAll moves all tasks from src list to dst list.
func (r *rdb) moveAll(src, dst string) error { func (r *RDB) MoveAll(src, dst string) error {
script := redis.NewScript(` script := redis.NewScript(`
local len = redis.call("LLEN", KEYS[1]) local len = redis.call("LLEN", KEYS[1])
for i = len, 1, -1 do for i = len, 1, -1 do
@ -136,10 +222,10 @@ func (r *rdb) moveAll(src, dst string) error {
return err return err
} }
// forward moves all tasks with a score less than the current unix time // Forward moves all tasks with a score less than the current unix time
// from the given zset to the default queue. // from the given zset to the default queue.
// TODO(hibiken): Find a better method name that reflects what this does. // TODO(hibiken): Find a better method name that reflects what this does.
func (r *rdb) forward(from string) error { func (r *RDB) Forward(from string) error {
script := redis.NewScript(` script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
@ -150,18 +236,19 @@ func (r *rdb) forward(from string) error {
return msgs return msgs
`) `)
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result() res, err := script.Run(r.client, []string{from, allQueues, DefaultQueue}, now).Result()
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from) fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from)
return err return err
} }
func (r *rdb) currentStats() (*Stats, error) { // CurrentStats returns a current state of the queues.
func (r *RDB) CurrentStats() (*Stats, error) {
pipe := r.client.Pipeline() pipe := r.client.Pipeline()
qlen := pipe.LLen(defaultQueue) qlen := pipe.LLen(DefaultQueue)
plen := pipe.LLen(inProgress) plen := pipe.LLen(InProgress)
slen := pipe.ZCard(scheduled) slen := pipe.ZCard(Scheduled)
rlen := pipe.ZCard(retry) rlen := pipe.ZCard(Retry)
dlen := pipe.ZCard(dead) dlen := pipe.ZCard(Dead)
_, err := pipe.Exec() _, err := pipe.Exec()
if err != nil { if err != nil {
return nil, err return nil, err
@ -176,16 +263,16 @@ func (r *rdb) currentStats() (*Stats, error) {
}, nil }, nil
} }
func (r *rdb) listEnqueued() ([]*taskMessage, error) { func (r *RDB) ListEnqueued() ([]*TaskMessage, error) {
return r.rangeList(defaultQueue) return r.rangeList(DefaultQueue)
} }
func (r *rdb) listInProgress() ([]*taskMessage, error) { func (r *RDB) ListInProgress() ([]*TaskMessage, error) {
return r.rangeList(inProgress) return r.rangeList(InProgress)
} }
func (r *rdb) listScheduled() ([]*ScheduledTask, error) { func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(scheduled, 0, -1).Result() data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -195,7 +282,7 @@ func (r *rdb) listScheduled() ([]*ScheduledTask, error) {
if !ok { if !ok {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
var msg taskMessage var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg) err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
@ -211,8 +298,8 @@ func (r *rdb) listScheduled() ([]*ScheduledTask, error) {
return tasks, nil return tasks, nil
} }
func (r *rdb) listRetry() ([]*RetryTask, error) { func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(retry, 0, -1).Result() data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -222,7 +309,7 @@ func (r *rdb) listRetry() ([]*RetryTask, error) {
if !ok { if !ok {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
var msg taskMessage var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg) err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
@ -241,8 +328,8 @@ func (r *rdb) listRetry() ([]*RetryTask, error) {
return tasks, nil return tasks, nil
} }
func (r *rdb) listDead() ([]*DeadTask, error) { func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(dead, 0, -1).Result() data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -252,7 +339,7 @@ func (r *rdb) listDead() ([]*DeadTask, error) {
if !ok { if !ok {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
var msg taskMessage var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg) err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
@ -269,7 +356,7 @@ func (r *rdb) listDead() ([]*DeadTask, error) {
return tasks, nil return tasks, nil
} }
func (r *rdb) rangeList(key string) ([]*taskMessage, error) { func (r *RDB) rangeList(key string) ([]*TaskMessage, error) {
data, err := r.client.LRange(key, 0, -1).Result() data, err := r.client.LRange(key, 0, -1).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -277,7 +364,7 @@ func (r *rdb) rangeList(key string) ([]*taskMessage, error) {
return r.toMessageSlice(data), nil return r.toMessageSlice(data), nil
} }
func (r *rdb) rangeZSet(key string) ([]*taskMessage, error) { func (r *RDB) rangeZSet(key string) ([]*TaskMessage, error) {
data, err := r.client.ZRange(key, 0, -1).Result() data, err := r.client.ZRange(key, 0, -1).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -286,10 +373,10 @@ func (r *rdb) rangeZSet(key string) ([]*taskMessage, error) {
} }
// toMessageSlice convers json strings to a slice of task messages. // toMessageSlice convers json strings to a slice of task messages.
func (r *rdb) toMessageSlice(data []string) []*taskMessage { func (r *RDB) toMessageSlice(data []string) []*TaskMessage {
var msgs []*taskMessage var msgs []*TaskMessage
for _, s := range data { for _, s := range data {
var msg taskMessage var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg) err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
// bad data; ignore and continue // bad data; ignore and continue

View File

@ -1,18 +1,94 @@
package asynq package rdb
import ( import (
"encoding/json"
"fmt" "fmt"
"math/rand"
"sort"
"testing" "testing"
"time" "time"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/uuid"
) )
func init() {
rand.Seed(time.Now().UnixNano())
}
func setup(t *testing.T) *RDB {
t.Helper()
r := NewRDB(redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 15,
}))
// Start each test with a clean slate.
if err := r.client.FlushDB().Err(); err != nil {
panic(err)
}
return r
}
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage {
out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].ID.String() < out[j].ID.String()
})
return out
})
func randomTask(taskType, qname string, payload map[string]interface{}) *TaskMessage {
return &TaskMessage{
ID: uuid.New(),
Type: taskType,
Queue: qname,
Retry: 25,
Payload: make(map[string]interface{}),
}
}
func mustMarshal(t *testing.T, task *TaskMessage) string {
t.Helper()
data, err := json.Marshal(task)
if err != nil {
t.Fatal(err)
}
return string(data)
}
func mustUnmarshal(t *testing.T, data string) *TaskMessage {
t.Helper()
var task TaskMessage
err := json.Unmarshal([]byte(data), &task)
if err != nil {
t.Fatal(err)
}
return &task
}
func mustMarshalSlice(t *testing.T, tasks []*TaskMessage) []string {
t.Helper()
var data []string
for _, task := range tasks {
data = append(data, mustMarshal(t, task))
}
return data
}
func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage {
t.Helper()
var tasks []*TaskMessage
for _, s := range data {
tasks = append(tasks, mustUnmarshal(t, s))
}
return tasks
}
func TestEnqueue(t *testing.T) { func TestEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
msg *taskMessage msg *TaskMessage
}{ }{
{msg: randomTask("send_email", "default", {msg: randomTask("send_email", "default",
map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})}, map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
@ -26,18 +102,18 @@ func TestEnqueue(t *testing.T) {
if err := r.client.FlushDB().Err(); err != nil { if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
err := r.enqueue(tc.msg) err := r.Enqueue(tc.msg)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
} }
res := r.client.LRange(defaultQueue, 0, -1).Val() res := r.client.LRange(DefaultQueue, 0, -1).Val()
if len(res) != 1 { if len(res) != 1 {
t.Errorf("LIST %q has length %d, want 1", defaultQueue, len(res)) t.Errorf("LIST %q has length %d, want 1", DefaultQueue, len(res))
continue continue
} }
if !r.client.SIsMember(allQueues, defaultQueue).Val() { if !r.client.SIsMember(allQueues, DefaultQueue).Val() {
t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQueue) t.Errorf("SISMEMBER %q %q = false, want true", allQueues, DefaultQueue)
} }
if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" { if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
@ -49,13 +125,13 @@ func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"}) t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"})
tests := []struct { tests := []struct {
queued []*taskMessage queued []*TaskMessage
want *taskMessage want *TaskMessage
err error err error
inProgress int64 // length of "in-progress" tasks after dequeue inProgress int64 // length of "in-progress" tasks after dequeue
}{ }{
{queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1}, {queued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1},
{queued: []*taskMessage{}, want: nil, err: errDequeueTimeout, inProgress: 0}, {queued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0},
} }
for _, tc := range tests { for _, tc := range tests {
@ -64,16 +140,16 @@ func TestDequeue(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
for _, m := range tc.queued { for _, m := range tc.queued {
r.enqueue(m) r.Enqueue(m)
} }
got, err := r.dequeue(defaultQueue, time.Second) got, err := r.Dequeue(DefaultQueue, time.Second)
if !cmp.Equal(got, tc.want) || err != tc.err { if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v", t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
defaultQueue, got, err, tc.want, tc.err) DefaultQueue, got, err, tc.want, tc.err)
continue continue
} }
if l := r.client.LLen(inProgress).Val(); l != tc.inProgress { if l := r.client.LLen(InProgress).Val(); l != tc.inProgress {
t.Errorf("LIST %q has length %d, want %d", inProgress, l, tc.inProgress) t.Errorf("LIST %q has length %d, want %d", InProgress, l, tc.inProgress)
} }
} }
} }
@ -84,24 +160,24 @@ func TestRemove(t *testing.T) {
t2 := randomTask("export_csv", "csv", nil) t2 := randomTask("export_csv", "csv", nil)
tests := []struct { tests := []struct {
initial []*taskMessage // initial state of the list initial []*TaskMessage // initial state of the list
target *taskMessage // task to remove target *TaskMessage // task to remove
final []*taskMessage // final state of the list final []*TaskMessage // final state of the list
}{ }{
{ {
initial: []*taskMessage{t1, t2}, initial: []*TaskMessage{t1, t2},
target: t1, target: t1,
final: []*taskMessage{t2}, final: []*TaskMessage{t2},
}, },
{ {
initial: []*taskMessage{t2}, initial: []*TaskMessage{t2},
target: t1, target: t1,
final: []*taskMessage{t2}, final: []*TaskMessage{t2},
}, },
{ {
initial: []*taskMessage{t1}, initial: []*TaskMessage{t1},
target: t1, target: t1,
final: []*taskMessage{}, final: []*TaskMessage{},
}, },
} }
@ -112,26 +188,26 @@ func TestRemove(t *testing.T) {
} }
// set up initial state // set up initial state
for _, task := range tc.initial { for _, task := range tc.initial {
err := r.client.LPush(defaultQueue, mustMarshal(t, task)).Err() err := r.client.LPush(DefaultQueue, mustMarshal(t, task)).Err()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
err := r.remove(defaultQueue, tc.target) err := r.Remove(DefaultQueue, tc.target)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
} }
var got []*taskMessage var got []*TaskMessage
data := r.client.LRange(defaultQueue, 0, -1).Val() data := r.client.LRange(DefaultQueue, 0, -1).Val()
for _, s := range data { for _, s := range data {
got = append(got, mustUnmarshal(t, s)) got = append(got, mustUnmarshal(t, s))
} }
if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQueue, diff) t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", DefaultQueue, diff)
continue continue
} }
} }
@ -143,14 +219,14 @@ func TestKill(t *testing.T) {
// TODO(hibiken): add test cases for trimming // TODO(hibiken): add test cases for trimming
tests := []struct { tests := []struct {
initial []*taskMessage // inital state of "dead" set initial []*TaskMessage // inital state of "dead" set
target *taskMessage // task to kill target *TaskMessage // task to kill
want []*taskMessage // final state of "dead" set want []*TaskMessage // final state of "dead" set
}{ }{
{ {
initial: []*taskMessage{}, initial: []*TaskMessage{},
target: t1, target: t1,
want: []*taskMessage{t1}, want: []*TaskMessage{t1},
}, },
} }
@ -161,22 +237,22 @@ func TestKill(t *testing.T) {
} }
// set up initial state // set up initial state
for _, task := range tc.initial { for _, task := range tc.initial {
err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() err := r.client.ZAdd(Dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
err := r.kill(tc.target) err := r.Kill(tc.target)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue
} }
actual := r.client.ZRange(dead, 0, -1).Val() actual := r.client.ZRange(Dead, 0, -1).Val()
got := mustUnmarshalSlice(t, actual) got := mustUnmarshalSlice(t, actual)
if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", dead, diff) t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", Dead, diff)
continue continue
} }
} }
@ -189,28 +265,28 @@ func TestMoveAll(t *testing.T) {
t3 := randomTask("sync_stuff", "sync", nil) t3 := randomTask("sync_stuff", "sync", nil)
tests := []struct { tests := []struct {
beforeSrc []*taskMessage beforeSrc []*TaskMessage
beforeDst []*taskMessage beforeDst []*TaskMessage
afterSrc []*taskMessage afterSrc []*TaskMessage
afterDst []*taskMessage afterDst []*TaskMessage
}{ }{
{ {
beforeSrc: []*taskMessage{t1, t2, t3}, beforeSrc: []*TaskMessage{t1, t2, t3},
beforeDst: []*taskMessage{}, beforeDst: []*TaskMessage{},
afterSrc: []*taskMessage{}, afterSrc: []*TaskMessage{},
afterDst: []*taskMessage{t1, t2, t3}, afterDst: []*TaskMessage{t1, t2, t3},
}, },
{ {
beforeSrc: []*taskMessage{}, beforeSrc: []*TaskMessage{},
beforeDst: []*taskMessage{t1, t2, t3}, beforeDst: []*TaskMessage{t1, t2, t3},
afterSrc: []*taskMessage{}, afterSrc: []*TaskMessage{},
afterDst: []*taskMessage{t1, t2, t3}, afterDst: []*TaskMessage{t1, t2, t3},
}, },
{ {
beforeSrc: []*taskMessage{t2, t3}, beforeSrc: []*TaskMessage{t2, t3},
beforeDst: []*taskMessage{t1}, beforeDst: []*TaskMessage{t1},
afterSrc: []*taskMessage{}, afterSrc: []*TaskMessage{},
afterDst: []*taskMessage{t1, t2, t3}, afterDst: []*TaskMessage{t1, t2, t3},
}, },
} }
@ -222,63 +298,63 @@ func TestMoveAll(t *testing.T) {
} }
// seed src list. // seed src list.
for _, msg := range tc.beforeSrc { for _, msg := range tc.beforeSrc {
r.client.LPush(inProgress, mustMarshal(t, msg)) r.client.LPush(InProgress, mustMarshal(t, msg))
} }
// seed dst list. // seed dst list.
for _, msg := range tc.beforeDst { for _, msg := range tc.beforeDst {
r.client.LPush(defaultQueue, mustMarshal(t, msg)) r.client.LPush(DefaultQueue, mustMarshal(t, msg))
} }
if err := r.moveAll(inProgress, defaultQueue); err != nil { if err := r.MoveAll(InProgress, DefaultQueue); err != nil {
t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", inProgress, defaultQueue, err) t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err)
continue continue
} }
src := r.client.LRange(inProgress, 0, -1).Val() src := r.client.LRange(InProgress, 0, -1).Val()
gotSrc := mustUnmarshalSlice(t, src) gotSrc := mustUnmarshalSlice(t, src)
if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgress, diff) t.Errorf("mismatch found in %q (-want, +got)\n%s", InProgress, diff)
} }
dst := r.client.LRange(defaultQueue, 0, -1).Val() dst := r.client.LRange(DefaultQueue, 0, -1).Val()
gotDst := mustUnmarshalSlice(t, dst) gotDst := mustUnmarshalSlice(t, dst)
if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQueue, diff) t.Errorf("mismatch found in %q (-want, +got)\n%s", DefaultQueue, diff)
} }
} }
} }
func TestForward(t *testing.T) { func TestForward(t *testing.T) {
r := setup(t) r := setup(t)
t1 := randomTask("send_email", defaultQueue, nil) t1 := randomTask("send_email", "default", nil)
t2 := randomTask("generate_csv", defaultQueue, nil) t2 := randomTask("generate_csv", "default", nil)
secondAgo := time.Now().Add(-time.Second) secondAgo := time.Now().Add(-time.Second)
hourFromNow := time.Now().Add(time.Hour) hourFromNow := time.Now().Add(time.Hour)
tests := []struct { tests := []struct {
tasks []*redis.Z // scheduled tasks with timestamp as a score tasks []*redis.Z // scheduled tasks with timestamp as a score
wantQueued []*taskMessage // queue after calling forward wantQueued []*TaskMessage // queue after calling forward
wantScheduled []*taskMessage // scheduled queue after calling forward wantScheduled []*TaskMessage // scheduled queue after calling forward
}{ }{
{ {
tasks: []*redis.Z{ tasks: []*redis.Z{
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())}, &redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
wantQueued: []*taskMessage{t1, t2}, wantQueued: []*TaskMessage{t1, t2},
wantScheduled: []*taskMessage{}, wantScheduled: []*TaskMessage{},
}, },
{ {
tasks: []*redis.Z{ tasks: []*redis.Z{
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
wantQueued: []*taskMessage{t2}, wantQueued: []*TaskMessage{t2},
wantScheduled: []*taskMessage{t1}, wantScheduled: []*TaskMessage{t1},
}, },
{ {
tasks: []*redis.Z{ tasks: []*redis.Z{
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
wantQueued: []*taskMessage{}, wantQueued: []*TaskMessage{},
wantScheduled: []*taskMessage{t1, t2}, wantScheduled: []*TaskMessage{t1, t2},
}, },
} }
@ -287,27 +363,25 @@ func TestForward(t *testing.T) {
if err := r.client.FlushDB().Err(); err != nil { if err := r.client.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := r.client.ZAdd(scheduled, tc.tasks...).Err(); err != nil { if err := r.client.ZAdd(Scheduled, tc.tasks...).Err(); err != nil {
t.Error(err) t.Error(err)
continue continue
} }
err := r.forward(scheduled) err := r.Forward(Scheduled)
if err != nil { if err != nil {
t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err) t.Errorf("(*rdb).forward(%q) = %v, want nil", Scheduled, err)
continue continue
} }
queued := r.client.LRange(defaultQueue, 0, -1).Val() queued := r.client.LRange(DefaultQueue, 0, -1).Val()
gotQueued := mustUnmarshalSlice(t, queued) gotQueued := mustUnmarshalSlice(t, queued)
if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" {
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQueue, len(gotQueued), len(tc.wantQueued), diff) t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", DefaultQueue, len(gotQueued), len(tc.wantQueued), diff)
continue
} }
scheduled := r.client.ZRangeByScore(scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() scheduled := r.client.ZRangeByScore(Scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val()
gotScheduled := mustUnmarshalSlice(t, scheduled) gotScheduled := mustUnmarshalSlice(t, scheduled)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff) t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff)
continue
} }
} }
} }
@ -315,14 +389,14 @@ func TestForward(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
msg *taskMessage msg *TaskMessage
processAt time.Time processAt time.Time
zset string zset string
}{ }{
{ {
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}), randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
time.Now().Add(15 * time.Minute), time.Now().Add(15 * time.Minute),
scheduled, Scheduled,
}, },
} }
@ -332,7 +406,7 @@ func TestSchedule(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err := r.schedule(tc.zset, tc.processAt, tc.msg) err := r.Schedule(tc.zset, tc.processAt, tc.msg)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
continue continue

View File

@ -3,10 +3,12 @@ package asynq
import ( import (
"log" "log"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
) )
type poller struct { type poller struct {
rdb *rdb rdb *rdb.RDB
// channel to communicate back to the long running "poller" goroutine. // channel to communicate back to the long running "poller" goroutine.
done chan struct{} done chan struct{}
@ -18,9 +20,9 @@ type poller struct {
zsets []string zsets []string
} }
func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller { func newPoller(r *rdb.RDB, avgInterval time.Duration, zsets []string) *poller {
return &poller{ return &poller{
rdb: rdb, rdb: r,
done: make(chan struct{}), done: make(chan struct{}),
avgInterval: avgInterval, avgInterval: avgInterval,
zsets: zsets, zsets: zsets,
@ -51,7 +53,7 @@ func (p *poller) start() {
func (p *poller) exec() { func (p *poller) exec() {
for _, zset := range p.zsets { for _, zset := range p.zsets {
if err := p.rdb.forward(zset); err != nil { if err := p.rdb.Forward(zset); err != nil {
log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err) log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err)
} }
} }

View File

@ -5,29 +5,31 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestPoller(t *testing.T) { func TestPoller(t *testing.T) {
type scheduledTask struct { type scheduledTask struct {
msg *taskMessage msg *rdb.TaskMessage
processAt time.Time processAt time.Time
} }
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second const pollInterval = time.Second
p := newPoller(r, pollInterval, []string{scheduled, retry}) p := newPoller(rdbClient, pollInterval, []string{rdb.Scheduled, rdb.Retry})
t1 := randomTask("gen_thumbnail", "default", nil) t1 := randomTask("gen_thumbnail", "default", nil)
t2 := randomTask("send_email", "default", nil) t2 := randomTask("send_email", "default", nil)
t3 := randomTask("reindex", "default", nil) t3 := randomTask("reindex", "default", nil)
t4 := randomTask("sync", "default", nil) t4 := randomTask("sync", "default", nil)
tests := []struct { tests := []struct {
initScheduled []scheduledTask // scheduled queue initial state initScheduled []scheduledTask // scheduled queue initial state
initRetry []scheduledTask // retry queue initial state initRetry []scheduledTask // retry queue initial state
initQueue []*taskMessage // default queue initial state initQueue []*rdb.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state wait time.Duration // wait duration before checking for final state
wantScheduled []*taskMessage // schedule queue final state wantScheduled []*rdb.TaskMessage // schedule queue final state
wantRetry []*taskMessage // retry queue final state wantRetry []*rdb.TaskMessage // retry queue final state
wantQueue []*taskMessage // default queue final state wantQueue []*rdb.TaskMessage // default queue final state
}{ }{
{ {
initScheduled: []scheduledTask{ initScheduled: []scheduledTask{
@ -37,11 +39,11 @@ func TestPoller(t *testing.T) {
initRetry: []scheduledTask{ initRetry: []scheduledTask{
{t3, time.Now().Add(-500 * time.Millisecond)}, {t3, time.Now().Add(-500 * time.Millisecond)},
}, },
initQueue: []*taskMessage{t4}, initQueue: []*rdb.TaskMessage{t4},
wait: pollInterval * 2, wait: pollInterval * 2,
wantScheduled: []*taskMessage{t1}, wantScheduled: []*rdb.TaskMessage{t1},
wantRetry: []*taskMessage{}, wantRetry: []*rdb.TaskMessage{},
wantQueue: []*taskMessage{t2, t3, t4}, wantQueue: []*rdb.TaskMessage{t2, t3, t4},
}, },
{ {
initScheduled: []scheduledTask{ initScheduled: []scheduledTask{
@ -50,36 +52,36 @@ func TestPoller(t *testing.T) {
{t3, time.Now().Add(-500 * time.Millisecond)}, {t3, time.Now().Add(-500 * time.Millisecond)},
}, },
initRetry: []scheduledTask{}, initRetry: []scheduledTask{},
initQueue: []*taskMessage{t4}, initQueue: []*rdb.TaskMessage{t4},
wait: pollInterval * 2, wait: pollInterval * 2,
wantScheduled: []*taskMessage{}, wantScheduled: []*rdb.TaskMessage{},
wantRetry: []*taskMessage{}, wantRetry: []*rdb.TaskMessage{},
wantQueue: []*taskMessage{t1, t2, t3, t4}, wantQueue: []*rdb.TaskMessage{t1, t2, t3, t4},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case. // clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// initialize scheduled queue // initialize scheduled queue
for _, st := range tc.initScheduled { for _, st := range tc.initScheduled {
err := r.schedule(scheduled, st.processAt, st.msg) err := rdbClient.Schedule(rdb.Scheduled, st.processAt, st.msg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
// initialize retry queue // initialize retry queue
for _, st := range tc.initRetry { for _, st := range tc.initRetry {
err := r.schedule(retry, st.processAt, st.msg) err := rdbClient.Schedule(rdb.Retry, st.processAt, st.msg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
// initialize default queue // initialize default queue
for _, msg := range tc.initQueue { for _, msg := range tc.initQueue {
err := r.enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -89,22 +91,22 @@ func TestPoller(t *testing.T) {
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
gotScheduledRaw := r.client.ZRange(scheduled, 0, -1).Val() gotScheduledRaw := r.ZRange(rdb.Scheduled, 0, -1).Val()
gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduled, diff) t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Scheduled, diff)
} }
gotRetryRaw := r.client.ZRange(retry, 0, -1).Val() gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val()
gotRetry := mustUnmarshalSlice(t, gotRetryRaw) gotRetry := mustUnmarshalSlice(t, gotRetryRaw)
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retry, diff) t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Retry, diff)
} }
gotQueueRaw := r.client.LRange(defaultQueue, 0, -1).Val() gotQueueRaw := r.LRange(rdb.DefaultQueue, 0, -1).Val()
gotQueue := mustUnmarshalSlice(t, gotQueueRaw) gotQueue := mustUnmarshalSlice(t, gotQueueRaw)
if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQueue, diff) t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.DefaultQueue, diff)
} }
} }
} }

View File

@ -4,10 +4,12 @@ import (
"fmt" "fmt"
"log" "log"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
) )
type processor struct { type processor struct {
rdb *rdb rdb *rdb.RDB
handler Handler handler Handler
@ -24,9 +26,9 @@ type processor struct {
done chan struct{} done chan struct{}
} }
func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor { func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
return &processor{ return &processor{
rdb: rdb, rdb: r,
handler: handler, handler: handler,
dequeueTimeout: 5 * time.Second, dequeueTimeout: 5 * time.Second,
sema: make(chan struct{}, numWorkers), sema: make(chan struct{}, numWorkers),
@ -68,8 +70,8 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to // exec pulls a task out of the queue and starts a worker goroutine to
// process the task. // process the task.
func (p *processor) exec() { func (p *processor) exec() {
msg, err := p.rdb.dequeue(defaultQueue, p.dequeueTimeout) msg, err := p.rdb.Dequeue(rdb.DefaultQueue, p.dequeueTimeout)
if err == errDequeueTimeout { if err == rdb.ErrDequeueTimeout {
// timed out, this is a normal behavior. // timed out, this is a normal behavior.
return return
} }
@ -83,9 +85,9 @@ func (p *processor) exec() {
go func(task *Task) { go func(task *Task) {
// NOTE: This deferred anonymous function needs to take taskMessage as a value because // NOTE: This deferred anonymous function needs to take taskMessage as a value because
// the message can be mutated by the time this function is called. // the message can be mutated by the time this function is called.
defer func(msg taskMessage) { defer func(msg rdb.TaskMessage) {
if err := p.rdb.remove(inProgress, &msg); err != nil { if err := p.rdb.Remove(rdb.InProgress, &msg); err != nil {
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, rdb.InProgress, err)
} }
<-p.sema // release token <-p.sema // release token
}(*msg) }(*msg)
@ -99,9 +101,9 @@ func (p *processor) exec() {
// restore moves all tasks from "in-progress" back to queue // restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks. // to restore all unfinished tasks.
func (p *processor) restore() { func (p *processor) restore() {
err := p.rdb.moveAll(inProgress, defaultQueue) err := p.rdb.MoveAll(rdb.InProgress, rdb.DefaultQueue)
if err != nil { if err != nil {
log.Printf("[ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue) log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue)
} }
} }

View File

@ -7,10 +7,12 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestProcessorSuccess(t *testing.T) { func TestProcessorSuccess(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r)
m1 := randomTask("send_email", "default", nil) m1 := randomTask("send_email", "default", nil)
m2 := randomTask("gen_thumbnail", "default", nil) m2 := randomTask("gen_thumbnail", "default", nil)
@ -23,20 +25,20 @@ func TestProcessorSuccess(t *testing.T) {
t4 := &Task{Type: m4.Type, Payload: m4.Payload} t4 := &Task{Type: m4.Type, Payload: m4.Payload}
tests := []struct { tests := []struct {
initQueue []*taskMessage // initial default queue state initQueue []*rdb.TaskMessage // initial default queue state
incoming []*taskMessage // tasks to be enqueued during run incoming []*rdb.TaskMessage // tasks to be enqueued during run
wait time.Duration // wait duration between starting and stopping processor for this test case wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end wantProcessed []*Task // tasks to be processed at the end
}{ }{
{ {
initQueue: []*taskMessage{m1}, initQueue: []*rdb.TaskMessage{m1},
incoming: []*taskMessage{m2, m3, m4}, incoming: []*rdb.TaskMessage{m2, m3, m4},
wait: time.Second, wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4}, wantProcessed: []*Task{t1, t2, t3, t4},
}, },
{ {
initQueue: []*taskMessage{}, initQueue: []*rdb.TaskMessage{},
incoming: []*taskMessage{m1}, incoming: []*rdb.TaskMessage{m1},
wait: time.Second, wait: time.Second,
wantProcessed: []*Task{t1}, wantProcessed: []*Task{t1},
}, },
@ -44,7 +46,7 @@ func TestProcessorSuccess(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case. // clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// instantiate a new processor // instantiate a new processor
@ -57,11 +59,11 @@ func TestProcessorSuccess(t *testing.T) {
processed = append(processed, task) processed = append(processed, task)
return nil return nil
} }
p := newProcessor(r, 10, h) p := newProcessor(rdbClient, 10, h)
p.dequeueTimeout = time.Second // short time out for test purpose p.dequeueTimeout = time.Second // short time out for test purpose
// initialize default queue. // initialize default queue.
for _, msg := range tc.initQueue { for _, msg := range tc.initQueue {
err := r.enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -70,7 +72,7 @@ func TestProcessorSuccess(t *testing.T) {
p.start() p.start()
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := r.enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
p.terminate() p.terminate()
t.Fatal(err) t.Fatal(err)
@ -83,14 +85,15 @@ func TestProcessorSuccess(t *testing.T) {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
if l := r.client.LLen(inProgress).Val(); l != 0 { if l := r.LLen(rdb.InProgress).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", inProgress, l) t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l)
} }
} }
} }
func TestProcessorRetry(t *testing.T) { func TestProcessorRetry(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r)
m1 := randomTask("send_email", "default", nil) m1 := randomTask("send_email", "default", nil)
m1.Retried = m1.Retry // m1 has reached its max retry count m1.Retried = m1.Retry // m1 has reached its max retry count
@ -113,24 +116,24 @@ func TestProcessorRetry(t *testing.T) {
r4.Retried = m4.Retried + 1 r4.Retried = m4.Retried + 1
tests := []struct { tests := []struct {
initQueue []*taskMessage // initial default queue state initQueue []*rdb.TaskMessage // initial default queue state
incoming []*taskMessage // tasks to be enqueued during run incoming []*rdb.TaskMessage // tasks to be enqueued during run
wait time.Duration // wait duration between starting and stopping processor for this test case wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []*taskMessage // tasks in retry queue at the end wantRetry []*rdb.TaskMessage // tasks in retry queue at the end
wantDead []*taskMessage // tasks in dead queue at the end wantDead []*rdb.TaskMessage // tasks in dead queue at the end
}{ }{
{ {
initQueue: []*taskMessage{m1, m2}, initQueue: []*rdb.TaskMessage{m1, m2},
incoming: []*taskMessage{m3, m4}, incoming: []*rdb.TaskMessage{m3, m4},
wait: time.Second, wait: time.Second,
wantRetry: []*taskMessage{&r2, &r3, &r4}, wantRetry: []*rdb.TaskMessage{&r2, &r3, &r4},
wantDead: []*taskMessage{&r1}, wantDead: []*rdb.TaskMessage{&r1},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case. // clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// instantiate a new processor // instantiate a new processor
@ -138,11 +141,11 @@ func TestProcessorRetry(t *testing.T) {
h = func(task *Task) error { h = func(task *Task) error {
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
} }
p := newProcessor(r, 10, h) p := newProcessor(rdbClient, 10, h)
p.dequeueTimeout = time.Second // short time out for test purpose p.dequeueTimeout = time.Second // short time out for test purpose
// initialize default queue. // initialize default queue.
for _, msg := range tc.initQueue { for _, msg := range tc.initQueue {
err := r.enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -150,7 +153,7 @@ func TestProcessorRetry(t *testing.T) {
p.start() p.start()
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := r.enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
p.terminate() p.terminate()
t.Fatal(err) t.Fatal(err)
@ -159,20 +162,20 @@ func TestProcessorRetry(t *testing.T) {
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
gotRetryRaw := r.client.ZRange(retry, 0, -1).Val() gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val()
gotRetry := mustUnmarshalSlice(t, gotRetryRaw) gotRetry := mustUnmarshalSlice(t, gotRetryRaw)
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retry, diff) t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Retry, diff)
} }
gotDeadRaw := r.client.ZRange(dead, 0, -1).Val() gotDeadRaw := r.ZRange(rdb.Dead, 0, -1).Val()
gotDead := mustUnmarshalSlice(t, gotDeadRaw) gotDead := mustUnmarshalSlice(t, gotDeadRaw)
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", dead, diff) t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Dead, diff)
} }
if l := r.client.LLen(inProgress).Val(); l != 0 { if l := r.LLen(rdb.InProgress).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", inProgress, l) t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l)
} }
} }
} }

View File

@ -5,13 +5,15 @@ import (
"math" "math"
"math/rand" "math/rand"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
) )
func retryTask(rdb *rdb, msg *taskMessage, err error) { func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) {
msg.ErrorMsg = err.Error() msg.ErrorMsg = err.Error()
if msg.Retried >= msg.Retry { if msg.Retried >= msg.Retry {
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
if err := rdb.kill(msg); err != nil { if err := r.Kill(msg); err != nil {
log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err) log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err)
} }
return return
@ -19,7 +21,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) {
retryAt := time.Now().Add(delaySeconds((msg.Retried))) retryAt := time.Now().Add(delaySeconds((msg.Retried)))
log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now())) log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
msg.Retried++ msg.Retried++
if err := rdb.schedule(retry, retryAt, msg); err != nil { if err := r.Schedule(rdb.Retry, retryAt, msg); err != nil {
log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err) log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
return return
} }

View File

@ -6,64 +6,66 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestRetry(t *testing.T) { func TestRetry(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r)
errMsg := "email server not responding" errMsg := "email server not responding"
// t1 is a task with max-retry count reached. // t1 is a task with max-retry count reached.
t1 := &taskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()} t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()}
// t2 is t1 with updated error message. // t2 is t1 with updated error message.
t2 := *t1 t2 := *t1
t2.ErrorMsg = errMsg t2.ErrorMsg = errMsg
// t3 is a task which hasn't reached max-retry count. // t3 is a task which hasn't reached max-retry count.
t3 := &taskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()} t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()}
// t4 is t3 after retry. // t4 is t3 after retry.
t4 := *t3 t4 := *t3
t4.Retried++ t4.Retried++
t4.ErrorMsg = errMsg t4.ErrorMsg = errMsg
tests := []struct { tests := []struct {
desc string // test case description desc string // test case description
msg *taskMessage // task to retry msg *rdb.TaskMessage // task to retry
err error // error that caused retry err error // error that caused retry
wantDead []*taskMessage // state "dead" queue should be in wantDead []*rdb.TaskMessage // state "dead" queue should be in
wantRetry []*taskMessage // state "retry" queue should be in wantRetry []*rdb.TaskMessage // state "retry" queue should be in
}{ }{
{ {
desc: "With retry exhausted task", desc: "With retry exhausted task",
msg: t1, msg: t1,
err: fmt.Errorf(errMsg), err: fmt.Errorf(errMsg),
wantDead: []*taskMessage{&t2}, wantDead: []*rdb.TaskMessage{&t2},
wantRetry: []*taskMessage{}, wantRetry: []*rdb.TaskMessage{},
}, },
{ {
desc: "With retry-able task", desc: "With retry-able task",
msg: t3, msg: t3,
err: fmt.Errorf(errMsg), err: fmt.Errorf(errMsg),
wantDead: []*taskMessage{}, wantDead: []*rdb.TaskMessage{},
wantRetry: []*taskMessage{&t4}, wantRetry: []*rdb.TaskMessage{&t4},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
// clean up db before each test case. // clean up db before each test case.
if err := r.client.FlushDB().Err(); err != nil { if err := r.FlushDB().Err(); err != nil {
t.Fatal(err) t.Fatal(err)
} }
retryTask(r, tc.msg, tc.err) retryTask(rdbClient, tc.msg, tc.err)
deadQueue := r.client.ZRange(dead, 0, -1).Val() deadQueue := r.ZRange(rdb.Dead, 0, -1).Val()
gotDead := mustUnmarshalSlice(t, deadQueue) gotDead := mustUnmarshalSlice(t, deadQueue)
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff) t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff)
} }
retryQueue := r.client.ZRange(retry, 0, -1).Val() retryQueue := r.ZRange(rdb.Retry, 0, -1).Val()
gotRetry := mustUnmarshalSlice(t, retryQueue) gotRetry := mustUnmarshalSlice(t, retryQueue)
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff) t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff)
} }
} }
} }