mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Merge pull request #6 from hibiken/refactor/internalpkg
Create internal package for reuse
This commit is contained in:
commit
3d72ca5a74
3
.gitignore
vendored
3
.gitignore
vendored
@ -13,3 +13,6 @@
|
||||
|
||||
# Ignore examples for now
|
||||
/examples
|
||||
|
||||
# Ignore command binary
|
||||
/cmd/asynqmon/asynqmon
|
36
asynq.go
36
asynq.go
@ -1,6 +1,6 @@
|
||||
package asynq
|
||||
|
||||
import "github.com/google/uuid"
|
||||
import "github.com/go-redis/redis/v7"
|
||||
|
||||
/*
|
||||
TODOs:
|
||||
@ -23,32 +23,6 @@ type Task struct {
|
||||
Payload map[string]interface{}
|
||||
}
|
||||
|
||||
// taskMessage is an internal representation of a task with additional metadata fields.
|
||||
// This data gets written in redis.
|
||||
type taskMessage struct {
|
||||
//-------- Task fields --------
|
||||
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
|
||||
//-------- metadata fields --------
|
||||
|
||||
// unique identifier for each task
|
||||
ID uuid.UUID
|
||||
|
||||
// queue name this message should be enqueued to
|
||||
Queue string
|
||||
|
||||
// max number of retry for this task.
|
||||
Retry int
|
||||
|
||||
// number of times we've retried so far
|
||||
Retried int
|
||||
|
||||
// error message from the last failure
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
// RedisConfig specifies redis configurations.
|
||||
type RedisConfig struct {
|
||||
Addr string
|
||||
@ -57,3 +31,11 @@ type RedisConfig struct {
|
||||
// DB specifies which redis database to select.
|
||||
DB int
|
||||
}
|
||||
|
||||
func newRedisClient(config *RedisConfig) *redis.Client {
|
||||
return redis.NewClient(&redis.Options{
|
||||
Addr: config.Addr,
|
||||
Password: config.Password,
|
||||
DB: config.DB,
|
||||
})
|
||||
}
|
||||
|
@ -7,8 +7,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// This file defines test helper functions used by
|
||||
@ -18,13 +20,28 @@ func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*taskMessage) []*taskMessage {
|
||||
out := append([]*taskMessage(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
// Redis keys
|
||||
const (
|
||||
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||
defaultQ = queuePrefix + "default" // LIST
|
||||
scheduledQ = "asynq:scheduled" // ZSET
|
||||
retryQ = "asynq:retry" // ZSET
|
||||
deadQ = "asynq:dead" // ZSET
|
||||
inProgressQ = "asynq:in_progress" // LIST
|
||||
)
|
||||
|
||||
func setup(t *testing.T) *redis.Client {
|
||||
t.Helper()
|
||||
r := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 14,
|
||||
})
|
||||
// Start each test with a clean slate.
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
|
||||
out := append([]*Task(nil), in...) // Copy input to avoid mutating it
|
||||
@ -34,32 +51,25 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
|
||||
return out
|
||||
})
|
||||
|
||||
// setup connects to a redis database and flush all keys
|
||||
// before returning an instance of rdb.
|
||||
func setup(t *testing.T) *rdb {
|
||||
t.Helper()
|
||||
r := newRDB(&RedisConfig{
|
||||
Addr: "localhost:6379",
|
||||
DB: 15, // use database 15 to separate from other applications
|
||||
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.TaskMessage {
|
||||
out := append([]*rdb.TaskMessage(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
// Start each test with a clean slate.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func randomTask(taskType, qname string, payload map[string]interface{}) *taskMessage {
|
||||
return &taskMessage{
|
||||
func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage {
|
||||
return &rdb.TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: taskType,
|
||||
Queue: qname,
|
||||
Retry: rand.Intn(100),
|
||||
Retry: defaultMaxRetry,
|
||||
Payload: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func mustMarshal(t *testing.T, task *taskMessage) string {
|
||||
func mustMarshal(t *testing.T, task *rdb.TaskMessage) string {
|
||||
t.Helper()
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
@ -68,9 +78,9 @@ func mustMarshal(t *testing.T, task *taskMessage) string {
|
||||
return string(data)
|
||||
}
|
||||
|
||||
func mustUnmarshal(t *testing.T, data string) *taskMessage {
|
||||
func mustUnmarshal(t *testing.T, data string) *rdb.TaskMessage {
|
||||
t.Helper()
|
||||
var task taskMessage
|
||||
var task rdb.TaskMessage
|
||||
err := json.Unmarshal([]byte(data), &task)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -78,7 +88,7 @@ func mustUnmarshal(t *testing.T, data string) *taskMessage {
|
||||
return &task
|
||||
}
|
||||
|
||||
func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string {
|
||||
func mustMarshalSlice(t *testing.T, tasks []*rdb.TaskMessage) []string {
|
||||
t.Helper()
|
||||
var data []string
|
||||
for _, task := range tasks {
|
||||
@ -87,9 +97,9 @@ func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string {
|
||||
return data
|
||||
}
|
||||
|
||||
func mustUnmarshalSlice(t *testing.T, data []string) []*taskMessage {
|
||||
func mustUnmarshalSlice(t *testing.T, data []string) []*rdb.TaskMessage {
|
||||
t.Helper()
|
||||
var tasks []*taskMessage
|
||||
var tasks []*rdb.TaskMessage
|
||||
for _, s := range data {
|
||||
tasks = append(tasks, mustUnmarshal(t, s))
|
||||
}
|
||||
|
@ -7,6 +7,8 @@ import (
|
||||
"os/signal"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Background is a top-level entity for the background-task processing.
|
||||
@ -14,18 +16,18 @@ type Background struct {
|
||||
mu sync.Mutex
|
||||
running bool
|
||||
|
||||
rdb *rdb
|
||||
rdb *rdb.RDB
|
||||
poller *poller
|
||||
processor *processor
|
||||
}
|
||||
|
||||
// NewBackground returns a new Background instance.
|
||||
func NewBackground(numWorkers int, config *RedisConfig) *Background {
|
||||
rdb := newRDB(config)
|
||||
poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry})
|
||||
processor := newProcessor(rdb, numWorkers, nil)
|
||||
r := rdb.NewRDB(newRedisClient(config))
|
||||
poller := newPoller(r, 5*time.Second)
|
||||
processor := newProcessor(r, numWorkers, nil)
|
||||
return &Background{
|
||||
rdb: rdb,
|
||||
rdb: r,
|
||||
poller: poller,
|
||||
processor: processor,
|
||||
}
|
||||
@ -95,7 +97,7 @@ func (bg *Background) stop() {
|
||||
bg.poller.terminate()
|
||||
bg.processor.terminate()
|
||||
|
||||
bg.rdb.client.Close()
|
||||
bg.rdb.Close()
|
||||
bg.processor.handler = nil
|
||||
bg.running = false
|
||||
}
|
||||
|
15
client.go
15
client.go
@ -4,21 +4,23 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Client is an interface for scheduling tasks.
|
||||
type Client struct {
|
||||
rdb *rdb
|
||||
rdb *rdb.RDB
|
||||
}
|
||||
|
||||
// NewClient creates and returns a new client.
|
||||
func NewClient(config *RedisConfig) *Client {
|
||||
return &Client{rdb: newRDB(config)}
|
||||
r := rdb.NewRDB(newRedisClient(config))
|
||||
return &Client{r}
|
||||
}
|
||||
|
||||
// Process enqueues the task to be performed at a given time.
|
||||
func (c *Client) Process(task *Task, processAt time.Time) error {
|
||||
msg := &taskMessage{
|
||||
msg := &rdb.TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: task.Type,
|
||||
Payload: task.Payload,
|
||||
@ -28,10 +30,9 @@ func (c *Client) Process(task *Task, processAt time.Time) error {
|
||||
return c.enqueue(msg, processAt)
|
||||
}
|
||||
|
||||
// enqueue pushes a given task to the specified queue.
|
||||
func (c *Client) enqueue(msg *taskMessage, processAt time.Time) error {
|
||||
func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error {
|
||||
if time.Now().After(processAt) {
|
||||
return c.rdb.enqueue(msg)
|
||||
return c.rdb.Enqueue(msg)
|
||||
}
|
||||
return c.rdb.schedule(scheduled, processAt, msg)
|
||||
return c.rdb.Schedule(msg, processAt)
|
||||
}
|
||||
|
@ -1,13 +1,14 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestClient(t *testing.T) {
|
||||
r := setup(t)
|
||||
client := &Client{rdb: r}
|
||||
client := &Client{rdb.NewRDB(r)}
|
||||
|
||||
tests := []struct {
|
||||
task *Task
|
||||
@ -31,7 +32,7 @@ func TestClient(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@ -41,12 +42,12 @@ func TestClient(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
|
||||
if l := r.client.LLen(defaultQueue).Val(); l != tc.wantQueueSize {
|
||||
t.Errorf("%q has length %d, want %d", defaultQueue, l, tc.wantQueueSize)
|
||||
if l := r.LLen(defaultQ).Val(); l != tc.wantQueueSize {
|
||||
t.Errorf("%q has length %d, want %d", defaultQ, l, tc.wantQueueSize)
|
||||
}
|
||||
|
||||
if l := r.client.ZCard(scheduled).Val(); l != tc.wantScheduledSize {
|
||||
t.Errorf("%q has length %d, want %d", scheduled, l, tc.wantScheduledSize)
|
||||
if l := r.ZCard(scheduledQ).Val(); l != tc.wantScheduledSize {
|
||||
t.Errorf("%q has length %d, want %d", scheduledQ, l, tc.wantScheduledSize)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
38
cmd/asynqmon/main.go
Normal file
38
cmd/asynqmon/main.go
Normal file
@ -0,0 +1,38 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
// Example usage: watch -n5 asynqmon
|
||||
|
||||
func main() {
|
||||
c := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 2,
|
||||
})
|
||||
r := rdb.NewRDB(c)
|
||||
|
||||
stats, err := r.CurrentStats()
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
printStats(stats)
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
func printStats(s *rdb.Stats) {
|
||||
format := strings.Repeat("%v\t", 5) + "\n"
|
||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||
fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead")
|
||||
fmt.Fprintf(tw, format, "--------", "----------", "---------", "-----", "----")
|
||||
fmt.Fprintf(tw, format, s.Enqueued, s.InProgress, s.Scheduled, s.Retry, s.Dead)
|
||||
tw.Flush()
|
||||
}
|
9
go.sum
9
go.sum
@ -8,17 +8,26 @@ github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
|
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w=
|
||||
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||
github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo=
|
||||
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
|
||||
go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4=
|
||||
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
|
||||
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
|
||||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
|
||||
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
223
internal/rdb/inspect.go
Normal file
223
internal/rdb/inspect.go
Normal file
@ -0,0 +1,223 @@
|
||||
package rdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Stats represents a state of queues at a certain time.
|
||||
type Stats struct {
|
||||
Enqueued int
|
||||
InProgress int
|
||||
Scheduled int
|
||||
Retry int
|
||||
Dead int
|
||||
Timestamp time.Time
|
||||
}
|
||||
|
||||
// EnqueuedTask is a task in a queue and is ready to be processed.
|
||||
// Note: This is read only and used for monitoring purpose.
|
||||
type EnqueuedTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
}
|
||||
|
||||
// InProgressTask is a task that's currently being processed.
|
||||
// Note: This is read only and used for monitoring purpose.
|
||||
type InProgressTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
}
|
||||
|
||||
// ScheduledTask is a task that's scheduled to be processed in the future.
|
||||
// Note: This is read only and used for monitoring purpose.
|
||||
type ScheduledTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
ProcessAt time.Time
|
||||
}
|
||||
|
||||
// RetryTask is a task that's in retry queue because worker failed to process the task.
|
||||
// Note: This is read only and used for monitoring purpose.
|
||||
type RetryTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
// TODO(hibiken): add LastFailedAt time.Time
|
||||
ProcessAt time.Time
|
||||
ErrorMsg string
|
||||
Retried int
|
||||
Retry int
|
||||
}
|
||||
|
||||
// DeadTask is a task in that has exhausted all retries.
|
||||
// Note: This is read only and used for monitoring purpose.
|
||||
type DeadTask struct {
|
||||
ID uuid.UUID
|
||||
Type string
|
||||
Payload map[string]interface{}
|
||||
LastFailedAt time.Time
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
// CurrentStats returns a current state of the queues.
|
||||
func (r *RDB) CurrentStats() (*Stats, error) {
|
||||
pipe := r.client.Pipeline()
|
||||
qlen := pipe.LLen(defaultQ)
|
||||
plen := pipe.LLen(inProgressQ)
|
||||
slen := pipe.ZCard(scheduledQ)
|
||||
rlen := pipe.ZCard(retryQ)
|
||||
dlen := pipe.ZCard(deadQ)
|
||||
_, err := pipe.Exec()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Stats{
|
||||
Enqueued: int(qlen.Val()),
|
||||
InProgress: int(plen.Val()),
|
||||
Scheduled: int(slen.Val()),
|
||||
Retry: int(rlen.Val()),
|
||||
Dead: int(dlen.Val()),
|
||||
Timestamp: time.Now(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ListEnqueued returns all enqueued tasks that are ready to be processed.
|
||||
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
|
||||
data, err := r.client.LRange(defaultQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*EnqueuedTask
|
||||
for _, s := range data {
|
||||
var msg TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
// continue // bad data, ignore and continue
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, &EnqueuedTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListInProgress returns all tasks that are currently being processed.
|
||||
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
|
||||
data, err := r.client.LRange(inProgressQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*InProgressTask
|
||||
for _, s := range data {
|
||||
var msg TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
tasks = append(tasks, &InProgressTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListScheduled returns all tasks that are scheduled to be processed
|
||||
// in the future.
|
||||
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*ScheduledTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
var msg TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
processAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &ScheduledTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
ProcessAt: processAt,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListRetry returns all tasks that have failed before and willl be retried
|
||||
// in the future.
|
||||
func (r *RDB) ListRetry() ([]*RetryTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*RetryTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
var msg TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
processAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &RetryTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
ErrorMsg: msg.ErrorMsg,
|
||||
Retry: msg.Retry,
|
||||
Retried: msg.Retried,
|
||||
ProcessAt: processAt,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListDead returns all tasks that have exhausted its retry limit.
|
||||
func (r *RDB) ListDead() ([]*DeadTask, error) {
|
||||
data, err := r.client.ZRangeWithScores(deadQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var tasks []*DeadTask
|
||||
for _, z := range data {
|
||||
s, ok := z.Member.(string)
|
||||
if !ok {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
var msg TaskMessage
|
||||
err := json.Unmarshal([]byte(s), &msg)
|
||||
if err != nil {
|
||||
continue // bad data, ignore and continue
|
||||
}
|
||||
lastFailedAt := time.Unix(int64(z.Score), 0)
|
||||
tasks = append(tasks, &DeadTask{
|
||||
ID: msg.ID,
|
||||
Type: msg.Type,
|
||||
Payload: msg.Payload,
|
||||
ErrorMsg: msg.ErrorMsg,
|
||||
LastFailedAt: lastFailedAt,
|
||||
})
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
468
internal/rdb/inspect_test.go
Normal file
468
internal/rdb/inspect_test.go
Normal file
@ -0,0 +1,468 @@
|
||||
package rdb
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func TestCurrentStats(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello"})
|
||||
m2 := randomTask("reindex", "default", nil)
|
||||
m3 := randomTask("gen_thumbnail", "default", map[string]interface{}{"src": "some/path/to/img"})
|
||||
m4 := randomTask("sync", "default", nil)
|
||||
|
||||
tests := []struct {
|
||||
enqueued []*TaskMessage
|
||||
inProgress []*TaskMessage
|
||||
scheduled []*TaskMessage
|
||||
retry []*TaskMessage
|
||||
dead []*TaskMessage
|
||||
want *Stats
|
||||
}{
|
||||
{
|
||||
enqueued: []*TaskMessage{m1},
|
||||
inProgress: []*TaskMessage{m2},
|
||||
scheduled: []*TaskMessage{m3, m4},
|
||||
retry: []*TaskMessage{},
|
||||
dead: []*TaskMessage{},
|
||||
want: &Stats{
|
||||
Enqueued: 1,
|
||||
InProgress: 1,
|
||||
Scheduled: 2,
|
||||
Retry: 0,
|
||||
Dead: 0,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
},
|
||||
{
|
||||
enqueued: []*TaskMessage{},
|
||||
inProgress: []*TaskMessage{},
|
||||
scheduled: []*TaskMessage{m3, m4},
|
||||
retry: []*TaskMessage{m1},
|
||||
dead: []*TaskMessage{m2},
|
||||
want: &Stats{
|
||||
Enqueued: 0,
|
||||
InProgress: 0,
|
||||
Scheduled: 2,
|
||||
Retry: 1,
|
||||
Dead: 1,
|
||||
Timestamp: time.Now(),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the queues.
|
||||
for _, msg := range tc.enqueued {
|
||||
if err := r.Enqueue(msg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.inProgress {
|
||||
if err := r.client.LPush(inProgressQ, mustMarshal(t, msg)).Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.scheduled {
|
||||
if err := r.Schedule(msg, time.Now().Add(time.Hour)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.retry {
|
||||
if err := r.RetryLater(msg, time.Now().Add(time.Hour)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
for _, msg := range tc.dead {
|
||||
if err := r.Kill(msg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
got, err := r.CurrentStats()
|
||||
if err != nil {
|
||||
t.Errorf("r.CurrentStats() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
|
||||
t.Errorf("r.CurrentStats() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestListEnqueued(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
m1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello"})
|
||||
m2 := randomTask("reindex", "default", nil)
|
||||
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload}
|
||||
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload}
|
||||
tests := []struct {
|
||||
enqueued []*TaskMessage
|
||||
want []*EnqueuedTask
|
||||
}{
|
||||
{
|
||||
enqueued: []*TaskMessage{m1, m2},
|
||||
want: []*EnqueuedTask{t1, t2},
|
||||
},
|
||||
{
|
||||
enqueued: []*TaskMessage{},
|
||||
want: []*EnqueuedTask{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the list
|
||||
for _, msg := range tc.enqueued {
|
||||
if err := r.Enqueue(msg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
got, err := r.ListEnqueued()
|
||||
if err != nil {
|
||||
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
sortOpt := cmp.Transformer("SortMsg", func(in []*EnqueuedTask) []*EnqueuedTask {
|
||||
out := append([]*EnqueuedTask(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||
t.Errorf("r.ListEnqueued() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListInProgress(t *testing.T) {
|
||||
r := setup(t)
|
||||
|
||||
m1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello"})
|
||||
m2 := randomTask("reindex", "default", nil)
|
||||
t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload}
|
||||
t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload}
|
||||
tests := []struct {
|
||||
enqueued []*TaskMessage
|
||||
want []*InProgressTask
|
||||
}{
|
||||
{
|
||||
enqueued: []*TaskMessage{m1, m2},
|
||||
want: []*InProgressTask{t1, t2},
|
||||
},
|
||||
{
|
||||
enqueued: []*TaskMessage{},
|
||||
want: []*InProgressTask{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the list
|
||||
for _, msg := range tc.enqueued {
|
||||
if err := r.client.LPush(inProgressQ, mustMarshal(t, msg)).Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
got, err := r.ListInProgress()
|
||||
if err != nil {
|
||||
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
sortOpt := cmp.Transformer("SortMsg", func(in []*InProgressTask) []*InProgressTask {
|
||||
out := append([]*InProgressTask(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||
t.Errorf("r.ListInProgress() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListScheduled(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello"})
|
||||
m2 := randomTask("reindex", "default", nil)
|
||||
p1 := time.Now().Add(30 * time.Minute)
|
||||
p2 := time.Now().Add(24 * time.Hour)
|
||||
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1}
|
||||
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2}
|
||||
|
||||
type scheduledEntry struct {
|
||||
msg *TaskMessage
|
||||
processAt time.Time
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
scheduled []scheduledEntry
|
||||
want []*ScheduledTask
|
||||
}{
|
||||
{
|
||||
scheduled: []scheduledEntry{
|
||||
{m1, p1},
|
||||
{m2, p2},
|
||||
},
|
||||
want: []*ScheduledTask{t1, t2},
|
||||
},
|
||||
{
|
||||
scheduled: []scheduledEntry{},
|
||||
want: []*ScheduledTask{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the scheduled queue
|
||||
for _, s := range tc.scheduled {
|
||||
err := r.Schedule(s.msg, s.processAt)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
got, err := r.ListScheduled()
|
||||
if err != nil {
|
||||
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
sortOpt := cmp.Transformer("SortMsg", func(in []*ScheduledTask) []*ScheduledTask {
|
||||
out := append([]*ScheduledTask(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
|
||||
t.Errorf("r.ListScheduled() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListRetry(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := &TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "send_email",
|
||||
Queue: "default",
|
||||
Payload: map[string]interface{}{"subject": "hello"},
|
||||
ErrorMsg: "email server not responding",
|
||||
Retry: 25,
|
||||
Retried: 10,
|
||||
}
|
||||
m2 := &TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "reindex",
|
||||
Queue: "default",
|
||||
Payload: nil,
|
||||
ErrorMsg: "search engine not responding",
|
||||
Retry: 25,
|
||||
Retried: 2,
|
||||
}
|
||||
p1 := time.Now().Add(5 * time.Minute)
|
||||
p2 := time.Now().Add(24 * time.Hour)
|
||||
t1 := &RetryTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload,
|
||||
ProcessAt: p1, ErrorMsg: m1.ErrorMsg, Retried: m1.Retried, Retry: m1.Retry}
|
||||
t2 := &RetryTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload,
|
||||
ProcessAt: p2, ErrorMsg: m2.ErrorMsg, Retried: m2.Retried, Retry: m2.Retry}
|
||||
|
||||
type retryEntry struct {
|
||||
msg *TaskMessage
|
||||
processAt time.Time
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
dead []retryEntry
|
||||
want []*RetryTask
|
||||
}{
|
||||
{
|
||||
dead: []retryEntry{
|
||||
{m1, p1},
|
||||
{m2, p2},
|
||||
},
|
||||
want: []*RetryTask{t1, t2},
|
||||
},
|
||||
{
|
||||
dead: []retryEntry{},
|
||||
want: []*RetryTask{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the scheduled queue
|
||||
for _, d := range tc.dead {
|
||||
r.client.ZAdd(retryQ, &redis.Z{
|
||||
Member: mustMarshal(t, d.msg),
|
||||
Score: float64(d.processAt.Unix()),
|
||||
})
|
||||
}
|
||||
|
||||
got, err := r.ListRetry()
|
||||
if err != nil {
|
||||
t.Errorf("r.ListRetry() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
sortOpt := cmp.Transformer("SortMsg", func(in []*RetryTask) []*RetryTask {
|
||||
out := append([]*RetryTask(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
|
||||
t.Errorf("r.ListRetry() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListDead(t *testing.T) {
|
||||
r := setup(t)
|
||||
m1 := &TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "send_email",
|
||||
Queue: "default",
|
||||
Payload: map[string]interface{}{"subject": "hello"},
|
||||
ErrorMsg: "email server not responding",
|
||||
}
|
||||
m2 := &TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: "reindex",
|
||||
Queue: "default",
|
||||
Payload: nil,
|
||||
ErrorMsg: "search engine not responding",
|
||||
}
|
||||
f1 := time.Now().Add(-5 * time.Minute)
|
||||
f2 := time.Now().Add(-24 * time.Hour)
|
||||
t1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, LastFailedAt: f1, ErrorMsg: m1.ErrorMsg}
|
||||
t2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, LastFailedAt: f2, ErrorMsg: m2.ErrorMsg}
|
||||
|
||||
type deadEntry struct {
|
||||
msg *TaskMessage
|
||||
lastFailedAt time.Time
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
dead []deadEntry
|
||||
want []*DeadTask
|
||||
}{
|
||||
{
|
||||
dead: []deadEntry{
|
||||
{m1, f1},
|
||||
{m2, f2},
|
||||
},
|
||||
want: []*DeadTask{t1, t2},
|
||||
},
|
||||
{
|
||||
dead: []deadEntry{},
|
||||
want: []*DeadTask{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize the scheduled queue
|
||||
for _, d := range tc.dead {
|
||||
r.client.ZAdd(deadQ, &redis.Z{
|
||||
Member: mustMarshal(t, d.msg),
|
||||
Score: float64(d.lastFailedAt.Unix()),
|
||||
})
|
||||
}
|
||||
|
||||
got, err := r.ListDead()
|
||||
if err != nil {
|
||||
t.Errorf("r.ListDead() = %v, %v, want %v, nil", got, err, tc.want)
|
||||
continue
|
||||
}
|
||||
sortOpt := cmp.Transformer("SortMsg", func(in []*DeadTask) []*DeadTask {
|
||||
out := append([]*DeadTask(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" {
|
||||
t.Errorf("r.ListDead() = %v, %v, want %v, nil; (-want, +got)\n%s", got, err, tc.want, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var timeCmpOpt = EquateApproxTime(time.Second)
|
||||
|
||||
// TODO(hibiken): Replace this with cmpopts.EquateApproxTime once it becomes availalble.
|
||||
// https://github.com/google/go-cmp/issues/166
|
||||
//
|
||||
// EquateApproxTime returns a Comparer options that
|
||||
// determine two time.Time values to be equal if they
|
||||
// are within the given time interval of one another.
|
||||
// Note that if both times have a monotonic clock reading,
|
||||
// the monotonic time difference will be used.
|
||||
//
|
||||
// The zero time is treated specially: it is only considered
|
||||
// equal to another zero time value.
|
||||
//
|
||||
// It will panic if margin is negative.
|
||||
func EquateApproxTime(margin time.Duration) cmp.Option {
|
||||
if margin < 0 {
|
||||
panic("negative duration in EquateApproxTime")
|
||||
}
|
||||
return cmp.FilterValues(func(x, y time.Time) bool {
|
||||
return !x.IsZero() && !y.IsZero()
|
||||
}, cmp.Comparer(timeApproximator{margin}.compare))
|
||||
}
|
||||
|
||||
type timeApproximator struct {
|
||||
margin time.Duration
|
||||
}
|
||||
|
||||
func (a timeApproximator) compare(x, y time.Time) bool {
|
||||
// Avoid subtracting times to avoid overflow when the
|
||||
// difference is larger than the largest representible duration.
|
||||
if x.After(y) {
|
||||
// Ensure x is always before y
|
||||
x, y = y, x
|
||||
}
|
||||
// We're within the margin if x+margin >= y.
|
||||
// Note: time.Time doesn't have AfterOrEqual method hence the negation.
|
||||
return !x.Add(a.margin).Before(y)
|
||||
}
|
202
internal/rdb/rdb.go
Normal file
202
internal/rdb/rdb.go
Normal file
@ -0,0 +1,202 @@
|
||||
// Package rdb encapsulates the interactions with redis.
|
||||
package rdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// Redis keys
|
||||
const (
|
||||
allQueues = "asynq:queues" // SET
|
||||
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||
defaultQ = queuePrefix + "default" // LIST
|
||||
scheduledQ = "asynq:scheduled" // ZSET
|
||||
retryQ = "asynq:retry" // ZSET
|
||||
deadQ = "asynq:dead" // ZSET
|
||||
inProgressQ = "asynq:in_progress" // LIST
|
||||
)
|
||||
|
||||
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out.
|
||||
var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out")
|
||||
|
||||
// RDB is a client interface to query and mutate task queues.
|
||||
type RDB struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
// NewRDB returns a new instance of RDB.
|
||||
func NewRDB(client *redis.Client) *RDB {
|
||||
return &RDB{client}
|
||||
}
|
||||
|
||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||
// Serialized data of this type gets written in redis.
|
||||
type TaskMessage struct {
|
||||
//-------- Task fields --------
|
||||
// Type represents the kind of task.
|
||||
Type string
|
||||
// Payload holds data needed to process the task.
|
||||
Payload map[string]interface{}
|
||||
|
||||
//-------- Metadata fields --------
|
||||
// ID is a unique identifier for each task
|
||||
ID uuid.UUID
|
||||
// Queue is a name this message should be enqueued to
|
||||
Queue string
|
||||
// Retry is the max number of retry for this task.
|
||||
Retry int
|
||||
// Retried is the number of times we've retried this task so far
|
||||
Retried int
|
||||
// ErrorMsg holds the error message from the last failure
|
||||
ErrorMsg string
|
||||
}
|
||||
|
||||
// Close closes the connection with redis server.
|
||||
func (r *RDB) Close() error {
|
||||
return r.client.Close()
|
||||
}
|
||||
|
||||
// Enqueue inserts the given task to the end of the queue.
|
||||
// It also adds the queue name to the "all-queues" list.
|
||||
func (r *RDB) Enqueue(msg *TaskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
qname := queuePrefix + msg.Queue
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.SAdd(allQueues, qname)
|
||||
pipe.LPush(qname, string(bytes))
|
||||
_, err = pipe.Exec()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enqueue the task %+v to %q: %v", msg, qname, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Dequeue blocks until there is a task available to be processed,
|
||||
// once a task is available, it adds the task to "in progress" list
|
||||
// and returns the task.
|
||||
func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) {
|
||||
data, err := r.client.BRPopLPush(defaultQ, inProgressQ, timeout).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, ErrDequeueTimeout
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", defaultQ, inProgressQ, timeout, err)
|
||||
}
|
||||
var msg TaskMessage
|
||||
err = json.Unmarshal([]byte(data), &msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
|
||||
}
|
||||
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, defaultQ)
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// Done removes the task from in-progress queue to mark the task as done.
|
||||
func (r *RDB) Done(msg *TaskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
// NOTE: count ZERO means "remove all elements equal to val"
|
||||
err = r.client.LRem(inProgressQ, 0, string(bytes)).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", inProgressQ, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Schedule adds the task to the backlog queue to be processed in the future.
|
||||
func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
|
||||
return r.schedule(scheduledQ, processAt, msg)
|
||||
}
|
||||
|
||||
// RetryLater adds the task to the backlog queue to be retried in the future.
|
||||
func (r *RDB) RetryLater(msg *TaskMessage, processAt time.Time) error {
|
||||
return r.schedule(retryQ, processAt, msg)
|
||||
}
|
||||
|
||||
// schedule adds the task to the zset to be processd at the specified time.
|
||||
func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
score := float64(processAt.Unix())
|
||||
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Kill sends the task to "dead" set.
|
||||
// It also trims the set by timestamp and set size.
|
||||
func (r *RDB) Kill(msg *TaskMessage) error {
|
||||
const maxDeadTask = 10
|
||||
const deadExpirationInDays = 90
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
now := time.Now()
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.ZAdd(deadQ, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
pipe.ZRemRangeByScore(deadQ, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(deadQ, 0, -maxDeadTask) // trim the set to 100
|
||||
_, err = pipe.Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
// RestoreUnfinished moves all tasks from in-progress list to the queue.
|
||||
func (r *RDB) RestoreUnfinished() error {
|
||||
script := redis.NewScript(`
|
||||
local len = redis.call("LLEN", KEYS[1])
|
||||
for i = len, 1, -1 do
|
||||
redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
|
||||
end
|
||||
return len
|
||||
`)
|
||||
_, err := script.Run(r.client, []string{inProgressQ, defaultQ}).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
||||
// have to be processed.
|
||||
func (r *RDB) CheckAndEnqueue() error {
|
||||
delayed := []string{scheduledQ, retryQ}
|
||||
for _, zset := range delayed {
|
||||
if err := r.forward(zset); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Forward moves all tasks with a score less than the current unix time
|
||||
// from the given zset to the default queue.
|
||||
func (r *RDB) forward(from string) error {
|
||||
script := redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
||||
for _, msg in ipairs(msgs) do
|
||||
redis.call("ZREM", KEYS[1], msg)
|
||||
redis.call("SADD", KEYS[2], KEYS[3])
|
||||
redis.call("LPUSH", KEYS[3], msg)
|
||||
end
|
||||
return msgs
|
||||
`)
|
||||
now := float64(time.Now().Unix())
|
||||
res, err := script.Run(r.client, []string{from, allQueues, defaultQ}, now).Result()
|
||||
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from)
|
||||
return err
|
||||
}
|
489
internal/rdb/rdb_test.go
Normal file
489
internal/rdb/rdb_test.go
Normal file
@ -0,0 +1,489 @@
|
||||
package rdb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
func init() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
}
|
||||
|
||||
func setup(t *testing.T) *RDB {
|
||||
t.Helper()
|
||||
r := NewRDB(redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
DB: 13,
|
||||
}))
|
||||
// Start each test with a clean slate.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage {
|
||||
out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].ID.String() < out[j].ID.String()
|
||||
})
|
||||
return out
|
||||
})
|
||||
|
||||
func randomTask(taskType, qname string, payload map[string]interface{}) *TaskMessage {
|
||||
return &TaskMessage{
|
||||
ID: uuid.New(),
|
||||
Type: taskType,
|
||||
Queue: qname,
|
||||
Retry: 25,
|
||||
Payload: make(map[string]interface{}),
|
||||
}
|
||||
}
|
||||
|
||||
func mustMarshal(t *testing.T, task *TaskMessage) string {
|
||||
t.Helper()
|
||||
data, err := json.Marshal(task)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return string(data)
|
||||
}
|
||||
|
||||
func mustUnmarshal(t *testing.T, data string) *TaskMessage {
|
||||
t.Helper()
|
||||
var task TaskMessage
|
||||
err := json.Unmarshal([]byte(data), &task)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return &task
|
||||
}
|
||||
|
||||
func mustMarshalSlice(t *testing.T, tasks []*TaskMessage) []string {
|
||||
t.Helper()
|
||||
var data []string
|
||||
for _, task := range tasks {
|
||||
data = append(data, mustMarshal(t, task))
|
||||
}
|
||||
return data
|
||||
}
|
||||
|
||||
func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage {
|
||||
t.Helper()
|
||||
var tasks []*TaskMessage
|
||||
for _, s := range data {
|
||||
tasks = append(tasks, mustUnmarshal(t, s))
|
||||
}
|
||||
return tasks
|
||||
}
|
||||
|
||||
func TestEnqueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
tests := []struct {
|
||||
msg *TaskMessage
|
||||
}{
|
||||
{msg: randomTask("send_email", "default",
|
||||
map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
|
||||
{msg: randomTask("generate_csv", "default",
|
||||
map[string]interface{}{})},
|
||||
{msg: randomTask("sync", "default", nil)},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err := r.Enqueue(tc.msg)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
res := r.client.LRange(defaultQ, 0, -1).Val()
|
||||
if len(res) != 1 {
|
||||
t.Errorf("LIST %q has length %d, want 1", defaultQ, len(res))
|
||||
continue
|
||||
}
|
||||
if !r.client.SIsMember(allQueues, defaultQ).Val() {
|
||||
t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQ)
|
||||
}
|
||||
if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" {
|
||||
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDequeue(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"})
|
||||
tests := []struct {
|
||||
queued []*TaskMessage
|
||||
want *TaskMessage
|
||||
err error
|
||||
inProgress int64 // length of "in-progress" tasks after dequeue
|
||||
}{
|
||||
{queued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1},
|
||||
{queued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, m := range tc.queued {
|
||||
r.Enqueue(m)
|
||||
}
|
||||
got, err := r.Dequeue(time.Second)
|
||||
if !cmp.Equal(got, tc.want) || err != tc.err {
|
||||
t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v",
|
||||
got, err, tc.want, tc.err)
|
||||
continue
|
||||
}
|
||||
if l := r.client.LLen(inProgressQ).Val(); l != tc.inProgress {
|
||||
t.Errorf("LIST %q has length %d, want %d", inProgressQ, l, tc.inProgress)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDone(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("export_csv", "csv", nil)
|
||||
|
||||
tests := []struct {
|
||||
initial []*TaskMessage // initial state of the in-progress list
|
||||
target *TaskMessage // task to remove
|
||||
final []*TaskMessage // final state of the in-progress list
|
||||
}{
|
||||
{
|
||||
initial: []*TaskMessage{t1, t2},
|
||||
target: t1,
|
||||
final: []*TaskMessage{t2},
|
||||
},
|
||||
{
|
||||
initial: []*TaskMessage{t2},
|
||||
target: t1,
|
||||
final: []*TaskMessage{t2},
|
||||
},
|
||||
{
|
||||
initial: []*TaskMessage{t1},
|
||||
target: t1,
|
||||
final: []*TaskMessage{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// set up initial state
|
||||
for _, task := range tc.initial {
|
||||
err := r.client.LPush(inProgressQ, mustMarshal(t, task)).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := r.Done(tc.target)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
var got []*TaskMessage
|
||||
data := r.client.LRange(inProgressQ, 0, -1).Val()
|
||||
for _, s := range data {
|
||||
got = append(got, mustUnmarshal(t, s))
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQ, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKill(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
|
||||
// TODO(hibiken): add test cases for trimming
|
||||
tests := []struct {
|
||||
initial []*TaskMessage // inital state of "dead" set
|
||||
target *TaskMessage // task to kill
|
||||
want []*TaskMessage // final state of "dead" set
|
||||
}{
|
||||
{
|
||||
initial: []*TaskMessage{},
|
||||
target: t1,
|
||||
want: []*TaskMessage{t1},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// set up initial state
|
||||
for _, task := range tc.initial {
|
||||
err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := r.Kill(tc.target)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
actual := r.client.ZRange(deadQ, 0, -1).Val()
|
||||
got := mustUnmarshalSlice(t, actual)
|
||||
if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", deadQ, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRestoreUnfinished(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("export_csv", "csv", nil)
|
||||
t3 := randomTask("sync_stuff", "sync", nil)
|
||||
|
||||
tests := []struct {
|
||||
beforeSrc []*TaskMessage
|
||||
beforeDst []*TaskMessage
|
||||
afterSrc []*TaskMessage
|
||||
afterDst []*TaskMessage
|
||||
}{
|
||||
{
|
||||
beforeSrc: []*TaskMessage{t1, t2, t3},
|
||||
beforeDst: []*TaskMessage{},
|
||||
afterSrc: []*TaskMessage{},
|
||||
afterDst: []*TaskMessage{t1, t2, t3},
|
||||
},
|
||||
{
|
||||
beforeSrc: []*TaskMessage{},
|
||||
beforeDst: []*TaskMessage{t1, t2, t3},
|
||||
afterSrc: []*TaskMessage{},
|
||||
afterDst: []*TaskMessage{t1, t2, t3},
|
||||
},
|
||||
{
|
||||
beforeSrc: []*TaskMessage{t2, t3},
|
||||
beforeDst: []*TaskMessage{t1},
|
||||
afterSrc: []*TaskMessage{},
|
||||
afterDst: []*TaskMessage{t1, t2, t3},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
// seed src list.
|
||||
for _, msg := range tc.beforeSrc {
|
||||
r.client.LPush(inProgressQ, mustMarshal(t, msg))
|
||||
}
|
||||
// seed dst list.
|
||||
for _, msg := range tc.beforeDst {
|
||||
r.client.LPush(defaultQ, mustMarshal(t, msg))
|
||||
}
|
||||
|
||||
if err := r.RestoreUnfinished(); err != nil {
|
||||
t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err)
|
||||
continue
|
||||
}
|
||||
|
||||
src := r.client.LRange(inProgressQ, 0, -1).Val()
|
||||
gotSrc := mustUnmarshalSlice(t, src)
|
||||
if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgressQ, diff)
|
||||
}
|
||||
dst := r.client.LRange(defaultQ, 0, -1).Val()
|
||||
gotDst := mustUnmarshalSlice(t, dst)
|
||||
if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQ, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckAndEnqueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("generate_csv", "default", nil)
|
||||
t3 := randomTask("gen_thumbnail", "default", nil)
|
||||
secondAgo := time.Now().Add(-time.Second)
|
||||
hourFromNow := time.Now().Add(time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
initScheduled []*redis.Z // tasks to be processed later
|
||||
initRetry []*redis.Z // tasks to be retired later
|
||||
wantQueued []*TaskMessage // queue after calling forward
|
||||
wantScheduled []*TaskMessage // tasks in scheduled queue after calling the method
|
||||
wantRetry []*TaskMessage // tasks in retry queue after calling the method
|
||||
}{
|
||||
{
|
||||
initScheduled: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||
initRetry: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}},
|
||||
wantQueued: []*TaskMessage{t1, t2, t3},
|
||||
wantScheduled: []*TaskMessage{},
|
||||
wantRetry: []*TaskMessage{},
|
||||
},
|
||||
{
|
||||
initScheduled: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||
initRetry: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}},
|
||||
wantQueued: []*TaskMessage{t2, t3},
|
||||
wantScheduled: []*TaskMessage{t1},
|
||||
wantRetry: []*TaskMessage{},
|
||||
},
|
||||
{
|
||||
initScheduled: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
|
||||
initRetry: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t3), Score: float64(hourFromNow.Unix())}},
|
||||
wantQueued: []*TaskMessage{},
|
||||
wantScheduled: []*TaskMessage{t1, t2},
|
||||
wantRetry: []*TaskMessage{t3},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := r.client.ZAdd(scheduledQ, tc.initScheduled...).Err(); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
if err := r.client.ZAdd(retryQ, tc.initRetry...).Err(); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
err := r.CheckAndEnqueue()
|
||||
if err != nil {
|
||||
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
|
||||
continue
|
||||
}
|
||||
queued := r.client.LRange(defaultQ, 0, -1).Val()
|
||||
gotQueued := mustUnmarshalSlice(t, queued)
|
||||
if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQ, len(gotQueued), len(tc.wantQueued), diff)
|
||||
}
|
||||
scheduled := r.client.ZRangeByScore(scheduledQ, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val()
|
||||
gotScheduled := mustUnmarshalSlice(t, scheduled)
|
||||
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSchedule(t *testing.T) {
|
||||
r := setup(t)
|
||||
tests := []struct {
|
||||
msg *TaskMessage
|
||||
processAt time.Time
|
||||
}{
|
||||
{
|
||||
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||
time.Now().Add(15 * time.Minute),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err := r.Schedule(tc.msg, tc.processAt)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := r.client.ZRangeWithScores(scheduledQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
desc := fmt.Sprintf("(*RDB).Schedule(%v, %v)", tc.msg, tc.processAt)
|
||||
if len(res) != 1 {
|
||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), scheduledQ)
|
||||
continue
|
||||
}
|
||||
|
||||
if res[0].Score != float64(tc.processAt.Unix()) {
|
||||
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryLater(t *testing.T) {
|
||||
r := setup(t)
|
||||
tests := []struct {
|
||||
msg *TaskMessage
|
||||
processAt time.Time
|
||||
}{
|
||||
{
|
||||
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||
time.Now().Add(15 * time.Minute),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err := r.RetryLater(tc.msg, tc.processAt)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := r.client.ZRangeWithScores(retryQ, 0, -1).Result()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
desc := fmt.Sprintf("(*RDB).RetryLater(%v, %v)", tc.msg, tc.processAt)
|
||||
if len(res) != 1 {
|
||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), retryQ)
|
||||
continue
|
||||
}
|
||||
|
||||
if res[0].Score != float64(tc.processAt.Unix()) {
|
||||
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
18
poller.go
18
poller.go
@ -3,27 +3,25 @@ package asynq
|
||||
import (
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
type poller struct {
|
||||
rdb *rdb
|
||||
rdb *rdb.RDB
|
||||
|
||||
// channel to communicate back to the long running "poller" goroutine.
|
||||
done chan struct{}
|
||||
|
||||
// poll interval on average
|
||||
avgInterval time.Duration
|
||||
|
||||
// redis ZSETs to poll
|
||||
zsets []string
|
||||
}
|
||||
|
||||
func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller {
|
||||
func newPoller(r *rdb.RDB, avgInterval time.Duration) *poller {
|
||||
return &poller{
|
||||
rdb: rdb,
|
||||
rdb: r,
|
||||
done: make(chan struct{}),
|
||||
avgInterval: avgInterval,
|
||||
zsets: zsets,
|
||||
}
|
||||
}
|
||||
|
||||
@ -50,9 +48,7 @@ func (p *poller) start() {
|
||||
}
|
||||
|
||||
func (p *poller) exec() {
|
||||
for _, zset := range p.zsets {
|
||||
if err := p.rdb.forward(zset); err != nil {
|
||||
log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err)
|
||||
}
|
||||
if err := p.rdb.CheckAndEnqueue(); err != nil {
|
||||
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
@ -5,16 +5,18 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
func TestPoller(t *testing.T) {
|
||||
type scheduledTask struct {
|
||||
msg *taskMessage
|
||||
msg *rdb.TaskMessage
|
||||
processAt time.Time
|
||||
}
|
||||
r := setup(t)
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
const pollInterval = time.Second
|
||||
p := newPoller(r, pollInterval, []string{scheduled, retry})
|
||||
p := newPoller(rdbClient, pollInterval)
|
||||
t1 := randomTask("gen_thumbnail", "default", nil)
|
||||
t2 := randomTask("send_email", "default", nil)
|
||||
t3 := randomTask("reindex", "default", nil)
|
||||
@ -23,11 +25,11 @@ func TestPoller(t *testing.T) {
|
||||
tests := []struct {
|
||||
initScheduled []scheduledTask // scheduled queue initial state
|
||||
initRetry []scheduledTask // retry queue initial state
|
||||
initQueue []*taskMessage // default queue initial state
|
||||
initQueue []*rdb.TaskMessage // default queue initial state
|
||||
wait time.Duration // wait duration before checking for final state
|
||||
wantScheduled []*taskMessage // schedule queue final state
|
||||
wantRetry []*taskMessage // retry queue final state
|
||||
wantQueue []*taskMessage // default queue final state
|
||||
wantScheduled []*rdb.TaskMessage // schedule queue final state
|
||||
wantRetry []*rdb.TaskMessage // retry queue final state
|
||||
wantQueue []*rdb.TaskMessage // default queue final state
|
||||
}{
|
||||
{
|
||||
initScheduled: []scheduledTask{
|
||||
@ -37,11 +39,11 @@ func TestPoller(t *testing.T) {
|
||||
initRetry: []scheduledTask{
|
||||
{t3, time.Now().Add(-500 * time.Millisecond)},
|
||||
},
|
||||
initQueue: []*taskMessage{t4},
|
||||
initQueue: []*rdb.TaskMessage{t4},
|
||||
wait: pollInterval * 2,
|
||||
wantScheduled: []*taskMessage{t1},
|
||||
wantRetry: []*taskMessage{},
|
||||
wantQueue: []*taskMessage{t2, t3, t4},
|
||||
wantScheduled: []*rdb.TaskMessage{t1},
|
||||
wantRetry: []*rdb.TaskMessage{},
|
||||
wantQueue: []*rdb.TaskMessage{t2, t3, t4},
|
||||
},
|
||||
{
|
||||
initScheduled: []scheduledTask{
|
||||
@ -50,36 +52,36 @@ func TestPoller(t *testing.T) {
|
||||
{t3, time.Now().Add(-500 * time.Millisecond)},
|
||||
},
|
||||
initRetry: []scheduledTask{},
|
||||
initQueue: []*taskMessage{t4},
|
||||
initQueue: []*rdb.TaskMessage{t4},
|
||||
wait: pollInterval * 2,
|
||||
wantScheduled: []*taskMessage{},
|
||||
wantRetry: []*taskMessage{},
|
||||
wantQueue: []*taskMessage{t1, t2, t3, t4},
|
||||
wantScheduled: []*rdb.TaskMessage{},
|
||||
wantRetry: []*rdb.TaskMessage{},
|
||||
wantQueue: []*rdb.TaskMessage{t1, t2, t3, t4},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// initialize scheduled queue
|
||||
for _, st := range tc.initScheduled {
|
||||
err := r.schedule(scheduled, st.processAt, st.msg)
|
||||
err := rdbClient.Schedule(st.msg, st.processAt)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// initialize retry queue
|
||||
for _, st := range tc.initRetry {
|
||||
err := r.schedule(retry, st.processAt, st.msg)
|
||||
err := rdbClient.RetryLater(st.msg, st.processAt)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
// initialize default queue
|
||||
for _, msg := range tc.initQueue {
|
||||
err := r.enqueue(msg)
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -89,22 +91,22 @@ func TestPoller(t *testing.T) {
|
||||
time.Sleep(tc.wait)
|
||||
p.terminate()
|
||||
|
||||
gotScheduledRaw := r.client.ZRange(scheduled, 0, -1).Val()
|
||||
gotScheduledRaw := r.ZRange(scheduledQ, 0, -1).Val()
|
||||
gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw)
|
||||
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduled, diff)
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduledQ, diff)
|
||||
}
|
||||
|
||||
gotRetryRaw := r.client.ZRange(retry, 0, -1).Val()
|
||||
gotRetryRaw := r.ZRange(retryQ, 0, -1).Val()
|
||||
gotRetry := mustUnmarshalSlice(t, gotRetryRaw)
|
||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retry, diff)
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retryQ, diff)
|
||||
}
|
||||
|
||||
gotQueueRaw := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||
gotQueueRaw := r.LRange(defaultQ, 0, -1).Val()
|
||||
gotQueue := mustUnmarshalSlice(t, gotQueueRaw)
|
||||
if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQueue, diff)
|
||||
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQ, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
22
processor.go
22
processor.go
@ -4,10 +4,12 @@ import (
|
||||
"fmt"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
type processor struct {
|
||||
rdb *rdb
|
||||
rdb *rdb.RDB
|
||||
|
||||
handler Handler
|
||||
|
||||
@ -24,9 +26,9 @@ type processor struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor {
|
||||
func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
|
||||
return &processor{
|
||||
rdb: rdb,
|
||||
rdb: r,
|
||||
handler: handler,
|
||||
dequeueTimeout: 5 * time.Second,
|
||||
sema: make(chan struct{}, numWorkers),
|
||||
@ -68,8 +70,8 @@ func (p *processor) start() {
|
||||
// exec pulls a task out of the queue and starts a worker goroutine to
|
||||
// process the task.
|
||||
func (p *processor) exec() {
|
||||
msg, err := p.rdb.dequeue(defaultQueue, p.dequeueTimeout)
|
||||
if err == errDequeueTimeout {
|
||||
msg, err := p.rdb.Dequeue(p.dequeueTimeout)
|
||||
if err == rdb.ErrDequeueTimeout {
|
||||
// timed out, this is a normal behavior.
|
||||
return
|
||||
}
|
||||
@ -83,9 +85,9 @@ func (p *processor) exec() {
|
||||
go func(task *Task) {
|
||||
// NOTE: This deferred anonymous function needs to take taskMessage as a value because
|
||||
// the message can be mutated by the time this function is called.
|
||||
defer func(msg taskMessage) {
|
||||
if err := p.rdb.remove(inProgress, &msg); err != nil {
|
||||
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err)
|
||||
defer func(msg rdb.TaskMessage) {
|
||||
if err := p.rdb.Done(&msg); err != nil {
|
||||
log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err)
|
||||
}
|
||||
<-p.sema // release token
|
||||
}(*msg)
|
||||
@ -99,9 +101,9 @@ func (p *processor) exec() {
|
||||
// restore moves all tasks from "in-progress" back to queue
|
||||
// to restore all unfinished tasks.
|
||||
func (p *processor) restore() {
|
||||
err := p.rdb.moveAll(inProgress, defaultQueue)
|
||||
err := p.rdb.RestoreUnfinished()
|
||||
if err != nil {
|
||||
log.Printf("[ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue)
|
||||
log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,10 +7,12 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
func TestProcessorSuccess(t *testing.T) {
|
||||
r := setup(t)
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
|
||||
m1 := randomTask("send_email", "default", nil)
|
||||
m2 := randomTask("gen_thumbnail", "default", nil)
|
||||
@ -23,20 +25,20 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
t4 := &Task{Type: m4.Type, Payload: m4.Payload}
|
||||
|
||||
tests := []struct {
|
||||
initQueue []*taskMessage // initial default queue state
|
||||
incoming []*taskMessage // tasks to be enqueued during run
|
||||
initQueue []*rdb.TaskMessage // initial default queue state
|
||||
incoming []*rdb.TaskMessage // tasks to be enqueued during run
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantProcessed []*Task // tasks to be processed at the end
|
||||
}{
|
||||
{
|
||||
initQueue: []*taskMessage{m1},
|
||||
incoming: []*taskMessage{m2, m3, m4},
|
||||
initQueue: []*rdb.TaskMessage{m1},
|
||||
incoming: []*rdb.TaskMessage{m2, m3, m4},
|
||||
wait: time.Second,
|
||||
wantProcessed: []*Task{t1, t2, t3, t4},
|
||||
},
|
||||
{
|
||||
initQueue: []*taskMessage{},
|
||||
incoming: []*taskMessage{m1},
|
||||
initQueue: []*rdb.TaskMessage{},
|
||||
incoming: []*rdb.TaskMessage{m1},
|
||||
wait: time.Second,
|
||||
wantProcessed: []*Task{t1},
|
||||
},
|
||||
@ -44,7 +46,7 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// instantiate a new processor
|
||||
@ -57,11 +59,11 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
processed = append(processed, task)
|
||||
return nil
|
||||
}
|
||||
p := newProcessor(r, 10, h)
|
||||
p := newProcessor(rdbClient, 10, h)
|
||||
p.dequeueTimeout = time.Second // short time out for test purpose
|
||||
// initialize default queue.
|
||||
for _, msg := range tc.initQueue {
|
||||
err := r.enqueue(msg)
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -70,7 +72,7 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
p.start()
|
||||
|
||||
for _, msg := range tc.incoming {
|
||||
err := r.enqueue(msg)
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
p.terminate()
|
||||
t.Fatal(err)
|
||||
@ -83,14 +85,15 @@ func TestProcessorSuccess(t *testing.T) {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
}
|
||||
|
||||
if l := r.client.LLen(inProgress).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", inProgress, l)
|
||||
if l := r.LLen(inProgressQ).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", inProgressQ, l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorRetry(t *testing.T) {
|
||||
r := setup(t)
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
|
||||
m1 := randomTask("send_email", "default", nil)
|
||||
m1.Retried = m1.Retry // m1 has reached its max retry count
|
||||
@ -113,24 +116,24 @@ func TestProcessorRetry(t *testing.T) {
|
||||
r4.Retried = m4.Retried + 1
|
||||
|
||||
tests := []struct {
|
||||
initQueue []*taskMessage // initial default queue state
|
||||
incoming []*taskMessage // tasks to be enqueued during run
|
||||
initQueue []*rdb.TaskMessage // initial default queue state
|
||||
incoming []*rdb.TaskMessage // tasks to be enqueued during run
|
||||
wait time.Duration // wait duration between starting and stopping processor for this test case
|
||||
wantRetry []*taskMessage // tasks in retry queue at the end
|
||||
wantDead []*taskMessage // tasks in dead queue at the end
|
||||
wantRetry []*rdb.TaskMessage // tasks in retry queue at the end
|
||||
wantDead []*rdb.TaskMessage // tasks in dead queue at the end
|
||||
}{
|
||||
{
|
||||
initQueue: []*taskMessage{m1, m2},
|
||||
incoming: []*taskMessage{m3, m4},
|
||||
initQueue: []*rdb.TaskMessage{m1, m2},
|
||||
incoming: []*rdb.TaskMessage{m3, m4},
|
||||
wait: time.Second,
|
||||
wantRetry: []*taskMessage{&r2, &r3, &r4},
|
||||
wantDead: []*taskMessage{&r1},
|
||||
wantRetry: []*rdb.TaskMessage{&r2, &r3, &r4},
|
||||
wantDead: []*rdb.TaskMessage{&r1},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// instantiate a new processor
|
||||
@ -138,11 +141,11 @@ func TestProcessorRetry(t *testing.T) {
|
||||
h = func(task *Task) error {
|
||||
return fmt.Errorf(errMsg)
|
||||
}
|
||||
p := newProcessor(r, 10, h)
|
||||
p := newProcessor(rdbClient, 10, h)
|
||||
p.dequeueTimeout = time.Second // short time out for test purpose
|
||||
// initialize default queue.
|
||||
for _, msg := range tc.initQueue {
|
||||
err := r.enqueue(msg)
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -150,7 +153,7 @@ func TestProcessorRetry(t *testing.T) {
|
||||
|
||||
p.start()
|
||||
for _, msg := range tc.incoming {
|
||||
err := r.enqueue(msg)
|
||||
err := rdbClient.Enqueue(msg)
|
||||
if err != nil {
|
||||
p.terminate()
|
||||
t.Fatal(err)
|
||||
@ -159,20 +162,20 @@ func TestProcessorRetry(t *testing.T) {
|
||||
time.Sleep(tc.wait)
|
||||
p.terminate()
|
||||
|
||||
gotRetryRaw := r.client.ZRange(retry, 0, -1).Val()
|
||||
gotRetryRaw := r.ZRange(retryQ, 0, -1).Val()
|
||||
gotRetry := mustUnmarshalSlice(t, gotRetryRaw)
|
||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retry, diff)
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retryQ, diff)
|
||||
}
|
||||
|
||||
gotDeadRaw := r.client.ZRange(dead, 0, -1).Val()
|
||||
gotDeadRaw := r.ZRange(deadQ, 0, -1).Val()
|
||||
gotDead := mustUnmarshalSlice(t, gotDeadRaw)
|
||||
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", dead, diff)
|
||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", deadQ, diff)
|
||||
}
|
||||
|
||||
if l := r.client.LLen(inProgress).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", inProgress, l)
|
||||
if l := r.LLen(inProgressQ).Val(); l != 0 {
|
||||
t.Errorf("%q has %d tasks, want 0", inProgressQ, l)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
156
rdb.go
156
rdb.go
@ -1,156 +0,0 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
)
|
||||
|
||||
// Redis keys
|
||||
const (
|
||||
queuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||
defaultQueue = queuePrefix + "default" // LIST
|
||||
allQueues = "asynq:queues" // SET
|
||||
scheduled = "asynq:scheduled" // ZSET
|
||||
retry = "asynq:retry" // ZSET
|
||||
dead = "asynq:dead" // ZSET
|
||||
inProgress = "asynq:in_progress" // SET
|
||||
)
|
||||
|
||||
var errDequeueTimeout = errors.New("blocking dequeue operation timed out")
|
||||
|
||||
// rdb encapsulates the interactions with redis server.
|
||||
type rdb struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func newRDB(config *RedisConfig) *rdb {
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: config.Addr,
|
||||
Password: config.Password,
|
||||
DB: config.DB,
|
||||
})
|
||||
return &rdb{client}
|
||||
}
|
||||
|
||||
// enqueue inserts the given task to the end of the queue.
|
||||
// It also adds the queue name to the "all-queues" list.
|
||||
func (r *rdb) enqueue(msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
qname := queuePrefix + msg.Queue
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.SAdd(allQueues, qname)
|
||||
pipe.LPush(qname, string(bytes))
|
||||
_, err = pipe.Exec()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not enqueue the task %+v to %q: %v", msg, qname, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// dequeue blocks until there is a task available to be processed,
|
||||
// once a task is available, it adds the task to "in progress" list
|
||||
// and returns the task.
|
||||
func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) {
|
||||
data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, errDequeueTimeout
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, inProgress, timeout, err)
|
||||
}
|
||||
var msg taskMessage
|
||||
err = json.Unmarshal([]byte(data), &msg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
|
||||
}
|
||||
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname)
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// remove deletes all elements equal to msg from a redis list with the given key.
|
||||
func (r *rdb) remove(key string, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
// NOTE: count ZERO means "remove all elements equal to val"
|
||||
err = r.client.LRem(key, 0, string(bytes)).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `LREM %s 0 %s` failed: %v", key, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// schedule adds the task to the zset to be processd at the specified time.
|
||||
func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
score := float64(processAt.Unix())
|
||||
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const maxDeadTask = 100
|
||||
const deadExpirationInDays = 90
|
||||
|
||||
// kill sends the taskMessage to "dead" set.
|
||||
// It also trims the sorted set by timestamp and set size.
|
||||
func (r *rdb) kill(msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||
}
|
||||
now := time.Now()
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100
|
||||
_, err = pipe.Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
// moveAll moves all tasks from src list to dst list.
|
||||
func (r *rdb) moveAll(src, dst string) error {
|
||||
script := redis.NewScript(`
|
||||
local len = redis.call("LLEN", KEYS[1])
|
||||
for i = len, 1, -1 do
|
||||
redis.call("RPOPLPUSH", KEYS[1], KEYS[2])
|
||||
end
|
||||
return len
|
||||
`)
|
||||
_, err := script.Run(r.client, []string{src, dst}).Result()
|
||||
return err
|
||||
}
|
||||
|
||||
// forward moves all tasks with a score less than the current unix time
|
||||
// from the given zset to the default queue.
|
||||
// TODO(hibiken): Find a better method name that reflects what this does.
|
||||
func (r *rdb) forward(from string) error {
|
||||
script := redis.NewScript(`
|
||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
||||
for _, msg in ipairs(msgs) do
|
||||
redis.call("ZREM", KEYS[1], msg)
|
||||
redis.call("SADD", KEYS[2], KEYS[3])
|
||||
redis.call("LPUSH", KEYS[3], msg)
|
||||
end
|
||||
return msgs
|
||||
`)
|
||||
now := float64(time.Now().Unix())
|
||||
res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result()
|
||||
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from)
|
||||
return err
|
||||
}
|
358
rdb_test.go
358
rdb_test.go
@ -1,358 +0,0 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
)
|
||||
|
||||
func TestEnqueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
tests := []struct {
|
||||
msg *taskMessage
|
||||
}{
|
||||
{msg: randomTask("send_email", "default",
|
||||
map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})},
|
||||
{msg: randomTask("generate_csv", "default",
|
||||
map[string]interface{}{})},
|
||||
{msg: randomTask("sync", "default", nil)},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err := r.enqueue(tc.msg)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
res := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||
if len(res) != 1 {
|
||||
t.Errorf("LIST %q has length %d, want 1", defaultQueue, len(res))
|
||||
continue
|
||||
}
|
||||
if !r.client.SIsMember(allQueues, defaultQueue).Val() {
|
||||
t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQueue)
|
||||
}
|
||||
if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" {
|
||||
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDequeue(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"})
|
||||
tests := []struct {
|
||||
queued []*taskMessage
|
||||
want *taskMessage
|
||||
err error
|
||||
inProgress int64 // length of "in-progress" tasks after dequeue
|
||||
}{
|
||||
{queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1},
|
||||
{queued: []*taskMessage{}, want: nil, err: errDequeueTimeout, inProgress: 0},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for _, m := range tc.queued {
|
||||
r.enqueue(m)
|
||||
}
|
||||
got, err := r.dequeue(defaultQueue, time.Second)
|
||||
if !cmp.Equal(got, tc.want) || err != tc.err {
|
||||
t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v",
|
||||
defaultQueue, got, err, tc.want, tc.err)
|
||||
continue
|
||||
}
|
||||
if l := r.client.LLen(inProgress).Val(); l != tc.inProgress {
|
||||
t.Errorf("LIST %q has length %d, want %d", inProgress, l, tc.inProgress)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemove(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("export_csv", "csv", nil)
|
||||
|
||||
tests := []struct {
|
||||
initial []*taskMessage // initial state of the list
|
||||
target *taskMessage // task to remove
|
||||
final []*taskMessage // final state of the list
|
||||
}{
|
||||
{
|
||||
initial: []*taskMessage{t1, t2},
|
||||
target: t1,
|
||||
final: []*taskMessage{t2},
|
||||
},
|
||||
{
|
||||
initial: []*taskMessage{t2},
|
||||
target: t1,
|
||||
final: []*taskMessage{t2},
|
||||
},
|
||||
{
|
||||
initial: []*taskMessage{t1},
|
||||
target: t1,
|
||||
final: []*taskMessage{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// set up initial state
|
||||
for _, task := range tc.initial {
|
||||
err := r.client.LPush(defaultQueue, mustMarshal(t, task)).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := r.remove(defaultQueue, tc.target)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
var got []*taskMessage
|
||||
data := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||
for _, s := range data {
|
||||
got = append(got, mustUnmarshal(t, s))
|
||||
}
|
||||
|
||||
if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQueue, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestKill(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
|
||||
// TODO(hibiken): add test cases for trimming
|
||||
tests := []struct {
|
||||
initial []*taskMessage // inital state of "dead" set
|
||||
target *taskMessage // task to kill
|
||||
want []*taskMessage // final state of "dead" set
|
||||
}{
|
||||
{
|
||||
initial: []*taskMessage{},
|
||||
target: t1,
|
||||
want: []*taskMessage{t1},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
// set up initial state
|
||||
for _, task := range tc.initial {
|
||||
err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
err := r.kill(tc.target)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
actual := r.client.ZRange(dead, 0, -1).Val()
|
||||
got := mustUnmarshalSlice(t, actual)
|
||||
if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", dead, diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestMoveAll(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", "default", nil)
|
||||
t2 := randomTask("export_csv", "csv", nil)
|
||||
t3 := randomTask("sync_stuff", "sync", nil)
|
||||
|
||||
tests := []struct {
|
||||
beforeSrc []*taskMessage
|
||||
beforeDst []*taskMessage
|
||||
afterSrc []*taskMessage
|
||||
afterDst []*taskMessage
|
||||
}{
|
||||
{
|
||||
beforeSrc: []*taskMessage{t1, t2, t3},
|
||||
beforeDst: []*taskMessage{},
|
||||
afterSrc: []*taskMessage{},
|
||||
afterDst: []*taskMessage{t1, t2, t3},
|
||||
},
|
||||
{
|
||||
beforeSrc: []*taskMessage{},
|
||||
beforeDst: []*taskMessage{t1, t2, t3},
|
||||
afterSrc: []*taskMessage{},
|
||||
afterDst: []*taskMessage{t1, t2, t3},
|
||||
},
|
||||
{
|
||||
beforeSrc: []*taskMessage{t2, t3},
|
||||
beforeDst: []*taskMessage{t1},
|
||||
afterSrc: []*taskMessage{},
|
||||
afterDst: []*taskMessage{t1, t2, t3},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
// seed src list.
|
||||
for _, msg := range tc.beforeSrc {
|
||||
r.client.LPush(inProgress, mustMarshal(t, msg))
|
||||
}
|
||||
// seed dst list.
|
||||
for _, msg := range tc.beforeDst {
|
||||
r.client.LPush(defaultQueue, mustMarshal(t, msg))
|
||||
}
|
||||
|
||||
if err := r.moveAll(inProgress, defaultQueue); err != nil {
|
||||
t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", inProgress, defaultQueue, err)
|
||||
continue
|
||||
}
|
||||
|
||||
src := r.client.LRange(inProgress, 0, -1).Val()
|
||||
gotSrc := mustUnmarshalSlice(t, src)
|
||||
if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgress, diff)
|
||||
}
|
||||
dst := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||
gotDst := mustUnmarshalSlice(t, dst)
|
||||
if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" {
|
||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQueue, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestForward(t *testing.T) {
|
||||
r := setup(t)
|
||||
t1 := randomTask("send_email", defaultQueue, nil)
|
||||
t2 := randomTask("generate_csv", defaultQueue, nil)
|
||||
secondAgo := time.Now().Add(-time.Second)
|
||||
hourFromNow := time.Now().Add(time.Hour)
|
||||
|
||||
tests := []struct {
|
||||
tasks []*redis.Z // scheduled tasks with timestamp as a score
|
||||
wantQueued []*taskMessage // queue after calling forward
|
||||
wantScheduled []*taskMessage // scheduled queue after calling forward
|
||||
}{
|
||||
{
|
||||
tasks: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||
wantQueued: []*taskMessage{t1, t2},
|
||||
wantScheduled: []*taskMessage{},
|
||||
},
|
||||
{
|
||||
tasks: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||
wantQueued: []*taskMessage{t2},
|
||||
wantScheduled: []*taskMessage{t1},
|
||||
},
|
||||
{
|
||||
tasks: []*redis.Z{
|
||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
|
||||
wantQueued: []*taskMessage{},
|
||||
wantScheduled: []*taskMessage{t1, t2},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := r.client.ZAdd(scheduled, tc.tasks...).Err(); err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
err := r.forward(scheduled)
|
||||
if err != nil {
|
||||
t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err)
|
||||
continue
|
||||
}
|
||||
queued := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||
gotQueued := mustUnmarshalSlice(t, queued)
|
||||
if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQueue, len(gotQueued), len(tc.wantQueued), diff)
|
||||
continue
|
||||
}
|
||||
scheduled := r.client.ZRangeByScore(scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val()
|
||||
gotScheduled := mustUnmarshalSlice(t, scheduled)
|
||||
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSchedule(t *testing.T) {
|
||||
r := setup(t)
|
||||
tests := []struct {
|
||||
msg *taskMessage
|
||||
processAt time.Time
|
||||
zset string
|
||||
}{
|
||||
{
|
||||
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||
time.Now().Add(15 * time.Minute),
|
||||
scheduled,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err := r.schedule(tc.zset, tc.processAt, tc.msg)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
res, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result()
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg)
|
||||
if len(res) != 1 {
|
||||
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), tc.zset)
|
||||
continue
|
||||
}
|
||||
|
||||
if res[0].Score != float64(tc.processAt.Unix()) {
|
||||
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
8
retry.go
8
retry.go
@ -5,13 +5,15 @@ import (
|
||||
"math"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
func retryTask(rdb *rdb, msg *taskMessage, err error) {
|
||||
func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) {
|
||||
msg.ErrorMsg = err.Error()
|
||||
if msg.Retried >= msg.Retry {
|
||||
log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID)
|
||||
if err := rdb.kill(msg); err != nil {
|
||||
if err := r.Kill(msg); err != nil {
|
||||
log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err)
|
||||
}
|
||||
return
|
||||
@ -19,7 +21,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) {
|
||||
retryAt := time.Now().Add(delaySeconds((msg.Retried)))
|
||||
log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now()))
|
||||
msg.Retried++
|
||||
if err := rdb.schedule(retry, retryAt, msg); err != nil {
|
||||
if err := r.RetryLater(msg, retryAt); err != nil {
|
||||
log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err)
|
||||
return
|
||||
}
|
||||
|
@ -6,18 +6,20 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
func TestRetry(t *testing.T) {
|
||||
r := setup(t)
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
errMsg := "email server not responding"
|
||||
// t1 is a task with max-retry count reached.
|
||||
t1 := &taskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()}
|
||||
t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()}
|
||||
// t2 is t1 with updated error message.
|
||||
t2 := *t1
|
||||
t2.ErrorMsg = errMsg
|
||||
// t3 is a task which hasn't reached max-retry count.
|
||||
t3 := &taskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()}
|
||||
t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()}
|
||||
// t4 is t3 after retry.
|
||||
t4 := *t3
|
||||
t4.Retried++
|
||||
@ -25,45 +27,45 @@ func TestRetry(t *testing.T) {
|
||||
|
||||
tests := []struct {
|
||||
desc string // test case description
|
||||
msg *taskMessage // task to retry
|
||||
msg *rdb.TaskMessage // task to retry
|
||||
err error // error that caused retry
|
||||
wantDead []*taskMessage // state "dead" queue should be in
|
||||
wantRetry []*taskMessage // state "retry" queue should be in
|
||||
wantDead []*rdb.TaskMessage // state "dead" queue should be in
|
||||
wantRetry []*rdb.TaskMessage // state "retry" queue should be in
|
||||
}{
|
||||
{
|
||||
desc: "With retry exhausted task",
|
||||
msg: t1,
|
||||
err: fmt.Errorf(errMsg),
|
||||
wantDead: []*taskMessage{&t2},
|
||||
wantRetry: []*taskMessage{},
|
||||
wantDead: []*rdb.TaskMessage{&t2},
|
||||
wantRetry: []*rdb.TaskMessage{},
|
||||
},
|
||||
{
|
||||
desc: "With retry-able task",
|
||||
msg: t3,
|
||||
err: fmt.Errorf(errMsg),
|
||||
wantDead: []*taskMessage{},
|
||||
wantRetry: []*taskMessage{&t4},
|
||||
wantDead: []*rdb.TaskMessage{},
|
||||
wantRetry: []*rdb.TaskMessage{&t4},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
// clean up db before each test case.
|
||||
if err := r.client.FlushDB().Err(); err != nil {
|
||||
if err := r.FlushDB().Err(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
retryTask(r, tc.msg, tc.err)
|
||||
retryTask(rdbClient, tc.msg, tc.err)
|
||||
|
||||
deadQueue := r.client.ZRange(dead, 0, -1).Val()
|
||||
deadQueue := r.ZRange(deadQ, 0, -1).Val()
|
||||
gotDead := mustUnmarshalSlice(t, deadQueue)
|
||||
if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff)
|
||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
|
||||
}
|
||||
|
||||
retryQueue := r.client.ZRange(retry, 0, -1).Val()
|
||||
retryQueue := r.ZRange(retryQ, 0, -1).Val()
|
||||
gotRetry := mustUnmarshalSlice(t, retryQueue)
|
||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
|
||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff)
|
||||
t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, deadQ, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user