From d4e442d04fafd1bbf06501c07aeef51438ae17df Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 3 Dec 2019 21:01:26 -0800 Subject: [PATCH] Extract rdb to internal package --- asynq.go | 94 +---- asynq_test.go | 62 ++- background.go | 14 +- client.go | 14 +- client_test.go | 13 +- cmd/asynqmon/main.go | 10 +- go.sum | 9 + inspector.go | 84 ---- inspector_test.go | 503 ------------------------ rdb.go => internal/rdb/rdb.go | 213 +++++++--- rdb_test.go => internal/rdb/rdb_test.go | 242 ++++++++---- poller.go | 10 +- poller_test.go | 56 +-- processor.go | 22 +- processor_test.go | 69 ++-- retry.go | 8 +- retry_test.go | 36 +- 17 files changed, 488 insertions(+), 971 deletions(-) delete mode 100644 inspector.go delete mode 100644 inspector_test.go rename rdb.go => internal/rdb/rdb.go (52%) rename rdb_test.go => internal/rdb/rdb_test.go (54%) diff --git a/asynq.go b/asynq.go index 20071a4..fb5226d 100644 --- a/asynq.go +++ b/asynq.go @@ -1,10 +1,6 @@ package asynq -import ( - "time" - - "github.com/google/uuid" -) +import "github.com/go-redis/redis/v7" /* TODOs: @@ -27,32 +23,6 @@ type Task struct { Payload map[string]interface{} } -// taskMessage is an internal representation of a task with additional metadata fields. -// This data gets written in redis. -type taskMessage struct { - //-------- Task fields -------- - - Type string - Payload map[string]interface{} - - //-------- metadata fields -------- - - // unique identifier for each task - ID uuid.UUID - - // queue name this message should be enqueued to - Queue string - - // max number of retry for this task. - Retry int - - // number of times we've retried so far - Retried int - - // error message from the last failure - ErrorMsg string -} - // RedisConfig specifies redis configurations. type RedisConfig struct { Addr string @@ -62,60 +32,10 @@ type RedisConfig struct { DB int } -// Stats represents a state of queues at a certain time. -type Stats struct { - Queued int - InProgress int - Scheduled int - Retry int - Dead int - Timestamp time.Time -} - -// EnqueuedTask is a task in a queue and is ready to be processed. -// This is read only and used for inspection purpose. -type EnqueuedTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} -} - -// InProgressTask is a task that's currently being processed. -// This is read only and used for inspection purpose. -type InProgressTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} -} - -// ScheduledTask is a task that's scheduled to be processed in the future. -// This is read only and used for inspection purpose. -type ScheduledTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} - ProcessAt time.Time -} - -// RetryTask is a task that's in retry queue because worker failed to process the task. -// This is read only and used for inspection purpose. -type RetryTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} - // TODO(hibiken): add LastFailedAt time.Time - ProcessAt time.Time - ErrorMsg string - Retried int - Retry int -} - -// DeadTask is a task in that has exhausted all retries. -// This is read only and used for inspection purpose. -type DeadTask struct { - ID uuid.UUID - Type string - Payload map[string]interface{} - LastFailedAt time.Time - ErrorMsg string +func newRedisClient(config *RedisConfig) *redis.Client { + return redis.NewClient(&redis.Options{ + Addr: config.Addr, + Password: config.Password, + DB: config.DB, + }) } diff --git a/asynq_test.go b/asynq_test.go index d6e58cf..49022f6 100644 --- a/asynq_test.go +++ b/asynq_test.go @@ -2,29 +2,30 @@ package asynq import ( "encoding/json" - "math/rand" "sort" "testing" - "time" + "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/uuid" + "github.com/hibiken/asynq/internal/rdb" ) // This file defines test helper functions used by // other test files. -func init() { - rand.Seed(time.Now().UnixNano()) -} - -var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*taskMessage) []*taskMessage { - out := append([]*taskMessage(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() +func setup(t *testing.T) *redis.Client { + t.Helper() + r := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 2, }) - return out -}) + // Start each test with a clean slate. + if err := r.FlushDB().Err(); err != nil { + panic(err) + } + return r +} var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { out := append([]*Task(nil), in...) // Copy input to avoid mutating it @@ -34,32 +35,25 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { return out }) -// setup connects to a redis database and flush all keys -// before returning an instance of rdb. -func setup(t *testing.T) *rdb { - t.Helper() - r := newRDB(&RedisConfig{ - Addr: "localhost:6379", - DB: 15, // use database 15 to separate from other applications +var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*rdb.TaskMessage) []*rdb.TaskMessage { + out := append([]*rdb.TaskMessage(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() }) - // Start each test with a clean slate. - if err := r.client.FlushDB().Err(); err != nil { - panic(err) - } - return r -} + return out +}) -func randomTask(taskType, qname string, payload map[string]interface{}) *taskMessage { - return &taskMessage{ +func randomTask(taskType, qname string, payload map[string]interface{}) *rdb.TaskMessage { + return &rdb.TaskMessage{ ID: uuid.New(), Type: taskType, Queue: qname, - Retry: rand.Intn(100), + Retry: defaultMaxRetry, Payload: make(map[string]interface{}), } } -func mustMarshal(t *testing.T, task *taskMessage) string { +func mustMarshal(t *testing.T, task *rdb.TaskMessage) string { t.Helper() data, err := json.Marshal(task) if err != nil { @@ -68,9 +62,9 @@ func mustMarshal(t *testing.T, task *taskMessage) string { return string(data) } -func mustUnmarshal(t *testing.T, data string) *taskMessage { +func mustUnmarshal(t *testing.T, data string) *rdb.TaskMessage { t.Helper() - var task taskMessage + var task rdb.TaskMessage err := json.Unmarshal([]byte(data), &task) if err != nil { t.Fatal(err) @@ -78,7 +72,7 @@ func mustUnmarshal(t *testing.T, data string) *taskMessage { return &task } -func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string { +func mustMarshalSlice(t *testing.T, tasks []*rdb.TaskMessage) []string { t.Helper() var data []string for _, task := range tasks { @@ -87,9 +81,9 @@ func mustMarshalSlice(t *testing.T, tasks []*taskMessage) []string { return data } -func mustUnmarshalSlice(t *testing.T, data []string) []*taskMessage { +func mustUnmarshalSlice(t *testing.T, data []string) []*rdb.TaskMessage { t.Helper() - var tasks []*taskMessage + var tasks []*rdb.TaskMessage for _, s := range data { tasks = append(tasks, mustUnmarshal(t, s)) } diff --git a/background.go b/background.go index d669425..9cec5d9 100644 --- a/background.go +++ b/background.go @@ -7,6 +7,8 @@ import ( "os/signal" "sync" "time" + + "github.com/hibiken/asynq/internal/rdb" ) // Background is a top-level entity for the background-task processing. @@ -14,18 +16,18 @@ type Background struct { mu sync.Mutex running bool - rdb *rdb + rdb *rdb.RDB poller *poller processor *processor } // NewBackground returns a new Background instance. func NewBackground(numWorkers int, config *RedisConfig) *Background { - rdb := newRDB(config) - poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) - processor := newProcessor(rdb, numWorkers, nil) + r := rdb.NewRDB(newRedisClient(config)) + poller := newPoller(r, 5*time.Second, []string{rdb.Scheduled, rdb.Retry}) + processor := newProcessor(r, numWorkers, nil) return &Background{ - rdb: rdb, + rdb: r, poller: poller, processor: processor, } @@ -95,7 +97,7 @@ func (bg *Background) stop() { bg.poller.terminate() bg.processor.terminate() - bg.rdb.client.Close() + bg.rdb.Close() bg.processor.handler = nil bg.running = false } diff --git a/client.go b/client.go index d98f7ed..1061393 100644 --- a/client.go +++ b/client.go @@ -4,21 +4,23 @@ import ( "time" "github.com/google/uuid" + "github.com/hibiken/asynq/internal/rdb" ) // Client is an interface for scheduling tasks. type Client struct { - rdb *rdb + rdb *rdb.RDB } // NewClient creates and returns a new client. func NewClient(config *RedisConfig) *Client { - return &Client{rdb: newRDB(config)} + r := rdb.NewRDB(newRedisClient(config)) + return &Client{r} } // Process enqueues the task to be performed at a given time. func (c *Client) Process(task *Task, processAt time.Time) error { - msg := &taskMessage{ + msg := &rdb.TaskMessage{ ID: uuid.New(), Type: task.Type, Payload: task.Payload, @@ -29,9 +31,9 @@ func (c *Client) Process(task *Task, processAt time.Time) error { } // enqueue pushes a given task to the specified queue. -func (c *Client) enqueue(msg *taskMessage, processAt time.Time) error { +func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error { if time.Now().After(processAt) { - return c.rdb.enqueue(msg) + return c.rdb.Enqueue(msg) } - return c.rdb.schedule(scheduled, processAt, msg) + return c.rdb.Schedule(rdb.Scheduled, processAt, msg) } diff --git a/client_test.go b/client_test.go index 8394936..7474b5a 100644 --- a/client_test.go +++ b/client_test.go @@ -1,13 +1,14 @@ package asynq import ( + "github.com/hibiken/asynq/internal/rdb" "testing" "time" ) func TestClient(t *testing.T) { r := setup(t) - client := &Client{rdb: r} + client := &Client{rdb.NewRDB(r)} tests := []struct { task *Task @@ -31,7 +32,7 @@ func TestClient(t *testing.T) { for _, tc := range tests { // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { + if err := r.FlushDB().Err(); err != nil { t.Fatal(err) } @@ -41,12 +42,12 @@ func TestClient(t *testing.T) { continue } - if l := r.client.LLen(defaultQueue).Val(); l != tc.wantQueueSize { - t.Errorf("%q has length %d, want %d", defaultQueue, l, tc.wantQueueSize) + if l := r.LLen(rdb.DefaultQueue).Val(); l != tc.wantQueueSize { + t.Errorf("%q has length %d, want %d", rdb.DefaultQueue, l, tc.wantQueueSize) } - if l := r.client.ZCard(scheduled).Val(); l != tc.wantScheduledSize { - t.Errorf("%q has length %d, want %d", scheduled, l, tc.wantScheduledSize) + if l := r.ZCard(rdb.Scheduled).Val(); l != tc.wantScheduledSize { + t.Errorf("%q has length %d, want %d", rdb.Scheduled, l, tc.wantScheduledSize) } } } diff --git a/cmd/asynqmon/main.go b/cmd/asynqmon/main.go index 72c826d..c980a02 100644 --- a/cmd/asynqmon/main.go +++ b/cmd/asynqmon/main.go @@ -9,19 +9,21 @@ import ( "text/tabwriter" "time" - "github.com/hibiken/asynq" + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" ) var pollInterval = flag.Duration("interval", 3*time.Second, "polling interval") func main() { - inspector := asynq.NewInspector(&asynq.RedisConfig{ + c := redis.NewClient(&redis.Options{ Addr: "localhost:6379", DB: 2, }) + r := rdb.NewClient(c) for { - stats, err := inspector.CurrentStats() + stats, err := r.CurrentStats() if err != nil { log.Fatal(err) } @@ -31,7 +33,7 @@ func main() { } } -func printStats(s *asynq.Stats) { +func printStats(s *rdb.Stats) { format := strings.Repeat("%v\t", 5) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead") diff --git a/go.sum b/go.sum index 9d573e1..4371a1d 100644 --- a/go.sum +++ b/go.sum @@ -8,17 +8,26 @@ github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4= go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/inspector.go b/inspector.go deleted file mode 100644 index 39697cc..0000000 --- a/inspector.go +++ /dev/null @@ -1,84 +0,0 @@ -package asynq - -// Inspector is used to inspect queues. -type Inspector struct { - rdb *rdb -} - -// NewInspector returns a new Inspector instance. -func NewInspector(config *RedisConfig) *Inspector { - return &Inspector{ - rdb: newRDB(config), - } -} - -// CurrentStats returns a current stats of queues. -func (i *Inspector) CurrentStats() (*Stats, error) { - return i.rdb.currentStats() -} - -// toTaskSlice converts a taskMessage slice to a Task slice. -func (i *Inspector) toTaskSlice(msgs []*taskMessage) []*Task { - var tasks []*Task - for _, m := range msgs { - tasks = append(tasks, &Task{Type: m.Type, Payload: m.Payload}) - } - return tasks -} - -// ListEnqueuedTasks returns a list of tasks ready to be processed. -func (i *Inspector) ListEnqueuedTasks() ([]*EnqueuedTask, error) { - // TODO(hibiken): Support pagination. - msgs, err := i.rdb.listEnqueued() - if err != nil { - return nil, err - } - var tasks []*EnqueuedTask - for _, m := range msgs { - tasks = append(tasks, &EnqueuedTask{ - ID: m.ID, - Type: m.Type, - Payload: m.Payload, - }) - } - return tasks, nil -} - -// ListInProgressTasks returns a list of tasks that are being processed. -func (i *Inspector) ListInProgressTasks() ([]*InProgressTask, error) { - // TODO(hibiken): Support pagination. - msgs, err := i.rdb.listInProgress() - if err != nil { - return nil, err - } - var tasks []*InProgressTask - for _, m := range msgs { - tasks = append(tasks, &InProgressTask{ - ID: m.ID, - Type: m.Type, - Payload: m.Payload, - }) - } - return tasks, nil -} - -// ListScheduledTasks returns a list of tasks that are scheduled to -// be processed in the future. -func (i *Inspector) ListScheduledTasks() ([]*ScheduledTask, error) { - // TODO(hibiken): Support pagination. - return i.rdb.listScheduled() -} - -// ListRetryTasks returns a list of tasks what will be retried in the -// future. -func (i *Inspector) ListRetryTasks() ([]*RetryTask, error) { - // TODO(hibiken): Support pagination. - return i.rdb.listRetry() -} - -// ListDeadTasks returns a list of tasks that have reached its -// maximum retry limit. -func (i *Inspector) ListDeadTasks() ([]*DeadTask, error) { - // TODO(hibiken): Support pagination. - return i.rdb.listDead() -} diff --git a/inspector_test.go b/inspector_test.go deleted file mode 100644 index 8d79dd6..0000000 --- a/inspector_test.go +++ /dev/null @@ -1,503 +0,0 @@ -package asynq - -import ( - "sort" - "testing" - "time" - - "github.com/go-redis/redis/v7" - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/google/uuid" -) - -// ---- TODO(hibiken): Remove this once the new version is released (https://github.com/google/go-cmp/issues/166) ---- -// EquateApproxTime returns a Comparer options that -// determine two time.Time values to be equal if they -// are within the given time interval of one another. -// Note that if both times have a monotonic clock reading, -// the monotonic time difference will be used. -// -// The zero time is treated specially: it is only considered -// equal to another zero time value. -// -// It will panic if margin is negative. -func EquateApproxTime(margin time.Duration) cmp.Option { - if margin < 0 { - panic("negative duration in EquateApproxTime") - } - return cmp.FilterValues(func(x, y time.Time) bool { - return !x.IsZero() && !y.IsZero() - }, cmp.Comparer(timeApproximator{margin}.compare)) -} - -type timeApproximator struct { - margin time.Duration -} - -func (a timeApproximator) compare(x, y time.Time) bool { - // Avoid subtracting times to avoid overflow when the - // difference is larger than the largest representible duration. - if x.After(y) { - // Ensure x is always before y - x, y = y, x - } - // We're within the margin if x+margin >= y. - // Note: time.Time doesn't have AfterOrEqual method hence the negation. - return !x.Add(a.margin).Before(y) -} - -//----------------------------- - -func TestCurrentStats(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - t1 := randomTask("send_email", "default", nil) - t2 := randomTask("send_email", "default", nil) - t3 := randomTask("gen_export", "default", nil) - t4 := randomTask("gen_thumbnail", "default", nil) - t5 := randomTask("send_email", "default", nil) - - tests := []struct { - queue []*taskMessage - inProgress []*taskMessage - scheduled []*taskMessage - retry []*taskMessage - dead []*taskMessage - want *Stats - }{ - { - queue: []*taskMessage{t1}, - inProgress: []*taskMessage{t2, t3}, - scheduled: []*taskMessage{t4}, - retry: []*taskMessage{}, - dead: []*taskMessage{t5}, - want: &Stats{ - Queued: 1, - InProgress: 2, - Scheduled: 1, - Retry: 0, - Dead: 1, - }, - }, - { - queue: []*taskMessage{}, - inProgress: []*taskMessage{}, - scheduled: []*taskMessage{t1, t2, t4}, - retry: []*taskMessage{t3}, - dead: []*taskMessage{t5}, - want: &Stats{ - Queued: 0, - InProgress: 0, - Scheduled: 3, - Retry: 1, - Dead: 1, - }, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, msg := range tc.queue { - err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err() - if err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.inProgress { - err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err() - if err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.scheduled { - err := r.client.ZAdd(scheduled, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Add(time.Hour).Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.retry { - err := r.client.ZAdd(retry, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Add(time.Hour).Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } - for _, msg := range tc.dead { - err := r.client.ZAdd(dead, &redis.Z{ - Member: mustMarshal(t, msg), - Score: float64(time.Now().Unix()), - }).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.CurrentStats() - if err != nil { - t.Error(err) - continue - } - ignoreOpt := cmpopts.IgnoreFields(*tc.want, "Timestamp") - if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("(*Inspector).CurrentStats() = %+v, want %+v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} - -func TestListEnqueuedTasks(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("send_email", "default", nil) - m3 := randomTask("gen_export", "default", nil) - t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} - t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} - t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} - - tests := []struct { - queued []*taskMessage - want []*EnqueuedTask - }{ - { - queued: []*taskMessage{m1, m2, m3}, - want: []*EnqueuedTask{t1, t2, t3}, - }, - { - queued: []*taskMessage{}, - want: []*EnqueuedTask{}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, msg := range tc.queued { - err := r.client.LPush(defaultQueue, mustMarshal(t, msg)).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.ListEnqueuedTasks() - if err != nil { - t.Error(err) - continue - } - - sortOpt := cmp.Transformer("SortEnqueuedTasks", func(in []*EnqueuedTask) []*EnqueuedTask { - out := append([]*EnqueuedTask(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out - }) - if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { - t.Errorf("(*Inspector).ListEnqueuedTasks = %v, want %v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} - -func TestListInProgressTasks(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("send_email", "default", nil) - m3 := randomTask("gen_export", "default", nil) - t1 := &InProgressTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} - t2 := &InProgressTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} - t3 := &InProgressTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} - - tests := []struct { - inProgress []*taskMessage - want []*InProgressTask - }{ - { - inProgress: []*taskMessage{m1, m2, m3}, - want: []*InProgressTask{t1, t2, t3}, - }, - { - inProgress: []*taskMessage{}, - want: []*InProgressTask{}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, msg := range tc.inProgress { - err := r.client.LPush(inProgress, mustMarshal(t, msg)).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.ListInProgressTasks() - if err != nil { - t.Error(err) - continue - } - - sortOpt := cmp.Transformer("SortInProgressTasks", func(in []*InProgressTask) []*InProgressTask { - out := append([]*InProgressTask(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out - }) - if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { - t.Errorf("(*Inspector).ListInProgressTasks = %v, want %v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} - -func TestListScheduledTasks(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - m1 := randomTask("send_email", "default", nil) - m2 := randomTask("send_email", "default", nil) - m3 := randomTask("gen_export", "default", nil) - t1 := time.Now().Add(5 * time.Minute) - t2 := time.Now().Add(time.Hour) - t3 := time.Now().Add(24 * time.Hour) - s1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: t1} - s2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: t2} - s3 := &ScheduledTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, ProcessAt: t3} - - type scheduledMsg struct { - msg *taskMessage - processAt time.Time - } - tests := []struct { - scheduled []scheduledMsg - want []*ScheduledTask - }{ - { - scheduled: []scheduledMsg{ - {m1, t1}, - {m2, t2}, - {m3, t3}, - }, - want: []*ScheduledTask{s1, s2, s3}, - }, - { - scheduled: []scheduledMsg{}, - want: []*ScheduledTask{}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, s := range tc.scheduled { - err := r.client.ZAdd(scheduled, &redis.Z{Member: mustMarshal(t, s.msg), Score: float64(s.processAt.Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.ListScheduledTasks() - if err != nil { - t.Error(err) - continue - } - - sortOpt := cmp.Transformer("SortScheduledTasks", func(in []*ScheduledTask) []*ScheduledTask { - out := append([]*ScheduledTask(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out - }) - timeCmpOpt := EquateApproxTime(time.Second) - if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { - t.Errorf("(*Inspector).ListScheduledTasks = %v, want %v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} - -func TestListRetryTasks(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - m1 := &taskMessage{ - ID: uuid.New(), - Type: "send_email", - Payload: map[string]interface{}{"to": "customer@example.com"}, - ErrorMsg: "couldn't send email", - Retry: 25, - Retried: 10, - } - m2 := &taskMessage{ - ID: uuid.New(), - Type: "gen_thumbnail", - Payload: map[string]interface{}{"src": "some/path/to/img/file"}, - ErrorMsg: "couldn't find a file", - Retry: 20, - Retried: 3, - } - t1 := time.Now().Add(time.Hour) - t2 := time.Now().Add(24 * time.Hour) - r1 := &RetryTask{ - ID: m1.ID, - Type: m1.Type, - Payload: m1.Payload, - ProcessAt: t1, - ErrorMsg: m1.ErrorMsg, - Retry: m1.Retry, - Retried: m1.Retried, - } - r2 := &RetryTask{ - ID: m2.ID, - Type: m2.Type, - Payload: m2.Payload, - ProcessAt: t2, - ErrorMsg: m2.ErrorMsg, - Retry: m2.Retry, - Retried: m2.Retried, - } - - type retryEntry struct { - msg *taskMessage - processAt time.Time - } - tests := []struct { - retry []retryEntry - want []*RetryTask - }{ - { - retry: []retryEntry{ - {m1, t1}, - {m2, t2}, - }, - want: []*RetryTask{r1, r2}, - }, - { - retry: []retryEntry{}, - want: []*RetryTask{}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, e := range tc.retry { - err := r.client.ZAdd(retry, &redis.Z{ - Member: mustMarshal(t, e.msg), - Score: float64(e.processAt.Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.ListRetryTasks() - if err != nil { - t.Error(err) - continue - } - - sortOpt := cmp.Transformer("SortRetryTasks", func(in []*RetryTask) []*RetryTask { - out := append([]*RetryTask(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out - }) - timeCmpOpt := EquateApproxTime(time.Second) - if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { - t.Errorf("(*Inspector).ListRetryTasks = %v, want %v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} - -func TestListDeadTasks(t *testing.T) { - r := setup(t) - inspector := &Inspector{r} - m1 := &taskMessage{ID: uuid.New(), Type: "send_email", Payload: map[string]interface{}{"to": "customer@example.com"}, ErrorMsg: "couldn't send email"} - m2 := &taskMessage{ID: uuid.New(), Type: "gen_thumbnail", Payload: map[string]interface{}{"src": "path/to/img/file"}, ErrorMsg: "couldn't find file"} - t1 := time.Now().Add(-5 * time.Second) - t2 := time.Now().Add(-24 * time.Hour) - d1 := &DeadTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ErrorMsg: m1.ErrorMsg, LastFailedAt: t1} - d2 := &DeadTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ErrorMsg: m2.ErrorMsg, LastFailedAt: t2} - - type deadEntry struct { - msg *taskMessage - lastFailedAt time.Time - } - tests := []struct { - dead []deadEntry - want []*DeadTask - }{ - { - dead: []deadEntry{ - {m1, t1}, - {m2, t2}, - }, - want: []*DeadTask{d1, d2}, - }, - { - dead: []deadEntry{}, - want: []*DeadTask{}, - }, - } - - for _, tc := range tests { - // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { - t.Fatal(err) - } - for _, d := range tc.dead { - err := r.client.ZAdd(dead, &redis.Z{ - Member: mustMarshal(t, d.msg), - Score: float64(d.lastFailedAt.Unix())}).Err() - if err != nil { - t.Fatal(err) - } - } - - got, err := inspector.ListDeadTasks() - if err != nil { - t.Errorf("(*Inspector).ListDeadTask = %v, %v; want %v, nil", got, err, tc.want) - continue - } - - sortOpt := cmp.Transformer("SortDeadTasks", func(in []*DeadTask) []*DeadTask { - out := append([]*DeadTask(nil), in...) // Copy input to avoid mutating it - sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() - }) - return out - }) - timeCmpOpt := EquateApproxTime(time.Second) - if diff := cmp.Diff(tc.want, got, sortOpt, timeCmpOpt); diff != "" { - t.Errorf("(*Inspector).ListDeadTasks = %v, want %v; (-want, +got)\n%s", - got, tc.want, diff) - continue - } - } -} diff --git a/rdb.go b/internal/rdb/rdb.go similarity index 52% rename from rdb.go rename to internal/rdb/rdb.go index 6b8ffca..3a310f1 100644 --- a/rdb.go +++ b/internal/rdb/rdb.go @@ -1,4 +1,4 @@ -package asynq +package rdb import ( "encoding/json" @@ -8,38 +8,124 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/google/uuid" ) // Redis keys const ( - queuePrefix = "asynq:queues:" // LIST - asynq:queues: - defaultQueue = queuePrefix + "default" // LIST allQueues = "asynq:queues" // SET - scheduled = "asynq:scheduled" // ZSET - retry = "asynq:retry" // ZSET - dead = "asynq:dead" // ZSET - inProgress = "asynq:in_progress" // SET + queuePrefix = "asynq:queues:" // LIST - asynq:queues: + DefaultQueue = queuePrefix + "default" // LIST + Scheduled = "asynq:scheduled" // ZSET + Retry = "asynq:retry" // ZSET + Dead = "asynq:dead" // ZSET + InProgress = "asynq:in_progress" // SET ) -var errDequeueTimeout = errors.New("blocking dequeue operation timed out") +var ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") -// rdb encapsulates the interactions with redis server. -type rdb struct { +// RDB encapsulates the interactions with redis server. +type RDB struct { client *redis.Client } -func newRDB(config *RedisConfig) *rdb { - client := redis.NewClient(&redis.Options{ - Addr: config.Addr, - Password: config.Password, - DB: config.DB, - }) - return &rdb{client} +// NewRDB returns a new instance of RDB. +func NewRDB(client *redis.Client) *RDB { + return &RDB{client} } -// enqueue inserts the given task to the end of the queue. +// TaskMessage is an internal representation of a task with additional metadata fields. +// This data gets written in redis. +type TaskMessage struct { + //-------- Task fields -------- + + Type string + Payload map[string]interface{} + + //-------- metadata fields -------- + + // unique identifier for each task + ID uuid.UUID + + // queue name this message should be enqueued to + Queue string + + // max number of retry for this task. + Retry int + + // number of times we've retried so far + Retried int + + // error message from the last failure + ErrorMsg string +} + +// Stats represents a state of queues at a certain time. +type Stats struct { + Queued int + InProgress int + Scheduled int + Retry int + Dead int + Timestamp time.Time +} + +// EnqueuedTask is a task in a queue and is ready to be processed. +// This is read only and used for inspection purpose. +type EnqueuedTask struct { + ID uuid.UUID + Type string + Payload map[string]interface{} +} + +// InProgressTask is a task that's currently being processed. +// This is read only and used for inspection purpose. +type InProgressTask struct { + ID uuid.UUID + Type string + Payload map[string]interface{} +} + +// ScheduledTask is a task that's scheduled to be processed in the future. +// This is read only and used for inspection purpose. +type ScheduledTask struct { + ID uuid.UUID + Type string + Payload map[string]interface{} + ProcessAt time.Time +} + +// RetryTask is a task that's in retry queue because worker failed to process the task. +// This is read only and used for inspection purpose. +type RetryTask struct { + ID uuid.UUID + Type string + Payload map[string]interface{} + // TODO(hibiken): add LastFailedAt time.Time + ProcessAt time.Time + ErrorMsg string + Retried int + Retry int +} + +// DeadTask is a task in that has exhausted all retries. +// This is read only and used for inspection purpose. +type DeadTask struct { + ID uuid.UUID + Type string + Payload map[string]interface{} + LastFailedAt time.Time + ErrorMsg string +} + +// Close closes the connection with redis server. +func (r *RDB) Close() error { + return r.client.Close() +} + +// Enqueue inserts the given task to the end of the queue. // It also adds the queue name to the "all-queues" list. -func (r *rdb) enqueue(msg *taskMessage) error { +func (r *RDB) Enqueue(msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -55,18 +141,18 @@ func (r *rdb) enqueue(msg *taskMessage) error { return nil } -// dequeue blocks until there is a task available to be processed, +// Dequeue blocks until there is a task available to be processed, // once a task is available, it adds the task to "in progress" list // and returns the task. -func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) { - data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result() +func (r *RDB) Dequeue(qname string, timeout time.Duration) (*TaskMessage, error) { + data, err := r.client.BRPopLPush(qname, InProgress, timeout).Result() if err == redis.Nil { - return nil, errDequeueTimeout + return nil, ErrDequeueTimeout } if err != nil { - return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, inProgress, timeout, err) + return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, InProgress, timeout, err) } - var msg taskMessage + var msg TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) @@ -75,8 +161,8 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) return &msg, nil } -// remove deletes all elements equal to msg from a redis list with the given key. -func (r *rdb) remove(key string, msg *taskMessage) error { +// Remove deletes all elements equal to msg from a redis list with the given key. +func (r *RDB) Remove(key string, msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -89,8 +175,8 @@ func (r *rdb) remove(key string, msg *taskMessage) error { return nil } -// schedule adds the task to the zset to be processd at the specified time. -func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error { +// Schedule adds the task to the zset to be processd at the specified time. +func (r *RDB) Schedule(zset string, processAt time.Time, msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -106,25 +192,25 @@ func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error const maxDeadTask = 100 const deadExpirationInDays = 90 -// kill sends the taskMessage to "dead" set. +// Kill sends the taskMessage to "dead" set. // It also trims the sorted set by timestamp and set size. -func (r *rdb) kill(msg *taskMessage) error { +func (r *RDB) Kill(msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } now := time.Now() pipe := r.client.Pipeline() - pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) + pipe.ZAdd(Dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit))) - pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100 + pipe.ZRemRangeByScore(Dead, "-inf", strconv.Itoa(int(limit))) + pipe.ZRemRangeByRank(Dead, 0, -maxDeadTask) // trim the set to 100 _, err = pipe.Exec() return err } -// moveAll moves all tasks from src list to dst list. -func (r *rdb) moveAll(src, dst string) error { +// MoveAll moves all tasks from src list to dst list. +func (r *RDB) MoveAll(src, dst string) error { script := redis.NewScript(` local len = redis.call("LLEN", KEYS[1]) for i = len, 1, -1 do @@ -136,10 +222,10 @@ func (r *rdb) moveAll(src, dst string) error { return err } -// forward moves all tasks with a score less than the current unix time +// Forward moves all tasks with a score less than the current unix time // from the given zset to the default queue. // TODO(hibiken): Find a better method name that reflects what this does. -func (r *rdb) forward(from string) error { +func (r *RDB) Forward(from string) error { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) for _, msg in ipairs(msgs) do @@ -150,18 +236,19 @@ func (r *rdb) forward(from string) error { return msgs `) now := float64(time.Now().Unix()) - res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result() + res, err := script.Run(r.client, []string{from, allQueues, DefaultQueue}, now).Result() fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from) return err } -func (r *rdb) currentStats() (*Stats, error) { +// CurrentStats returns a current state of the queues. +func (r *RDB) CurrentStats() (*Stats, error) { pipe := r.client.Pipeline() - qlen := pipe.LLen(defaultQueue) - plen := pipe.LLen(inProgress) - slen := pipe.ZCard(scheduled) - rlen := pipe.ZCard(retry) - dlen := pipe.ZCard(dead) + qlen := pipe.LLen(DefaultQueue) + plen := pipe.LLen(InProgress) + slen := pipe.ZCard(Scheduled) + rlen := pipe.ZCard(Retry) + dlen := pipe.ZCard(Dead) _, err := pipe.Exec() if err != nil { return nil, err @@ -176,16 +263,16 @@ func (r *rdb) currentStats() (*Stats, error) { }, nil } -func (r *rdb) listEnqueued() ([]*taskMessage, error) { - return r.rangeList(defaultQueue) +func (r *RDB) ListEnqueued() ([]*TaskMessage, error) { + return r.rangeList(DefaultQueue) } -func (r *rdb) listInProgress() ([]*taskMessage, error) { - return r.rangeList(inProgress) +func (r *RDB) ListInProgress() ([]*TaskMessage, error) { + return r.rangeList(InProgress) } -func (r *rdb) listScheduled() ([]*ScheduledTask, error) { - data, err := r.client.ZRangeWithScores(scheduled, 0, -1).Result() +func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { + data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result() if err != nil { return nil, err } @@ -195,7 +282,7 @@ func (r *rdb) listScheduled() ([]*ScheduledTask, error) { if !ok { continue // bad data, ignore and continue } - var msg taskMessage + var msg TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -211,8 +298,8 @@ func (r *rdb) listScheduled() ([]*ScheduledTask, error) { return tasks, nil } -func (r *rdb) listRetry() ([]*RetryTask, error) { - data, err := r.client.ZRangeWithScores(retry, 0, -1).Result() +func (r *RDB) ListRetry() ([]*RetryTask, error) { + data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result() if err != nil { return nil, err } @@ -222,7 +309,7 @@ func (r *rdb) listRetry() ([]*RetryTask, error) { if !ok { continue // bad data, ignore and continue } - var msg taskMessage + var msg TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -241,8 +328,8 @@ func (r *rdb) listRetry() ([]*RetryTask, error) { return tasks, nil } -func (r *rdb) listDead() ([]*DeadTask, error) { - data, err := r.client.ZRangeWithScores(dead, 0, -1).Result() +func (r *RDB) ListDead() ([]*DeadTask, error) { + data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result() if err != nil { return nil, err } @@ -252,7 +339,7 @@ func (r *rdb) listDead() ([]*DeadTask, error) { if !ok { continue // bad data, ignore and continue } - var msg taskMessage + var msg TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { continue // bad data, ignore and continue @@ -269,7 +356,7 @@ func (r *rdb) listDead() ([]*DeadTask, error) { return tasks, nil } -func (r *rdb) rangeList(key string) ([]*taskMessage, error) { +func (r *RDB) rangeList(key string) ([]*TaskMessage, error) { data, err := r.client.LRange(key, 0, -1).Result() if err != nil { return nil, err @@ -277,7 +364,7 @@ func (r *rdb) rangeList(key string) ([]*taskMessage, error) { return r.toMessageSlice(data), nil } -func (r *rdb) rangeZSet(key string) ([]*taskMessage, error) { +func (r *RDB) rangeZSet(key string) ([]*TaskMessage, error) { data, err := r.client.ZRange(key, 0, -1).Result() if err != nil { return nil, err @@ -286,10 +373,10 @@ func (r *rdb) rangeZSet(key string) ([]*taskMessage, error) { } // toMessageSlice convers json strings to a slice of task messages. -func (r *rdb) toMessageSlice(data []string) []*taskMessage { - var msgs []*taskMessage +func (r *RDB) toMessageSlice(data []string) []*TaskMessage { + var msgs []*TaskMessage for _, s := range data { - var msg taskMessage + var msg TaskMessage err := json.Unmarshal([]byte(s), &msg) if err != nil { // bad data; ignore and continue diff --git a/rdb_test.go b/internal/rdb/rdb_test.go similarity index 54% rename from rdb_test.go rename to internal/rdb/rdb_test.go index a246d6a..11d6425 100644 --- a/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1,18 +1,94 @@ -package asynq +package rdb import ( + "encoding/json" "fmt" + "math/rand" + "sort" "testing" "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" + "github.com/google/uuid" ) +func init() { + rand.Seed(time.Now().UnixNano()) +} + +func setup(t *testing.T) *RDB { + t.Helper() + r := NewRDB(redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 15, + })) + // Start each test with a clean slate. + if err := r.client.FlushDB().Err(); err != nil { + panic(err) + } + return r +} + +var sortMsgOpt = cmp.Transformer("SortMsg", func(in []*TaskMessage) []*TaskMessage { + out := append([]*TaskMessage(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].ID.String() < out[j].ID.String() + }) + return out +}) + +func randomTask(taskType, qname string, payload map[string]interface{}) *TaskMessage { + return &TaskMessage{ + ID: uuid.New(), + Type: taskType, + Queue: qname, + Retry: 25, + Payload: make(map[string]interface{}), + } +} + +func mustMarshal(t *testing.T, task *TaskMessage) string { + t.Helper() + data, err := json.Marshal(task) + if err != nil { + t.Fatal(err) + } + return string(data) +} + +func mustUnmarshal(t *testing.T, data string) *TaskMessage { + t.Helper() + var task TaskMessage + err := json.Unmarshal([]byte(data), &task) + if err != nil { + t.Fatal(err) + } + return &task +} + +func mustMarshalSlice(t *testing.T, tasks []*TaskMessage) []string { + t.Helper() + var data []string + for _, task := range tasks { + data = append(data, mustMarshal(t, task)) + } + return data +} + +func mustUnmarshalSlice(t *testing.T, data []string) []*TaskMessage { + t.Helper() + var tasks []*TaskMessage + for _, s := range data { + tasks = append(tasks, mustUnmarshal(t, s)) + } + return tasks +} + func TestEnqueue(t *testing.T) { r := setup(t) tests := []struct { - msg *taskMessage + msg *TaskMessage }{ {msg: randomTask("send_email", "default", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})}, @@ -26,18 +102,18 @@ func TestEnqueue(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - err := r.enqueue(tc.msg) + err := r.Enqueue(tc.msg) if err != nil { t.Error(err) continue } - res := r.client.LRange(defaultQueue, 0, -1).Val() + res := r.client.LRange(DefaultQueue, 0, -1).Val() if len(res) != 1 { - t.Errorf("LIST %q has length %d, want 1", defaultQueue, len(res)) + t.Errorf("LIST %q has length %d, want 1", DefaultQueue, len(res)) continue } - if !r.client.SIsMember(allQueues, defaultQueue).Val() { - t.Errorf("SISMEMBER %q %q = false, want true", allQueues, defaultQueue) + if !r.client.SIsMember(allQueues, DefaultQueue).Val() { + t.Errorf("SISMEMBER %q %q = false, want true", allQueues, DefaultQueue) } if diff := cmp.Diff(*tc.msg, *mustUnmarshal(t, res[0])); diff != "" { t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) @@ -49,13 +125,13 @@ func TestDequeue(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", map[string]interface{}{"subject": "hello!"}) tests := []struct { - queued []*taskMessage - want *taskMessage + queued []*TaskMessage + want *TaskMessage err error inProgress int64 // length of "in-progress" tasks after dequeue }{ - {queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1}, - {queued: []*taskMessage{}, want: nil, err: errDequeueTimeout, inProgress: 0}, + {queued: []*TaskMessage{t1}, want: t1, err: nil, inProgress: 1}, + {queued: []*TaskMessage{}, want: nil, err: ErrDequeueTimeout, inProgress: 0}, } for _, tc := range tests { @@ -64,16 +140,16 @@ func TestDequeue(t *testing.T) { t.Fatal(err) } for _, m := range tc.queued { - r.enqueue(m) + r.Enqueue(m) } - got, err := r.dequeue(defaultQueue, time.Second) + got, err := r.Dequeue(DefaultQueue, time.Second) if !cmp.Equal(got, tc.want) || err != tc.err { t.Errorf("(*rdb).dequeue(%q, time.Second) = %v, %v; want %v, %v", - defaultQueue, got, err, tc.want, tc.err) + DefaultQueue, got, err, tc.want, tc.err) continue } - if l := r.client.LLen(inProgress).Val(); l != tc.inProgress { - t.Errorf("LIST %q has length %d, want %d", inProgress, l, tc.inProgress) + if l := r.client.LLen(InProgress).Val(); l != tc.inProgress { + t.Errorf("LIST %q has length %d, want %d", InProgress, l, tc.inProgress) } } } @@ -84,24 +160,24 @@ func TestRemove(t *testing.T) { t2 := randomTask("export_csv", "csv", nil) tests := []struct { - initial []*taskMessage // initial state of the list - target *taskMessage // task to remove - final []*taskMessage // final state of the list + initial []*TaskMessage // initial state of the list + target *TaskMessage // task to remove + final []*TaskMessage // final state of the list }{ { - initial: []*taskMessage{t1, t2}, + initial: []*TaskMessage{t1, t2}, target: t1, - final: []*taskMessage{t2}, + final: []*TaskMessage{t2}, }, { - initial: []*taskMessage{t2}, + initial: []*TaskMessage{t2}, target: t1, - final: []*taskMessage{t2}, + final: []*TaskMessage{t2}, }, { - initial: []*taskMessage{t1}, + initial: []*TaskMessage{t1}, target: t1, - final: []*taskMessage{}, + final: []*TaskMessage{}, }, } @@ -112,26 +188,26 @@ func TestRemove(t *testing.T) { } // set up initial state for _, task := range tc.initial { - err := r.client.LPush(defaultQueue, mustMarshal(t, task)).Err() + err := r.client.LPush(DefaultQueue, mustMarshal(t, task)).Err() if err != nil { t.Fatal(err) } } - err := r.remove(defaultQueue, tc.target) + err := r.Remove(DefaultQueue, tc.target) if err != nil { t.Error(err) continue } - var got []*taskMessage - data := r.client.LRange(defaultQueue, 0, -1).Val() + var got []*TaskMessage + data := r.client.LRange(DefaultQueue, 0, -1).Val() for _, s := range data { got = append(got, mustUnmarshal(t, s)) } if diff := cmp.Diff(tc.final, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", defaultQueue, diff) + t.Errorf("mismatch found in %q after calling (*rdb).remove: (-want, +got):\n%s", DefaultQueue, diff) continue } } @@ -143,14 +219,14 @@ func TestKill(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - initial []*taskMessage // inital state of "dead" set - target *taskMessage // task to kill - want []*taskMessage // final state of "dead" set + initial []*TaskMessage // inital state of "dead" set + target *TaskMessage // task to kill + want []*TaskMessage // final state of "dead" set }{ { - initial: []*taskMessage{}, + initial: []*TaskMessage{}, target: t1, - want: []*taskMessage{t1}, + want: []*TaskMessage{t1}, }, } @@ -161,22 +237,22 @@ func TestKill(t *testing.T) { } // set up initial state for _, task := range tc.initial { - err := r.client.ZAdd(dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() + err := r.client.ZAdd(Dead, &redis.Z{Member: mustMarshal(t, task), Score: float64(time.Now().Unix())}).Err() if err != nil { t.Fatal(err) } } - err := r.kill(tc.target) + err := r.Kill(tc.target) if err != nil { t.Error(err) continue } - actual := r.client.ZRange(dead, 0, -1).Val() + actual := r.client.ZRange(Dead, 0, -1).Val() got := mustUnmarshalSlice(t, actual) if diff := cmp.Diff(tc.want, got, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", dead, diff) + t.Errorf("mismatch found in %q after calling (*rdb).kill: (-want, +got):\n%s", Dead, diff) continue } } @@ -189,28 +265,28 @@ func TestMoveAll(t *testing.T) { t3 := randomTask("sync_stuff", "sync", nil) tests := []struct { - beforeSrc []*taskMessage - beforeDst []*taskMessage - afterSrc []*taskMessage - afterDst []*taskMessage + beforeSrc []*TaskMessage + beforeDst []*TaskMessage + afterSrc []*TaskMessage + afterDst []*TaskMessage }{ { - beforeSrc: []*taskMessage{t1, t2, t3}, - beforeDst: []*taskMessage{}, - afterSrc: []*taskMessage{}, - afterDst: []*taskMessage{t1, t2, t3}, + beforeSrc: []*TaskMessage{t1, t2, t3}, + beforeDst: []*TaskMessage{}, + afterSrc: []*TaskMessage{}, + afterDst: []*TaskMessage{t1, t2, t3}, }, { - beforeSrc: []*taskMessage{}, - beforeDst: []*taskMessage{t1, t2, t3}, - afterSrc: []*taskMessage{}, - afterDst: []*taskMessage{t1, t2, t3}, + beforeSrc: []*TaskMessage{}, + beforeDst: []*TaskMessage{t1, t2, t3}, + afterSrc: []*TaskMessage{}, + afterDst: []*TaskMessage{t1, t2, t3}, }, { - beforeSrc: []*taskMessage{t2, t3}, - beforeDst: []*taskMessage{t1}, - afterSrc: []*taskMessage{}, - afterDst: []*taskMessage{t1, t2, t3}, + beforeSrc: []*TaskMessage{t2, t3}, + beforeDst: []*TaskMessage{t1}, + afterSrc: []*TaskMessage{}, + afterDst: []*TaskMessage{t1, t2, t3}, }, } @@ -222,63 +298,63 @@ func TestMoveAll(t *testing.T) { } // seed src list. for _, msg := range tc.beforeSrc { - r.client.LPush(inProgress, mustMarshal(t, msg)) + r.client.LPush(InProgress, mustMarshal(t, msg)) } // seed dst list. for _, msg := range tc.beforeDst { - r.client.LPush(defaultQueue, mustMarshal(t, msg)) + r.client.LPush(DefaultQueue, mustMarshal(t, msg)) } - if err := r.moveAll(inProgress, defaultQueue); err != nil { - t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", inProgress, defaultQueue, err) + if err := r.MoveAll(InProgress, DefaultQueue); err != nil { + t.Errorf("(*rdb).moveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err) continue } - src := r.client.LRange(inProgress, 0, -1).Val() + src := r.client.LRange(InProgress, 0, -1).Val() gotSrc := mustUnmarshalSlice(t, src) if diff := cmp.Diff(tc.afterSrc, gotSrc, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", inProgress, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", InProgress, diff) } - dst := r.client.LRange(defaultQueue, 0, -1).Val() + dst := r.client.LRange(DefaultQueue, 0, -1).Val() gotDst := mustUnmarshalSlice(t, dst) if diff := cmp.Diff(tc.afterDst, gotDst, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", defaultQueue, diff) + t.Errorf("mismatch found in %q (-want, +got)\n%s", DefaultQueue, diff) } } } func TestForward(t *testing.T) { r := setup(t) - t1 := randomTask("send_email", defaultQueue, nil) - t2 := randomTask("generate_csv", defaultQueue, nil) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("generate_csv", "default", nil) secondAgo := time.Now().Add(-time.Second) hourFromNow := time.Now().Add(time.Hour) tests := []struct { tasks []*redis.Z // scheduled tasks with timestamp as a score - wantQueued []*taskMessage // queue after calling forward - wantScheduled []*taskMessage // scheduled queue after calling forward + wantQueued []*TaskMessage // queue after calling forward + wantScheduled []*TaskMessage // scheduled queue after calling forward }{ { tasks: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - wantQueued: []*taskMessage{t1, t2}, - wantScheduled: []*taskMessage{}, + wantQueued: []*TaskMessage{t1, t2}, + wantScheduled: []*TaskMessage{}, }, { tasks: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - wantQueued: []*taskMessage{t2}, - wantScheduled: []*taskMessage{t1}, + wantQueued: []*TaskMessage{t2}, + wantScheduled: []*TaskMessage{t1}, }, { tasks: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}}, - wantQueued: []*taskMessage{}, - wantScheduled: []*taskMessage{t1, t2}, + wantQueued: []*TaskMessage{}, + wantScheduled: []*TaskMessage{t1, t2}, }, } @@ -287,27 +363,25 @@ func TestForward(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - if err := r.client.ZAdd(scheduled, tc.tasks...).Err(); err != nil { + if err := r.client.ZAdd(Scheduled, tc.tasks...).Err(); err != nil { t.Error(err) continue } - err := r.forward(scheduled) + err := r.Forward(Scheduled) if err != nil { - t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err) + t.Errorf("(*rdb).forward(%q) = %v, want nil", Scheduled, err) continue } - queued := r.client.LRange(defaultQueue, 0, -1).Val() + queued := r.client.LRange(DefaultQueue, 0, -1).Val() gotQueued := mustUnmarshalSlice(t, queued) if diff := cmp.Diff(tc.wantQueued, gotQueued, sortMsgOpt); diff != "" { - t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQueue, len(gotQueued), len(tc.wantQueued), diff) - continue + t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", DefaultQueue, len(gotQueued), len(tc.wantQueued), diff) } - scheduled := r.client.ZRangeByScore(scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() + scheduled := r.client.ZRangeByScore(Scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val() gotScheduled := mustUnmarshalSlice(t, scheduled) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff) - continue } } } @@ -315,14 +389,14 @@ func TestForward(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) tests := []struct { - msg *taskMessage + msg *TaskMessage processAt time.Time zset string }{ { randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}), time.Now().Add(15 * time.Minute), - scheduled, + Scheduled, }, } @@ -332,7 +406,7 @@ func TestSchedule(t *testing.T) { t.Fatal(err) } - err := r.schedule(tc.zset, tc.processAt, tc.msg) + err := r.Schedule(tc.zset, tc.processAt, tc.msg) if err != nil { t.Error(err) continue diff --git a/poller.go b/poller.go index 808c582..a4ef6c9 100644 --- a/poller.go +++ b/poller.go @@ -3,10 +3,12 @@ package asynq import ( "log" "time" + + "github.com/hibiken/asynq/internal/rdb" ) type poller struct { - rdb *rdb + rdb *rdb.RDB // channel to communicate back to the long running "poller" goroutine. done chan struct{} @@ -18,9 +20,9 @@ type poller struct { zsets []string } -func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller { +func newPoller(r *rdb.RDB, avgInterval time.Duration, zsets []string) *poller { return &poller{ - rdb: rdb, + rdb: r, done: make(chan struct{}), avgInterval: avgInterval, zsets: zsets, @@ -51,7 +53,7 @@ func (p *poller) start() { func (p *poller) exec() { for _, zset := range p.zsets { - if err := p.rdb.forward(zset); err != nil { + if err := p.rdb.Forward(zset); err != nil { log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err) } } diff --git a/poller_test.go b/poller_test.go index 4fe095f..81ab104 100644 --- a/poller_test.go +++ b/poller_test.go @@ -5,29 +5,31 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/rdb" ) func TestPoller(t *testing.T) { type scheduledTask struct { - msg *taskMessage + msg *rdb.TaskMessage processAt time.Time } r := setup(t) + rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - p := newPoller(r, pollInterval, []string{scheduled, retry}) + p := newPoller(rdbClient, pollInterval, []string{rdb.Scheduled, rdb.Retry}) t1 := randomTask("gen_thumbnail", "default", nil) t2 := randomTask("send_email", "default", nil) t3 := randomTask("reindex", "default", nil) t4 := randomTask("sync", "default", nil) tests := []struct { - initScheduled []scheduledTask // scheduled queue initial state - initRetry []scheduledTask // retry queue initial state - initQueue []*taskMessage // default queue initial state - wait time.Duration // wait duration before checking for final state - wantScheduled []*taskMessage // schedule queue final state - wantRetry []*taskMessage // retry queue final state - wantQueue []*taskMessage // default queue final state + initScheduled []scheduledTask // scheduled queue initial state + initRetry []scheduledTask // retry queue initial state + initQueue []*rdb.TaskMessage // default queue initial state + wait time.Duration // wait duration before checking for final state + wantScheduled []*rdb.TaskMessage // schedule queue final state + wantRetry []*rdb.TaskMessage // retry queue final state + wantQueue []*rdb.TaskMessage // default queue final state }{ { initScheduled: []scheduledTask{ @@ -37,11 +39,11 @@ func TestPoller(t *testing.T) { initRetry: []scheduledTask{ {t3, time.Now().Add(-500 * time.Millisecond)}, }, - initQueue: []*taskMessage{t4}, + initQueue: []*rdb.TaskMessage{t4}, wait: pollInterval * 2, - wantScheduled: []*taskMessage{t1}, - wantRetry: []*taskMessage{}, - wantQueue: []*taskMessage{t2, t3, t4}, + wantScheduled: []*rdb.TaskMessage{t1}, + wantRetry: []*rdb.TaskMessage{}, + wantQueue: []*rdb.TaskMessage{t2, t3, t4}, }, { initScheduled: []scheduledTask{ @@ -50,36 +52,36 @@ func TestPoller(t *testing.T) { {t3, time.Now().Add(-500 * time.Millisecond)}, }, initRetry: []scheduledTask{}, - initQueue: []*taskMessage{t4}, + initQueue: []*rdb.TaskMessage{t4}, wait: pollInterval * 2, - wantScheduled: []*taskMessage{}, - wantRetry: []*taskMessage{}, - wantQueue: []*taskMessage{t1, t2, t3, t4}, + wantScheduled: []*rdb.TaskMessage{}, + wantRetry: []*rdb.TaskMessage{}, + wantQueue: []*rdb.TaskMessage{t1, t2, t3, t4}, }, } for _, tc := range tests { // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { + if err := r.FlushDB().Err(); err != nil { t.Fatal(err) } // initialize scheduled queue for _, st := range tc.initScheduled { - err := r.schedule(scheduled, st.processAt, st.msg) + err := rdbClient.Schedule(rdb.Scheduled, st.processAt, st.msg) if err != nil { t.Fatal(err) } } // initialize retry queue for _, st := range tc.initRetry { - err := r.schedule(retry, st.processAt, st.msg) + err := rdbClient.Schedule(rdb.Retry, st.processAt, st.msg) if err != nil { t.Fatal(err) } } // initialize default queue for _, msg := range tc.initQueue { - err := r.enqueue(msg) + err := rdbClient.Enqueue(msg) if err != nil { t.Fatal(err) } @@ -89,22 +91,22 @@ func TestPoller(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotScheduledRaw := r.client.ZRange(scheduled, 0, -1).Val() + gotScheduledRaw := r.ZRange(rdb.Scheduled, 0, -1).Val() gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduled, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Scheduled, diff) } - gotRetryRaw := r.client.ZRange(retry, 0, -1).Val() + gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retry, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.Retry, diff) } - gotQueueRaw := r.client.LRange(defaultQueue, 0, -1).Val() + gotQueueRaw := r.LRange(rdb.DefaultQueue, 0, -1).Val() gotQueue := mustUnmarshalSlice(t, gotQueueRaw) if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQueue, diff) + t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", rdb.DefaultQueue, diff) } } } diff --git a/processor.go b/processor.go index ab12bd6..b539b80 100644 --- a/processor.go +++ b/processor.go @@ -4,10 +4,12 @@ import ( "fmt" "log" "time" + + "github.com/hibiken/asynq/internal/rdb" ) type processor struct { - rdb *rdb + rdb *rdb.RDB handler Handler @@ -24,9 +26,9 @@ type processor struct { done chan struct{} } -func newProcessor(rdb *rdb, numWorkers int, handler Handler) *processor { +func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { return &processor{ - rdb: rdb, + rdb: r, handler: handler, dequeueTimeout: 5 * time.Second, sema: make(chan struct{}, numWorkers), @@ -68,8 +70,8 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - msg, err := p.rdb.dequeue(defaultQueue, p.dequeueTimeout) - if err == errDequeueTimeout { + msg, err := p.rdb.Dequeue(rdb.DefaultQueue, p.dequeueTimeout) + if err == rdb.ErrDequeueTimeout { // timed out, this is a normal behavior. return } @@ -83,9 +85,9 @@ func (p *processor) exec() { go func(task *Task) { // NOTE: This deferred anonymous function needs to take taskMessage as a value because // the message can be mutated by the time this function is called. - defer func(msg taskMessage) { - if err := p.rdb.remove(inProgress, &msg); err != nil { - log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) + defer func(msg rdb.TaskMessage) { + if err := p.rdb.Remove(rdb.InProgress, &msg); err != nil { + log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, rdb.InProgress, err) } <-p.sema // release token }(*msg) @@ -99,9 +101,9 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - err := p.rdb.moveAll(inProgress, defaultQueue) + err := p.rdb.MoveAll(rdb.InProgress, rdb.DefaultQueue) if err != nil { - log.Printf("[ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue) + log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue) } } diff --git a/processor_test.go b/processor_test.go index cfe9d5d..68e6b93 100644 --- a/processor_test.go +++ b/processor_test.go @@ -7,10 +7,12 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/hibiken/asynq/internal/rdb" ) func TestProcessorSuccess(t *testing.T) { r := setup(t) + rdbClient := rdb.NewRDB(r) m1 := randomTask("send_email", "default", nil) m2 := randomTask("gen_thumbnail", "default", nil) @@ -23,20 +25,20 @@ func TestProcessorSuccess(t *testing.T) { t4 := &Task{Type: m4.Type, Payload: m4.Payload} tests := []struct { - initQueue []*taskMessage // initial default queue state - incoming []*taskMessage // tasks to be enqueued during run - wait time.Duration // wait duration between starting and stopping processor for this test case - wantProcessed []*Task // tasks to be processed at the end + initQueue []*rdb.TaskMessage // initial default queue state + incoming []*rdb.TaskMessage // tasks to be enqueued during run + wait time.Duration // wait duration between starting and stopping processor for this test case + wantProcessed []*Task // tasks to be processed at the end }{ { - initQueue: []*taskMessage{m1}, - incoming: []*taskMessage{m2, m3, m4}, + initQueue: []*rdb.TaskMessage{m1}, + incoming: []*rdb.TaskMessage{m2, m3, m4}, wait: time.Second, wantProcessed: []*Task{t1, t2, t3, t4}, }, { - initQueue: []*taskMessage{}, - incoming: []*taskMessage{m1}, + initQueue: []*rdb.TaskMessage{}, + incoming: []*rdb.TaskMessage{m1}, wait: time.Second, wantProcessed: []*Task{t1}, }, @@ -44,7 +46,7 @@ func TestProcessorSuccess(t *testing.T) { for _, tc := range tests { // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { + if err := r.FlushDB().Err(); err != nil { t.Fatal(err) } // instantiate a new processor @@ -57,11 +59,11 @@ func TestProcessorSuccess(t *testing.T) { processed = append(processed, task) return nil } - p := newProcessor(r, 10, h) + p := newProcessor(rdbClient, 10, h) p.dequeueTimeout = time.Second // short time out for test purpose // initialize default queue. for _, msg := range tc.initQueue { - err := r.enqueue(msg) + err := rdbClient.Enqueue(msg) if err != nil { t.Fatal(err) } @@ -70,7 +72,7 @@ func TestProcessorSuccess(t *testing.T) { p.start() for _, msg := range tc.incoming { - err := r.enqueue(msg) + err := rdbClient.Enqueue(msg) if err != nil { p.terminate() t.Fatal(err) @@ -83,14 +85,15 @@ func TestProcessorSuccess(t *testing.T) { t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) } - if l := r.client.LLen(inProgress).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", inProgress, l) + if l := r.LLen(rdb.InProgress).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l) } } } func TestProcessorRetry(t *testing.T) { r := setup(t) + rdbClient := rdb.NewRDB(r) m1 := randomTask("send_email", "default", nil) m1.Retried = m1.Retry // m1 has reached its max retry count @@ -113,24 +116,24 @@ func TestProcessorRetry(t *testing.T) { r4.Retried = m4.Retried + 1 tests := []struct { - initQueue []*taskMessage // initial default queue state - incoming []*taskMessage // tasks to be enqueued during run - wait time.Duration // wait duration between starting and stopping processor for this test case - wantRetry []*taskMessage // tasks in retry queue at the end - wantDead []*taskMessage // tasks in dead queue at the end + initQueue []*rdb.TaskMessage // initial default queue state + incoming []*rdb.TaskMessage // tasks to be enqueued during run + wait time.Duration // wait duration between starting and stopping processor for this test case + wantRetry []*rdb.TaskMessage // tasks in retry queue at the end + wantDead []*rdb.TaskMessage // tasks in dead queue at the end }{ { - initQueue: []*taskMessage{m1, m2}, - incoming: []*taskMessage{m3, m4}, + initQueue: []*rdb.TaskMessage{m1, m2}, + incoming: []*rdb.TaskMessage{m3, m4}, wait: time.Second, - wantRetry: []*taskMessage{&r2, &r3, &r4}, - wantDead: []*taskMessage{&r1}, + wantRetry: []*rdb.TaskMessage{&r2, &r3, &r4}, + wantDead: []*rdb.TaskMessage{&r1}, }, } for _, tc := range tests { // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { + if err := r.FlushDB().Err(); err != nil { t.Fatal(err) } // instantiate a new processor @@ -138,11 +141,11 @@ func TestProcessorRetry(t *testing.T) { h = func(task *Task) error { return fmt.Errorf(errMsg) } - p := newProcessor(r, 10, h) + p := newProcessor(rdbClient, 10, h) p.dequeueTimeout = time.Second // short time out for test purpose // initialize default queue. for _, msg := range tc.initQueue { - err := r.enqueue(msg) + err := rdbClient.Enqueue(msg) if err != nil { t.Fatal(err) } @@ -150,7 +153,7 @@ func TestProcessorRetry(t *testing.T) { p.start() for _, msg := range tc.incoming { - err := r.enqueue(msg) + err := rdbClient.Enqueue(msg) if err != nil { p.terminate() t.Fatal(err) @@ -159,20 +162,20 @@ func TestProcessorRetry(t *testing.T) { time.Sleep(tc.wait) p.terminate() - gotRetryRaw := r.client.ZRange(retry, 0, -1).Val() + gotRetryRaw := r.ZRange(rdb.Retry, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, gotRetryRaw) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", retry, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Retry, diff) } - gotDeadRaw := r.client.ZRange(dead, 0, -1).Val() + gotDeadRaw := r.ZRange(rdb.Dead, 0, -1).Val() gotDead := mustUnmarshalSlice(t, gotDeadRaw) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", dead, diff) + t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", rdb.Dead, diff) } - if l := r.client.LLen(inProgress).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", inProgress, l) + if l := r.LLen(rdb.InProgress).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", rdb.InProgress, l) } } } diff --git a/retry.go b/retry.go index b8fda87..f7a815a 100644 --- a/retry.go +++ b/retry.go @@ -5,13 +5,15 @@ import ( "math" "math/rand" "time" + + "github.com/hibiken/asynq/internal/rdb" ) -func retryTask(rdb *rdb, msg *taskMessage, err error) { +func retryTask(r *rdb.RDB, msg *rdb.TaskMessage, err error) { msg.ErrorMsg = err.Error() if msg.Retried >= msg.Retry { log.Printf("[WARN] Retry exhausted for task(Type: %q, ID: %v)\n", msg.Type, msg.ID) - if err := rdb.kill(msg); err != nil { + if err := r.Kill(msg); err != nil { log.Printf("[ERROR] Could not add task %+v to 'dead'\n", err) } return @@ -19,7 +21,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) { retryAt := time.Now().Add(delaySeconds((msg.Retried))) log.Printf("[INFO] Retrying task(Type: %q, ID: %v) in %v\n", msg.Type, msg.ID, retryAt.Sub(time.Now())) msg.Retried++ - if err := rdb.schedule(retry, retryAt, msg); err != nil { + if err := r.Schedule(rdb.Retry, retryAt, msg); err != nil { log.Printf("[ERROR] Could not add msg %+v to 'retry': %v\n", msg, err) return } diff --git a/retry_test.go b/retry_test.go index 600136f..1d16f6f 100644 --- a/retry_test.go +++ b/retry_test.go @@ -6,64 +6,66 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/uuid" + "github.com/hibiken/asynq/internal/rdb" ) func TestRetry(t *testing.T) { r := setup(t) + rdbClient := rdb.NewRDB(r) errMsg := "email server not responding" // t1 is a task with max-retry count reached. - t1 := &taskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()} + t1 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 10, Queue: "default", ID: uuid.New()} // t2 is t1 with updated error message. t2 := *t1 t2.ErrorMsg = errMsg // t3 is a task which hasn't reached max-retry count. - t3 := &taskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()} + t3 := &rdb.TaskMessage{Type: "send_email", Retry: 10, Retried: 5, Queue: "default", ID: uuid.New()} // t4 is t3 after retry. t4 := *t3 t4.Retried++ t4.ErrorMsg = errMsg tests := []struct { - desc string // test case description - msg *taskMessage // task to retry - err error // error that caused retry - wantDead []*taskMessage // state "dead" queue should be in - wantRetry []*taskMessage // state "retry" queue should be in + desc string // test case description + msg *rdb.TaskMessage // task to retry + err error // error that caused retry + wantDead []*rdb.TaskMessage // state "dead" queue should be in + wantRetry []*rdb.TaskMessage // state "retry" queue should be in }{ { desc: "With retry exhausted task", msg: t1, err: fmt.Errorf(errMsg), - wantDead: []*taskMessage{&t2}, - wantRetry: []*taskMessage{}, + wantDead: []*rdb.TaskMessage{&t2}, + wantRetry: []*rdb.TaskMessage{}, }, { desc: "With retry-able task", msg: t3, err: fmt.Errorf(errMsg), - wantDead: []*taskMessage{}, - wantRetry: []*taskMessage{&t4}, + wantDead: []*rdb.TaskMessage{}, + wantRetry: []*rdb.TaskMessage{&t4}, }, } for _, tc := range tests { // clean up db before each test case. - if err := r.client.FlushDB().Err(); err != nil { + if err := r.FlushDB().Err(); err != nil { t.Fatal(err) } - retryTask(r, tc.msg, tc.err) + retryTask(rdbClient, tc.msg, tc.err) - deadQueue := r.client.ZRange(dead, 0, -1).Val() + deadQueue := r.ZRange(rdb.Dead, 0, -1).Val() gotDead := mustUnmarshalSlice(t, deadQueue) if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff) + t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff) } - retryQueue := r.client.ZRange(retry, 0, -1).Val() + retryQueue := r.ZRange(rdb.Retry, 0, -1).Val() gotRetry := mustUnmarshalSlice(t, retryQueue) if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { - t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, dead, diff) + t.Errorf("%s;\nmismatch found in %q after retryTask(); (-want, +got)\n%s", tc.desc, rdb.Dead, diff) } } }