2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
2019-12-04 13:01:26 +08:00
package rdb
2019-11-20 23:01:24 +08:00
import (
2021-09-02 20:56:02 +08:00
"context"
2020-02-21 23:18:22 +08:00
"encoding/json"
2020-08-29 21:54:08 +08:00
"flag"
2021-04-18 22:19:19 +08:00
"strconv"
2020-08-29 21:54:08 +08:00
"strings"
2020-02-23 06:30:24 +08:00
"sync"
2019-11-20 23:01:24 +08:00
"testing"
"time"
2021-09-02 20:56:02 +08:00
"github.com/go-redis/redis/v8"
2019-11-20 23:01:24 +08:00
"github.com/google/go-cmp/cmp"
2020-02-02 14:22:48 +08:00
"github.com/google/go-cmp/cmp/cmpopts"
2020-07-02 21:21:20 +08:00
"github.com/google/uuid"
2019-12-29 12:12:14 +08:00
h "github.com/hibiken/asynq/internal/asynqtest"
2019-12-22 23:15:45 +08:00
"github.com/hibiken/asynq/internal/base"
2021-05-09 22:39:01 +08:00
"github.com/hibiken/asynq/internal/errors"
2021-12-11 01:07:41 +08:00
"github.com/hibiken/asynq/internal/timeutil"
2019-11-20 23:01:24 +08:00
)
2020-08-29 21:54:08 +08:00
// Settings for the package tests. Each one is populated from a command-line
// flag registered in init.

// Single-node Redis settings.
var (
	// redisAddr is the host:port of the Redis server used when not in cluster mode.
	redisAddr string
	// redisDB is the Redis database number used when not in cluster mode.
	redisDB int
)

// Redis Cluster settings.
var (
	// useRedisCluster selects a Redis Cluster client instead of a single-node client.
	useRedisCluster bool
	// redisClusterAddrs is a comma-separated list of host:port.
	redisClusterAddrs string
)
func init ( ) {
flag . StringVar ( & redisAddr , "redis_addr" , "localhost:6379" , "redis address to use in testing" )
2020-09-02 20:29:34 +08:00
flag . IntVar ( & redisDB , "redis_db" , 15 , "redis db number to use in testing" )
2020-08-29 21:54:08 +08:00
flag . BoolVar ( & useRedisCluster , "redis_cluster" , false , "use redis cluster as a broker in testing" )
flag . StringVar ( & redisClusterAddrs , "redis_cluster_addrs" , "localhost:7000,localhost:7001,localhost:7002" , "comma separated list of redis server addresses" )
}
2021-03-06 07:20:56 +08:00
func setup ( tb testing . TB ) ( r * RDB ) {
tb . Helper ( )
2020-08-29 21:54:08 +08:00
if useRedisCluster {
addrs := strings . Split ( redisClusterAddrs , "," )
if len ( addrs ) == 0 {
2021-03-06 07:20:56 +08:00
tb . Fatal ( "No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag." )
2020-08-29 21:54:08 +08:00
}
r = NewRDB ( redis . NewClusterClient ( & redis . ClusterOptions {
Addrs : addrs ,
} ) )
} else {
r = NewRDB ( redis . NewClient ( & redis . Options {
Addr : redisAddr ,
DB : redisDB ,
} ) )
}
2019-12-29 12:12:14 +08:00
// Start each test with a clean slate.
2021-03-06 07:20:56 +08:00
h . FlushDB ( tb , r . client )
2019-12-29 12:12:14 +08:00
return r
}
2019-11-26 11:58:24 +08:00
func TestEnqueue ( t * testing . T ) {
2019-11-26 10:55:17 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2021-03-21 04:42:13 +08:00
t1 := h . NewTaskMessage ( "send_email" , h . JSON ( map [ string ] interface { } { "to" : "exampleuser@gmail.com" , "from" : "noreply@example.com" } ) )
t2 := h . NewTaskMessageWithQueue ( "generate_csv" , h . JSON ( map [ string ] interface { } { } ) , "csv" )
2020-08-07 21:31:02 +08:00
t3 := h . NewTaskMessageWithQueue ( "sync" , nil , "low" )
2020-01-09 22:21:43 +08:00
2021-12-11 01:07:41 +08:00
enqueueTime := time . Now ( )
r . SetClock ( timeutil . NewSimulatedClock ( enqueueTime ) )
2019-11-26 10:55:17 +08:00
tests := [ ] struct {
2019-12-22 23:15:45 +08:00
msg * base . TaskMessage
2019-11-26 10:55:17 +08:00
} {
2020-01-09 22:21:43 +08:00
{ t1 } ,
{ t2 } ,
{ t3 } ,
2019-11-20 23:01:24 +08:00
}
2019-11-26 10:55:17 +08:00
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case.
2019-12-13 11:49:41 +08:00
2021-11-16 08:34:26 +08:00
err := r . Enqueue ( context . Background ( ) , tc . msg )
2019-11-26 10:55:17 +08:00
if err != nil {
2020-01-09 22:21:43 +08:00
t . Errorf ( "(*RDB).Enqueue(msg) = %v, want nil" , err )
2021-04-18 22:19:19 +08:00
continue
2019-11-26 10:55:17 +08:00
}
2020-01-09 22:21:43 +08:00
2021-04-18 22:19:19 +08:00
// Check Pending list has task ID.
pendingKey := base . PendingKey ( tc . msg . Queue )
2021-09-02 20:56:02 +08:00
pendingIDs := r . client . LRange ( context . Background ( ) , pendingKey , 0 , - 1 ) . Val ( )
2021-04-24 21:44:44 +08:00
if n := len ( pendingIDs ) ; n != 1 {
t . Errorf ( "Redis LIST %q contains %d IDs, want 1" , pendingKey , n )
2021-04-18 22:19:19 +08:00
continue
}
2021-09-10 21:29:37 +08:00
if pendingIDs [ 0 ] != tc . msg . ID {
t . Errorf ( "Redis LIST %q: got %v, want %v" , pendingKey , pendingIDs , [ ] string { tc . msg . ID } )
2019-11-26 10:55:17 +08:00
continue
}
2021-04-18 22:19:19 +08:00
// Check the value under the task key.
2021-09-10 21:29:37 +08:00
taskKey := base . TaskKey ( tc . msg . Queue , tc . msg . ID )
2021-09-02 20:56:02 +08:00
encoded := r . client . HGet ( context . Background ( ) , taskKey , "msg" ) . Val ( ) // "msg" field
2021-04-18 22:19:19 +08:00
decoded := h . MustUnmarshal ( t , encoded )
if diff := cmp . Diff ( tc . msg , decoded ) ; diff != "" {
t . Errorf ( "persisted message was %v, want %v; (-want, +got)\n%s" , decoded , tc . msg , diff )
}
2021-09-02 20:56:02 +08:00
state := r . client . HGet ( context . Background ( ) , taskKey , "state" ) . Val ( ) // "state" field
2021-04-18 22:19:19 +08:00
if state != "pending" {
t . Errorf ( "state field under task-key is set to %q, want %q" , state , "pending" )
}
2021-09-02 20:56:02 +08:00
timeout := r . client . HGet ( context . Background ( ) , taskKey , "timeout" ) . Val ( ) // "timeout" field
2021-04-18 22:19:19 +08:00
if want := strconv . Itoa ( int ( tc . msg . Timeout ) ) ; timeout != want {
t . Errorf ( "timeout field under task-key is set to %v, want %v" , timeout , want )
2019-11-26 10:55:17 +08:00
}
2021-09-02 20:56:02 +08:00
deadline := r . client . HGet ( context . Background ( ) , taskKey , "deadline" ) . Val ( ) // "deadline" field
2021-04-18 22:19:19 +08:00
if want := strconv . Itoa ( int ( tc . msg . Deadline ) ) ; deadline != want {
2021-05-10 10:20:54 +08:00
t . Errorf ( "deadline field under task-key is set to %v, want %v" , deadline , want )
2021-04-18 22:19:19 +08:00
}
2021-12-09 22:37:18 +08:00
pendingSince := r . client . HGet ( context . Background ( ) , taskKey , "pending_since" ) . Val ( ) // "pending_since" field
2021-12-11 22:27:44 +08:00
if want := strconv . Itoa ( int ( enqueueTime . UnixNano ( ) ) ) ; pendingSince != want {
2021-12-09 22:37:18 +08:00
t . Errorf ( "pending_since field under task-key is set to %v, want %v" , pendingSince , want )
}
2021-04-18 22:19:19 +08:00
// Check queue is in the AllQueues set.
2021-09-02 20:56:02 +08:00
if ! r . client . SIsMember ( context . Background ( ) , base . AllQueues , tc . msg . Queue ) . Val ( ) {
2020-08-07 21:31:02 +08:00
t . Errorf ( "%q is not a member of SET %q" , tc . msg . Queue , base . AllQueues )
2020-01-09 22:21:43 +08:00
}
2019-11-20 23:01:24 +08:00
}
}
2021-09-11 07:47:00 +08:00
func TestEnqueueTaskIdConflictError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
m1 := base . TaskMessage {
ID : "custom_id" ,
Type : "foo" ,
Payload : nil ,
}
m2 := base . TaskMessage {
ID : "custom_id" ,
Type : "bar" ,
Payload : nil ,
}
tests := [ ] struct {
firstMsg * base . TaskMessage
secondMsg * base . TaskMessage
} {
{ firstMsg : & m1 , secondMsg : & m2 } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case.
2021-11-16 08:34:26 +08:00
if err := r . Enqueue ( context . Background ( ) , tc . firstMsg ) ; err != nil {
2021-09-11 07:47:00 +08:00
t . Errorf ( "First message: Enqueue failed: %v" , err )
continue
}
2021-11-16 08:34:26 +08:00
if err := r . Enqueue ( context . Background ( ) , tc . secondMsg ) ; ! errors . Is ( err , errors . ErrTaskIdConflict ) {
2021-09-11 07:47:00 +08:00
t . Errorf ( "Second message: Enqueue returned %v, want %v" , err , errors . ErrTaskIdConflict )
continue
}
}
}
2020-03-18 21:49:39 +08:00
func TestEnqueueUnique ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-03-18 21:49:39 +08:00
m1 := base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-03-18 21:49:39 +08:00
Type : "email" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "user_id" : json . Number ( "123" ) } ) ,
2020-03-18 21:49:39 +08:00
Queue : base . DefaultQueueName ,
2021-03-21 04:42:13 +08:00
UniqueKey : base . UniqueKey ( base . DefaultQueueName , "email" , h . JSON ( map [ string ] interface { } { "user_id" : 123 } ) ) ,
2020-03-18 21:49:39 +08:00
}
2021-12-11 01:07:41 +08:00
enqueueTime := time . Now ( )
r . SetClock ( timeutil . NewSimulatedClock ( enqueueTime ) )
2020-03-18 21:49:39 +08:00
tests := [ ] struct {
msg * base . TaskMessage
ttl time . Duration // uniqueness ttl
} {
{ & m1 , time . Minute } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case.
2021-03-13 08:23:08 +08:00
// Enqueue the first message, should succeed.
2021-11-16 08:34:26 +08:00
err := r . EnqueueUnique ( context . Background ( ) , tc . msg , tc . ttl )
2020-03-18 21:49:39 +08:00
if err != nil {
t . Errorf ( "First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil" ,
tc . msg , tc . ttl , err )
continue
}
2021-03-13 08:23:08 +08:00
gotPending := h . GetPendingMessages ( t , r . client , tc . msg . Queue )
if len ( gotPending ) != 1 {
t . Errorf ( "%q has length %d, want 1" , base . PendingKey ( tc . msg . Queue ) , len ( gotPending ) )
continue
}
if diff := cmp . Diff ( tc . msg , gotPending [ 0 ] ) ; diff != "" {
t . Errorf ( "persisted data differed from the original input (-want, +got)\n%s" , diff )
}
2021-09-02 20:56:02 +08:00
if ! r . client . SIsMember ( context . Background ( ) , base . AllQueues , tc . msg . Queue ) . Val ( ) {
2021-03-13 08:23:08 +08:00
t . Errorf ( "%q is not a member of SET %q" , tc . msg . Queue , base . AllQueues )
}
2021-04-19 21:28:58 +08:00
// Check Pending list has task ID.
pendingKey := base . PendingKey ( tc . msg . Queue )
2021-09-02 20:56:02 +08:00
pendingIDs := r . client . LRange ( context . Background ( ) , pendingKey , 0 , - 1 ) . Val ( )
2021-04-19 21:28:58 +08:00
if len ( pendingIDs ) != 1 {
t . Errorf ( "Redis LIST %q contains %d IDs, want 1" , pendingKey , len ( pendingIDs ) )
2021-03-13 08:23:08 +08:00
continue
}
2021-09-10 21:29:37 +08:00
if pendingIDs [ 0 ] != tc . msg . ID {
t . Errorf ( "Redis LIST %q: got %v, want %v" , pendingKey , pendingIDs , [ ] string { tc . msg . ID } )
2021-04-19 21:28:58 +08:00
continue
2021-03-13 08:23:08 +08:00
}
2021-04-19 21:28:58 +08:00
// Check the value under the task key.
2021-09-10 21:29:37 +08:00
taskKey := base . TaskKey ( tc . msg . Queue , tc . msg . ID )
2021-09-02 20:56:02 +08:00
encoded := r . client . HGet ( context . Background ( ) , taskKey , "msg" ) . Val ( ) // "msg" field
2021-04-19 21:28:58 +08:00
decoded := h . MustUnmarshal ( t , encoded )
if diff := cmp . Diff ( tc . msg , decoded ) ; diff != "" {
t . Errorf ( "persisted message was %v, want %v; (-want, +got)\n%s" , decoded , tc . msg , diff )
}
2021-09-02 20:56:02 +08:00
state := r . client . HGet ( context . Background ( ) , taskKey , "state" ) . Val ( ) // "state" field
2021-04-19 21:28:58 +08:00
if state != "pending" {
t . Errorf ( "state field under task-key is set to %q, want %q" , state , "pending" )
}
2021-09-02 20:56:02 +08:00
timeout := r . client . HGet ( context . Background ( ) , taskKey , "timeout" ) . Val ( ) // "timeout" field
2021-04-19 21:28:58 +08:00
if want := strconv . Itoa ( int ( tc . msg . Timeout ) ) ; timeout != want {
t . Errorf ( "timeout field under task-key is set to %v, want %v" , timeout , want )
}
2021-09-02 20:56:02 +08:00
deadline := r . client . HGet ( context . Background ( ) , taskKey , "deadline" ) . Val ( ) // "deadline" field
2021-04-19 21:28:58 +08:00
if want := strconv . Itoa ( int ( tc . msg . Deadline ) ) ; deadline != want {
2021-06-09 21:06:43 +08:00
t . Errorf ( "deadline field under task-key is set to %v, want %v" , deadline , want )
}
2021-12-09 22:37:18 +08:00
pendingSince := r . client . HGet ( context . Background ( ) , taskKey , "pending_since" ) . Val ( ) // "pending_since" field
2021-12-11 22:27:44 +08:00
if want := strconv . Itoa ( int ( enqueueTime . UnixNano ( ) ) ) ; pendingSince != want {
2021-12-09 22:37:18 +08:00
t . Errorf ( "pending_since field under task-key is set to %v, want %v" , pendingSince , want )
}
2021-09-02 20:56:02 +08:00
uniqueKey := r . client . HGet ( context . Background ( ) , taskKey , "unique_key" ) . Val ( ) // "unique_key" field
2021-06-09 21:06:43 +08:00
if uniqueKey != tc . msg . UniqueKey {
t . Errorf ( "uniqueue_key field under task key is set to %q, want %q" , uniqueKey , tc . msg . UniqueKey )
2021-04-19 21:28:58 +08:00
}
// Check queue is in the AllQueues set.
2021-09-02 20:56:02 +08:00
if ! r . client . SIsMember ( context . Background ( ) , base . AllQueues , tc . msg . Queue ) . Val ( ) {
2021-03-13 08:23:08 +08:00
t . Errorf ( "%q is not a member of SET %q" , tc . msg . Queue , base . AllQueues )
}
2020-03-18 21:49:39 +08:00
2021-03-13 08:23:08 +08:00
// Enqueue the second message, should fail.
2021-11-16 08:34:26 +08:00
got := r . EnqueueUnique ( context . Background ( ) , tc . msg , tc . ttl )
2021-05-09 22:39:01 +08:00
if ! errors . Is ( got , errors . ErrDuplicateTask ) {
t . Errorf ( "Second message: (*RDB).EnqueueUnique(msg, ttl) = %v, want %v" , got , errors . ErrDuplicateTask )
2020-03-18 21:49:39 +08:00
continue
}
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , tc . msg . UniqueKey ) . Val ( )
2021-04-19 21:28:58 +08:00
if ! cmp . Equal ( tc . ttl . Seconds ( ) , gotTTL . Seconds ( ) , cmpopts . EquateApprox ( 0 , 2 ) ) {
2020-03-18 21:49:39 +08:00
t . Errorf ( "TTL %q = %v, want %v" , tc . msg . UniqueKey , gotTTL , tc . ttl )
continue
}
}
}
2021-09-11 07:47:00 +08:00
func TestEnqueueUniqueTaskIdConflictError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
m1 := base . TaskMessage {
ID : "custom_id" ,
Type : "foo" ,
Payload : nil ,
UniqueKey : "unique_key_one" ,
}
m2 := base . TaskMessage {
ID : "custom_id" ,
Type : "bar" ,
Payload : nil ,
UniqueKey : "unique_key_two" ,
}
const ttl = 30 * time . Second
tests := [ ] struct {
firstMsg * base . TaskMessage
secondMsg * base . TaskMessage
} {
{ firstMsg : & m1 , secondMsg : & m2 } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case.
2021-11-16 08:34:26 +08:00
if err := r . EnqueueUnique ( context . Background ( ) , tc . firstMsg , ttl ) ; err != nil {
2021-09-11 07:47:00 +08:00
t . Errorf ( "First message: EnqueueUnique failed: %v" , err )
continue
}
2021-11-16 08:34:26 +08:00
if err := r . EnqueueUnique ( context . Background ( ) , tc . secondMsg , ttl ) ; ! errors . Is ( err , errors . ErrTaskIdConflict ) {
2021-09-11 07:47:00 +08:00
t . Errorf ( "Second message: EnqueueUnique returned %v, want %v" , err , errors . ErrTaskIdConflict )
continue
}
}
}
2019-11-26 12:57:53 +08:00
func TestDequeue ( t * testing . T ) {
2019-11-26 10:55:17 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-17 21:46:54 +08:00
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-17 21:46:54 +08:00
Type : "send_email" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "subject" : "hello!" } ) ,
2021-03-13 08:23:08 +08:00
Queue : "default" ,
2020-06-17 21:46:54 +08:00
Timeout : 1800 ,
Deadline : 0 ,
}
2020-06-22 23:33:58 +08:00
t1Deadline := now . Unix ( ) + t1 . Timeout
2020-06-17 21:46:54 +08:00
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-17 21:46:54 +08:00
Type : "export_csv" ,
Payload : nil ,
2021-03-13 08:23:08 +08:00
Queue : "critical" ,
2020-06-17 21:46:54 +08:00
Timeout : 0 ,
Deadline : 1593021600 ,
}
t2Deadline := t2 . Deadline
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-17 21:46:54 +08:00
Type : "reindex" ,
Payload : nil ,
2021-03-13 08:23:08 +08:00
Queue : "low" ,
2020-06-22 23:33:58 +08:00
Timeout : int64 ( ( 5 * time . Minute ) . Seconds ( ) ) ,
Deadline : time . Now ( ) . Add ( 10 * time . Minute ) . Unix ( ) ,
2020-06-17 21:46:54 +08:00
}
2020-01-07 22:28:34 +08:00
2019-11-26 12:57:53 +08:00
tests := [ ] struct {
2020-09-06 03:43:15 +08:00
pending map [ string ] [ ] * base . TaskMessage
args [ ] string // list of queues to query
wantMsg * base . TaskMessage
wantDeadline time . Time
wantPending map [ string ] [ ] * base . TaskMessage
wantActive map [ string ] [ ] * base . TaskMessage
wantDeadlines map [ string ] [ ] base . Z
2019-11-26 12:57:53 +08:00
} {
2019-12-28 12:37:15 +08:00
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { t1 } ,
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "default" } ,
wantMsg : t1 ,
2020-06-23 21:34:59 +08:00
wantDeadline : time . Unix ( t1Deadline , 0 ) ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { t1 } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } } ,
2020-06-17 21:46:54 +08:00
} ,
2019-12-28 12:37:15 +08:00
} ,
2020-01-07 22:28:34 +08:00
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { t1 } ,
"critical" : { t2 } ,
"low" : { t3 } ,
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "critical" , "default" , "low" } ,
wantMsg : t2 ,
2020-06-23 21:34:59 +08:00
wantDeadline : time . Unix ( t2Deadline , 0 ) ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { t1 } ,
"critical" : { } ,
"low" : { t3 } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { } ,
"critical" : { t2 } ,
"low" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
"critical" : { { Message : t2 , Score : t2Deadline } } ,
"low" : { } ,
2020-06-17 21:46:54 +08:00
} ,
2020-01-07 22:28:34 +08:00
} ,
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2021-03-13 08:23:08 +08:00
"default" : { t1 } ,
2020-01-07 22:28:34 +08:00
"critical" : { } ,
2021-03-13 08:23:08 +08:00
"low" : { t3 } ,
2020-01-07 22:28:34 +08:00
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "critical" , "default" , "low" } ,
2021-03-13 08:23:08 +08:00
wantMsg : t1 ,
wantDeadline : time . Unix ( t1Deadline , 0 ) ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { } ,
"critical" : { } ,
2021-03-13 08:23:08 +08:00
"low" : { t3 } ,
2020-06-17 21:46:54 +08:00
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2021-03-13 08:23:08 +08:00
"default" : { t1 } ,
2020-08-08 21:04:16 +08:00
"critical" : { } ,
"low" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
2021-03-13 08:23:08 +08:00
"default" : { { Message : t1 , Score : t1Deadline } } ,
2020-08-08 21:04:16 +08:00
"critical" : { } ,
"low" : { } ,
2020-01-07 22:28:34 +08:00
} ,
} ,
2021-05-10 21:04:42 +08:00
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case
h . SeedAllPendingQueues ( t , r . client , tc . pending )
gotMsg , gotDeadline , err := r . Dequeue ( tc . args ... )
if err != nil {
t . Errorf ( "(*RDB).Dequeue(%v) returned error %v" , tc . args , err )
continue
}
if ! cmp . Equal ( gotMsg , tc . wantMsg ) {
t . Errorf ( "(*RDB).Dequeue(%v) returned message %v; want %v" ,
tc . args , gotMsg , tc . wantMsg )
continue
}
if ! cmp . Equal ( gotDeadline , tc . wantDeadline , cmpopts . EquateApproxTime ( 1 * time . Second ) ) {
t . Errorf ( "(*RDB).Dequeue(%v) returned deadline %v; want %v" ,
tc . args , gotDeadline , tc . wantDeadline )
continue
}
for queue , want := range tc . wantPending {
gotPending := h . GetPendingMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotPending , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . PendingKey ( queue ) , diff )
}
}
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . ActiveKey ( queue ) , diff )
}
}
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
2021-09-13 07:56:22 +08:00
cmpOpts := [ ] cmp . Option { h . SortZSetEntryOpt , h . EquateInt64Approx ( 2 ) } // allow up to 2 second margin in Score
if diff := cmp . Diff ( want , gotDeadlines , cmpOpts ... ) ; diff != "" {
2021-05-10 21:04:42 +08:00
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . DeadlinesKey ( queue ) , diff )
}
}
}
}
func TestDequeueError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
tests := [ ] struct {
pending map [ string ] [ ] * base . TaskMessage
args [ ] string // list of queues to query
wantErr error
wantPending map [ string ] [ ] * base . TaskMessage
wantActive map [ string ] [ ] * base . TaskMessage
wantDeadlines map [ string ] [ ] base . Z
} {
{
pending : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
args : [ ] string { "default" } ,
wantErr : errors . ErrNoProcessableTask ,
wantPending : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
wantActive : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
} ,
} ,
2020-01-07 22:28:34 +08:00
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
2021-05-10 21:04:42 +08:00
args : [ ] string { "critical" , "default" , "low" } ,
wantErr : errors . ErrNoProcessableTask ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 22:28:34 +08:00
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
2019-12-28 12:37:15 +08:00
} ,
2019-11-22 13:45:27 +08:00
}
2019-11-20 23:01:24 +08:00
2019-11-26 12:57:53 +08:00
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2020-09-05 22:03:43 +08:00
h . SeedAllPendingQueues ( t , r . client , tc . pending )
2019-12-13 11:49:41 +08:00
2021-05-10 21:04:42 +08:00
gotMsg , gotDeadline , gotErr := r . Dequeue ( tc . args ... )
if ! errors . Is ( gotErr , tc . wantErr ) {
2020-06-18 20:45:10 +08:00
t . Errorf ( "(*RDB).Dequeue(%v) returned error %v; want %v" ,
2021-05-10 21:04:42 +08:00
tc . args , gotErr , tc . wantErr )
2020-06-18 20:45:10 +08:00
continue
}
2021-05-10 21:04:42 +08:00
if gotMsg != nil {
t . Errorf ( "(*RDB).Dequeue(%v) returned message %v; want nil" , tc . args , gotMsg )
2020-06-18 20:45:10 +08:00
continue
}
2021-05-10 21:04:42 +08:00
if ! gotDeadline . IsZero ( ) {
t . Errorf ( "(*RDB).Dequeue(%v) returned deadline %v; want %v" , tc . args , gotDeadline , time . Time { } )
2020-06-04 20:37:17 +08:00
continue
}
2020-09-05 22:03:43 +08:00
for queue , want := range tc . wantPending {
gotPending := h . GetPendingMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotPending , h . SortMsgOpt ) ; diff != "" {
2021-03-13 08:23:08 +08:00
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . PendingKey ( queue ) , diff )
2020-06-04 20:37:17 +08:00
}
}
2020-09-06 03:43:15 +08:00
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . ActiveKey ( queue ) , diff )
2020-08-08 21:04:16 +08:00
}
2020-06-04 20:37:17 +08:00
}
2020-08-08 21:04:16 +08:00
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . DeadlinesKey ( queue ) , diff )
}
2020-06-17 21:46:54 +08:00
}
2020-06-04 20:37:17 +08:00
}
}
func TestDequeueIgnoresPausedQueues ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-08-13 21:54:32 +08:00
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-08-13 21:54:32 +08:00
Type : "send_email" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "subject" : "hello!" } ) ,
2020-08-13 21:54:32 +08:00
Queue : "default" ,
Timeout : 1800 ,
Deadline : 0 ,
}
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-08-13 21:54:32 +08:00
Type : "export_csv" ,
Payload : nil ,
Queue : "critical" ,
Timeout : 1800 ,
Deadline : 0 ,
}
2020-06-04 20:37:17 +08:00
tests := [ ] struct {
2020-09-06 03:43:15 +08:00
paused [ ] string // list of paused queues
pending map [ string ] [ ] * base . TaskMessage
args [ ] string // list of queues to query
wantMsg * base . TaskMessage
2021-05-10 21:04:42 +08:00
wantErr error
2020-09-06 03:43:15 +08:00
wantPending map [ string ] [ ] * base . TaskMessage
wantActive map [ string ] [ ] * base . TaskMessage
2020-06-04 20:37:17 +08:00
} {
{
paused : [ ] string { "default" } ,
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
"critical" : { t2 } ,
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "default" , "critical" } ,
wantMsg : t2 ,
2021-05-10 21:04:42 +08:00
wantErr : nil ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
"critical" : { } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { } ,
"critical" : { t2 } ,
} ,
2020-06-04 20:37:17 +08:00
} ,
{
paused : [ ] string { "default" } ,
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "default" } ,
wantMsg : nil ,
2021-05-10 21:04:42 +08:00
wantErr : errors . ErrNoProcessableTask ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { } ,
} ,
2020-06-04 20:37:17 +08:00
} ,
{
paused : [ ] string { "critical" , "default" } ,
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
"critical" : { t2 } ,
} ,
2020-06-18 20:45:10 +08:00
args : [ ] string { "default" , "critical" } ,
wantMsg : nil ,
2021-05-10 21:04:42 +08:00
wantErr : errors . ErrNoProcessableTask ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-06-04 20:37:17 +08:00
"default" : { t1 } ,
"critical" : { t2 } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:04:16 +08:00
"default" : { } ,
"critical" : { } ,
} ,
2020-06-04 20:37:17 +08:00
} ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case
for _ , qname := range tc . paused {
if err := r . Pause ( qname ) ; err != nil {
t . Fatal ( err )
}
}
2020-09-05 22:03:43 +08:00
h . SeedAllPendingQueues ( t , r . client , tc . pending )
2020-06-04 20:37:17 +08:00
2020-06-18 20:45:10 +08:00
got , _ , err := r . Dequeue ( tc . args ... )
2021-05-10 21:04:42 +08:00
if ! cmp . Equal ( got , tc . wantMsg ) || ! errors . Is ( err , tc . wantErr ) {
2020-06-04 20:37:17 +08:00
t . Errorf ( "Dequeue(%v) = %v, %v; want %v, %v" ,
2021-05-10 21:04:42 +08:00
tc . args , got , err , tc . wantMsg , tc . wantErr )
2019-11-26 12:57:53 +08:00
continue
}
2019-12-28 12:37:15 +08:00
2020-09-05 22:03:43 +08:00
for queue , want := range tc . wantPending {
gotPending := h . GetPendingMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotPending , h . SortMsgOpt ) ; diff != "" {
2021-03-13 08:23:08 +08:00
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . PendingKey ( queue ) , diff )
2020-01-07 22:28:34 +08:00
}
}
2020-09-06 03:43:15 +08:00
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want,+got):\n%s" , base . ActiveKey ( queue ) , diff )
2020-08-08 21:04:16 +08:00
}
2019-11-26 12:57:53 +08:00
}
2019-11-20 23:01:24 +08:00
}
}
2019-11-24 07:09:50 +08:00
2019-12-04 22:33:05 +08:00
func TestDone ( t * testing . T ) {
2019-11-28 22:50:05 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-18 22:10:57 +08:00
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-18 22:10:57 +08:00
Type : "send_email" ,
Payload : nil ,
Timeout : 1800 ,
Deadline : 0 ,
2020-08-08 21:48:49 +08:00
Queue : "default" ,
2020-06-18 22:10:57 +08:00
}
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-18 22:10:57 +08:00
Type : "export_csv" ,
Payload : nil ,
Timeout : 0 ,
Deadline : 1592485787 ,
2020-08-08 21:48:49 +08:00
Queue : "custom" ,
2020-06-18 22:10:57 +08:00
}
2020-03-18 21:49:39 +08:00
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-03-18 21:49:39 +08:00
Type : "reindex" ,
Payload : nil ,
2020-06-18 22:10:57 +08:00
Timeout : 1800 ,
Deadline : 0 ,
2021-11-06 07:52:54 +08:00
UniqueKey : "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72" ,
2020-03-18 21:49:39 +08:00
Queue : "default" ,
}
2020-08-08 21:48:49 +08:00
t1Deadline := now . Unix ( ) + t1 . Timeout
t2Deadline := t2 . Deadline
2021-11-06 07:52:54 +08:00
t3Deadline := now . Unix ( ) + t3 . Timeout
2019-11-28 22:50:05 +08:00
tests := [ ] struct {
2020-09-06 03:43:15 +08:00
desc string
2021-03-13 08:23:08 +08:00
active map [ string ] [ ] * base . TaskMessage // initial state of the active list
2021-11-06 07:52:54 +08:00
deadlines map [ string ] [ ] base . Z // initial state of the deadlines set
2020-09-06 03:43:15 +08:00
target * base . TaskMessage // task to remove
wantActive map [ string ] [ ] * base . TaskMessage // final state of the active list
wantDeadlines map [ string ] [ ] base . Z // final state of the deadline set
2019-11-28 22:50:05 +08:00
} {
{
2020-08-11 12:49:12 +08:00
desc : "removes message from the correct queue" ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:48:49 +08:00
"default" : { t1 } ,
"custom" : { t2 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } } ,
"custom" : { { Message : t2 , Score : t2Deadline } } ,
} ,
target : t1 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-11 12:49:12 +08:00
"default" : { } ,
"custom" : { t2 } ,
2020-08-08 21:48:49 +08:00
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : { { Message : t2 , Score : t2Deadline } } ,
2020-06-18 22:10:57 +08:00
} ,
2019-11-28 22:50:05 +08:00
} ,
{
2020-08-11 12:49:12 +08:00
desc : "with one queue" ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:48:49 +08:00
"default" : { t1 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } } ,
} ,
target : t1 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:48:49 +08:00
"default" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
} ,
2019-11-28 22:50:05 +08:00
} ,
2020-03-18 21:49:39 +08:00
{
2020-08-11 12:49:12 +08:00
desc : "with multiple messages in a queue" ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-08 21:48:49 +08:00
"default" : { t1 , t3 } ,
"custom" : { t2 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t3 , Score : t3Deadline } } ,
"custom" : { { Message : t2 , Score : t2Deadline } } ,
} ,
target : t3 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-11 12:49:12 +08:00
"default" : { t1 } ,
2020-08-08 21:48:49 +08:00
"custom" : { t2 } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } } ,
"custom" : { { Message : t2 , Score : t2Deadline } } ,
2020-06-18 22:10:57 +08:00
} ,
2020-03-18 21:49:39 +08:00
} ,
2019-11-28 22:50:05 +08:00
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2020-08-08 21:48:49 +08:00
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
2021-03-13 08:23:08 +08:00
h . SeedAllActiveQueues ( t , r . client , tc . active )
for _ , msgs := range tc . active {
2020-08-08 21:48:49 +08:00
for _ , msg := range msgs {
// Set uniqueness lock if unique key is present.
if len ( msg . UniqueKey ) > 0 {
2021-09-10 21:29:37 +08:00
err := r . client . SetNX ( context . Background ( ) , msg . UniqueKey , msg . ID , time . Minute ) . Err ( )
2020-08-08 21:48:49 +08:00
if err != nil {
t . Fatal ( err )
}
2020-03-18 21:49:39 +08:00
}
}
}
2019-11-28 22:50:05 +08:00
2019-12-04 22:33:05 +08:00
err := r . Done ( tc . target )
2019-11-28 22:50:05 +08:00
if err != nil {
2020-08-11 12:49:12 +08:00
t . Errorf ( "%s; (*RDB).Done(task) = %v, want nil" , tc . desc , err )
2019-11-28 22:50:05 +08:00
continue
}
2020-09-06 03:43:15 +08:00
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "%s; mismatch found in %q: (-want, +got):\n%s" , tc . desc , base . ActiveKey ( queue ) , diff )
2020-08-08 21:48:49 +08:00
continue
}
2019-11-28 23:47:12 +08:00
}
2020-08-08 21:48:49 +08:00
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
2020-08-11 12:49:12 +08:00
t . Errorf ( "%s; mismatch found in %q: (-want, +got):\n%s" , tc . desc , base . DeadlinesKey ( queue ) , diff )
2020-08-08 21:48:49 +08:00
continue
}
2020-06-18 22:10:57 +08:00
}
2019-12-23 21:33:48 +08:00
2020-08-08 21:48:49 +08:00
processedKey := base . ProcessedKey ( tc . target . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotProcessed := r . client . Get ( context . Background ( ) , processedKey ) . Val ( )
2019-12-23 21:33:48 +08:00
if gotProcessed != "1" {
2020-08-11 12:49:12 +08:00
t . Errorf ( "%s; GET %q = %q, want 1" , tc . desc , processedKey , gotProcessed )
2019-12-23 21:33:48 +08:00
}
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , processedKey ) . Val ( )
2019-12-23 21:33:48 +08:00
if gotTTL > statsTTL {
2020-08-11 12:49:12 +08:00
t . Errorf ( "%s; TTL %q = %v, want less than or equal to %v" , tc . desc , processedKey , gotTTL , statsTTL )
2019-12-23 21:33:48 +08:00
}
2020-03-18 21:49:39 +08:00
2021-09-02 20:56:02 +08:00
if len ( tc . target . UniqueKey ) > 0 && r . client . Exists ( context . Background ( ) , tc . target . UniqueKey ) . Val ( ) != 0 {
2020-08-11 12:49:12 +08:00
t . Errorf ( "%s; Uniqueness lock %q still exists" , tc . desc , tc . target . UniqueKey )
2020-03-18 21:49:39 +08:00
}
2019-11-28 23:47:12 +08:00
}
}
2021-11-06 07:52:54 +08:00
// TestMarkAsComplete checks the Redis state left behind by
// (*RDB).MarkAsComplete. For every case it expects that:
//   - the target is no longer in the active list or the deadlines set of its
//     queue, while the other seeded tasks are untouched;
//   - the completed set holds the target, scored at the completion time plus
//     the task's Retention (both in seconds);
//   - the processed counter for the queue reads "1" and its TTL does not
//     exceed statsTTL;
//   - the uniqueness lock of the target, if it has one, no longer exists.
func TestMarkAsComplete(t *testing.T) {
	r := setup(t)
	defer r.Close()
	ctx := context.Background()
	now := time.Now()
	t1 := &base.TaskMessage{
		ID:        uuid.NewString(),
		Type:      "send_email",
		Payload:   nil,
		Timeout:   1800,
		Deadline:  0,
		Queue:     "default",
		Retention: 3600,
	}
	t2 := &base.TaskMessage{
		ID:        uuid.NewString(),
		Type:      "export_csv",
		Payload:   nil,
		Timeout:   0,
		Deadline:  now.Add(2 * time.Hour).Unix(),
		Queue:     "custom",
		Retention: 7200,
	}
	t3 := &base.TaskMessage{
		ID:        uuid.NewString(),
		Type:      "reindex",
		Payload:   nil,
		Timeout:   1800,
		Deadline:  0,
		UniqueKey: "asynq:{default}:unique:b0804ec967f48520697662a204f5fe72",
		Queue:     "default",
		Retention: 1800,
	}
	// Deadline scores used to seed the deadlines set: tasks with a Timeout are
	// scored at now+Timeout, the task with an explicit Deadline uses it as is.
	t1Deadline := now.Unix() + t1.Timeout
	t2Deadline := t2.Deadline
	t3Deadline := now.Unix() + t3.Timeout

	tests := []struct {
		desc          string
		active        map[string][]*base.TaskMessage                // initial state of the active list
		deadlines     map[string][]base.Z                           // initial state of the deadlines set
		completed     map[string][]base.Z                           // initial state of the completed set
		target        *base.TaskMessage                             // task to mark as completed
		wantActive    map[string][]*base.TaskMessage                // final state of the active list
		wantDeadlines map[string][]base.Z                           // final state of the deadline set
		wantCompleted func(completedAt time.Time) map[string][]base.Z // final state of the completed set
	}{
		{
			desc: "select a message from the correct queue",
			active: map[string][]*base.TaskMessage{
				"default": {t1},
				"custom":  {t2},
			},
			deadlines: map[string][]base.Z{
				"default": {{Message: t1, Score: t1Deadline}},
				"custom":  {{Message: t2, Score: t2Deadline}},
			},
			completed: map[string][]base.Z{
				"default": {},
				"custom":  {},
			},
			target: t1,
			wantActive: map[string][]*base.TaskMessage{
				"default": {},
				"custom":  {t2},
			},
			wantDeadlines: map[string][]base.Z{
				"default": {},
				"custom":  {{Message: t2, Score: t2Deadline}},
			},
			wantCompleted: func(completedAt time.Time) map[string][]base.Z {
				return map[string][]base.Z{
					"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}},
					"custom":  {},
				}
			},
		},
		{
			desc: "with one queue",
			active: map[string][]*base.TaskMessage{
				"default": {t1},
			},
			deadlines: map[string][]base.Z{
				"default": {{Message: t1, Score: t1Deadline}},
			},
			completed: map[string][]base.Z{
				"default": {},
			},
			target: t1,
			wantActive: map[string][]*base.TaskMessage{
				"default": {},
			},
			wantDeadlines: map[string][]base.Z{
				"default": {},
			},
			wantCompleted: func(completedAt time.Time) map[string][]base.Z {
				return map[string][]base.Z{
					"default": {{Message: h.TaskMessageWithCompletedAt(*t1, completedAt), Score: completedAt.Unix() + t1.Retention}},
				}
			},
		},
		{
			desc: "with multiple messages in a queue",
			active: map[string][]*base.TaskMessage{
				"default": {t1, t3},
				"custom":  {t2},
			},
			deadlines: map[string][]base.Z{
				"default": {{Message: t1, Score: t1Deadline}, {Message: t3, Score: t3Deadline}},
				"custom":  {{Message: t2, Score: t2Deadline}},
			},
			completed: map[string][]base.Z{
				"default": {},
				"custom":  {},
			},
			target: t3,
			wantActive: map[string][]*base.TaskMessage{
				"default": {t1},
				"custom":  {t2},
			},
			wantDeadlines: map[string][]base.Z{
				"default": {{Message: t1, Score: t1Deadline}},
				"custom":  {{Message: t2, Score: t2Deadline}},
			},
			wantCompleted: func(completedAt time.Time) map[string][]base.Z {
				return map[string][]base.Z{
					"default": {{Message: h.TaskMessageWithCompletedAt(*t3, completedAt), Score: completedAt.Unix() + t3.Retention}},
					"custom":  {},
				}
			},
		},
	}

	for _, tc := range tests {
		h.FlushDB(t, r.client) // clean up db before each test case
		h.SeedAllDeadlines(t, r.client, tc.deadlines)
		h.SeedAllActiveQueues(t, r.client, tc.active)
		h.SeedAllCompletedQueues(t, r.client, tc.completed)
		for _, msgs := range tc.active {
			for _, msg := range msgs {
				// Set uniqueness lock if unique key is present.
				if len(msg.UniqueKey) > 0 {
					err := r.client.SetNX(ctx, msg.UniqueKey, msg.ID, time.Minute).Err()
					if err != nil {
						t.Fatal(err)
					}
				}
			}
		}

		// The expected completed entries are derived from this timestamp at
		// one-second resolution, so it is taken right before the call.
		completedAt := time.Now()
		err := r.MarkAsComplete(tc.target)
		if err != nil {
			t.Errorf("%s; (*RDB).MarkAsComplete(task) = %v, want nil", tc.desc, err)
			continue
		}

		for queue, want := range tc.wantActive {
			gotActive := h.GetActiveMessages(t, r.client, queue)
			if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
				t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.ActiveKey(queue), diff)
			}
		}
		for queue, want := range tc.wantDeadlines {
			gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
			if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
				t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.DeadlinesKey(queue), diff)
			}
		}
		for queue, want := range tc.wantCompleted(completedAt) {
			gotCompleted := h.GetCompletedEntries(t, r.client, queue)
			if diff := cmp.Diff(want, gotCompleted, h.SortZSetEntryOpt); diff != "" {
				t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.CompletedKey(queue), diff)
			}
		}

		// The database was flushed above, so exactly one task must have been
		// counted as processed for the target's queue.
		processedKey := base.ProcessedKey(tc.target.Queue, time.Now())
		gotProcessed := r.client.Get(ctx, processedKey).Val()
		if gotProcessed != "1" {
			t.Errorf("%s; GET %q = %q, want 1", tc.desc, processedKey, gotProcessed)
		}
		gotTTL := r.client.TTL(ctx, processedKey).Val()
		if gotTTL > statsTTL {
			t.Errorf("%s; TTL %q = %v, want less than or equal to %v", tc.desc, processedKey, gotTTL, statsTTL)
		}

		if len(tc.target.UniqueKey) > 0 && r.client.Exists(ctx, tc.target.UniqueKey).Val() != 0 {
			t.Errorf("%s; Uniqueness lock %q still exists", tc.desc, tc.target.UniqueKey)
		}
	}
}
2019-12-18 12:07:17 +08:00
func TestRequeue ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-19 03:12:29 +08:00
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 03:12:29 +08:00
Type : "send_email" ,
Payload : nil ,
Queue : "default" ,
Timeout : 1800 ,
}
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 03:12:29 +08:00
Type : "export_csv" ,
Payload : nil ,
Queue : "default" ,
Timeout : 3000 ,
}
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 03:12:29 +08:00
Type : "send_email" ,
Payload : nil ,
Queue : "critical" ,
Timeout : 80 ,
}
2020-06-22 23:33:58 +08:00
t1Deadline := now . Unix ( ) + t1 . Timeout
t2Deadline := now . Unix ( ) + t2 . Timeout
t3Deadline := now . Unix ( ) + t3 . Timeout
2019-12-18 12:07:17 +08:00
tests := [ ] struct {
2020-09-06 03:43:15 +08:00
pending map [ string ] [ ] * base . TaskMessage // initial state of queues
2021-03-13 08:23:08 +08:00
active map [ string ] [ ] * base . TaskMessage // initial state of the active list
2020-09-06 03:43:15 +08:00
deadlines map [ string ] [ ] base . Z // initial state of the deadlines set
target * base . TaskMessage // task to requeue
wantPending map [ string ] [ ] * base . TaskMessage // final state of queues
wantActive map [ string ] [ ] * base . TaskMessage // final state of the active list
wantDeadlines map [ string ] [ ] base . Z // final state of the deadlines set
2019-12-18 12:07:17 +08:00
} {
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { } ,
} ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 , t2 } ,
2020-01-27 05:41:06 +08:00
} ,
2020-08-11 12:49:12 +08:00
deadlines : map [ string ] [ ] base . Z {
2020-08-09 20:40:44 +08:00
"default" : {
{ Message : t1 , Score : t1Deadline } ,
{ Message : t2 , Score : t2Deadline } ,
} ,
2020-06-19 03:12:29 +08:00
} ,
target : t1 ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t2 } ,
2020-01-27 05:41:06 +08:00
} ,
2020-08-11 12:49:12 +08:00
wantDeadlines : map [ string ] [ ] base . Z {
"default" : {
2020-08-09 20:40:44 +08:00
{ Message : t2 , Score : t2Deadline } ,
} ,
2020-06-19 03:12:29 +08:00
} ,
2019-12-18 12:07:17 +08:00
} ,
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 } ,
2020-01-27 05:41:06 +08:00
} ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t2 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t2 , Score : t2Deadline } ,
} ,
2020-06-19 03:12:29 +08:00
} ,
target : t2 ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 , t2 } ,
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { } ,
2020-01-27 05:41:06 +08:00
} ,
2019-12-18 12:07:17 +08:00
} ,
2020-01-27 05:41:06 +08:00
{
2020-09-05 22:03:43 +08:00
pending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 } ,
"critical" : { } ,
2020-01-27 05:41:06 +08:00
} ,
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t2 } ,
"critical" : { t3 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
2020-08-11 12:49:12 +08:00
"default" : { { Message : t2 , Score : t2Deadline } } ,
"critical" : { { Message : t3 , Score : t3Deadline } } ,
2020-06-19 03:12:29 +08:00
} ,
target : t3 ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t1 } ,
"critical" : { t3 } ,
2020-01-27 05:41:06 +08:00
} ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 20:40:44 +08:00
"default" : { t2 } ,
"critical" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t2 , Score : t2Deadline } } ,
"critical" : { } ,
2020-06-19 03:12:29 +08:00
} ,
2020-01-27 05:41:06 +08:00
} ,
2019-12-18 12:07:17 +08:00
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2020-09-05 22:03:43 +08:00
h . SeedAllPendingQueues ( t , r . client , tc . pending )
2021-03-13 08:23:08 +08:00
h . SeedAllActiveQueues ( t , r . client , tc . active )
2020-08-09 20:40:44 +08:00
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
2019-12-18 12:07:17 +08:00
err := r . Requeue ( tc . target )
if err != nil {
t . Errorf ( "(*RDB).Requeue(task) = %v, want nil" , err )
continue
}
2020-09-05 22:03:43 +08:00
for qname , want := range tc . wantPending {
gotPending := h . GetPendingMessages ( t , r . client , qname )
if diff := cmp . Diff ( want , gotPending , h . SortMsgOpt ) ; diff != "" {
2021-03-13 08:23:08 +08:00
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . PendingKey ( qname ) , diff )
2020-01-27 05:41:06 +08:00
}
2019-12-18 12:07:17 +08:00
}
2020-09-06 03:43:15 +08:00
for qname , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , qname )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want, +got):\n%s" , base . ActiveKey ( qname ) , diff )
2020-08-09 20:40:44 +08:00
}
2019-12-18 12:07:17 +08:00
}
2020-08-09 20:40:44 +08:00
for qname , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , qname )
2020-08-11 12:49:12 +08:00
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
2020-08-09 20:40:44 +08:00
t . Errorf ( "mismatch found in %q: (-want, +got):\n%s" , base . DeadlinesKey ( qname ) , diff )
}
2020-06-19 03:12:29 +08:00
}
2019-12-18 12:07:17 +08:00
}
}
2019-12-28 12:37:15 +08:00
func TestSchedule ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2021-03-21 04:42:13 +08:00
msg := h . NewTaskMessage ( "send_email" , h . JSON ( map [ string ] interface { } { "subject" : "hello" } ) )
2019-12-28 12:37:15 +08:00
tests := [ ] struct {
msg * base . TaskMessage
processAt time . Time
} {
2021-03-13 08:23:08 +08:00
{ msg , time . Now ( ) . Add ( 15 * time . Minute ) } ,
2019-12-28 12:37:15 +08:00
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2019-12-28 12:37:15 +08:00
2021-11-16 08:34:26 +08:00
err := r . Schedule ( context . Background ( ) , tc . msg , tc . processAt )
2019-12-28 12:37:15 +08:00
if err != nil {
2021-04-24 21:44:44 +08:00
t . Errorf ( "(*RDB).Schedule(%v, %v) = %v, want nil" ,
tc . msg , tc . processAt , err )
2019-12-28 12:37:15 +08:00
continue
}
2021-04-24 21:44:44 +08:00
// Check Scheduled zset has task ID.
scheduledKey := base . ScheduledKey ( tc . msg . Queue )
2021-09-02 20:56:02 +08:00
zs := r . client . ZRangeWithScores ( context . Background ( ) , scheduledKey , 0 , - 1 ) . Val ( )
2021-04-24 21:44:44 +08:00
if n := len ( zs ) ; n != 1 {
t . Errorf ( "Redis ZSET %q contains %d elements, want 1" ,
scheduledKey , n )
continue
}
2021-09-10 21:29:37 +08:00
if got := zs [ 0 ] . Member . ( string ) ; got != tc . msg . ID {
2021-04-24 21:44:44 +08:00
t . Errorf ( "Redis ZSET %q member: got %v, want %v" ,
2021-09-10 21:29:37 +08:00
scheduledKey , got , tc . msg . ID )
2019-12-28 12:37:15 +08:00
continue
}
2021-04-24 21:44:44 +08:00
if got := int64 ( zs [ 0 ] . Score ) ; got != tc . processAt . Unix ( ) {
t . Errorf ( "Redis ZSET %q score: got %d, want %d" ,
scheduledKey , got , tc . processAt . Unix ( ) )
2019-12-28 12:37:15 +08:00
continue
}
2021-04-24 21:44:44 +08:00
// Check the values under the task key.
2021-09-10 21:29:37 +08:00
taskKey := base . TaskKey ( tc . msg . Queue , tc . msg . ID )
2021-09-02 20:56:02 +08:00
encoded := r . client . HGet ( context . Background ( ) , taskKey , "msg" ) . Val ( ) // "msg" field
2021-04-24 21:44:44 +08:00
decoded := h . MustUnmarshal ( t , encoded )
if diff := cmp . Diff ( tc . msg , decoded ) ; diff != "" {
t . Errorf ( "persisted message was %v, want %v; (-want, +got)\n%s" ,
decoded , tc . msg , diff )
}
2021-09-02 20:56:02 +08:00
state := r . client . HGet ( context . Background ( ) , taskKey , "state" ) . Val ( ) // "state" field
2021-04-24 21:44:44 +08:00
if want := "scheduled" ; state != want {
t . Errorf ( "state field under task-key is set to %q, want %q" ,
state , want )
}
2021-09-02 20:56:02 +08:00
timeout := r . client . HGet ( context . Background ( ) , taskKey , "timeout" ) . Val ( ) // "timeout" field
2021-04-24 21:44:44 +08:00
if want := strconv . Itoa ( int ( tc . msg . Timeout ) ) ; timeout != want {
t . Errorf ( "timeout field under task-key is set to %v, want %v" , timeout , want )
}
2021-09-02 20:56:02 +08:00
deadline := r . client . HGet ( context . Background ( ) , taskKey , "deadline" ) . Val ( ) // "deadline" field
2021-04-24 21:44:44 +08:00
if want := strconv . Itoa ( int ( tc . msg . Deadline ) ) ; deadline != want {
t . Errorf ( "deadline field under task-ke is set to %v, want %v" , deadline , want )
}
// Check queue is in the AllQueues set.
2021-09-02 20:56:02 +08:00
if ! r . client . SIsMember ( context . Background ( ) , base . AllQueues , tc . msg . Queue ) . Val ( ) {
2020-08-07 21:31:02 +08:00
t . Errorf ( "%q is not a member of SET %q" , tc . msg . Queue , base . AllQueues )
}
2019-12-28 12:37:15 +08:00
}
}
2021-09-11 07:47:00 +08:00
func TestScheduleTaskIdConflictError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
m1 := base . TaskMessage {
ID : "custom_id" ,
Type : "foo" ,
Payload : nil ,
UniqueKey : "unique_key_one" ,
}
m2 := base . TaskMessage {
ID : "custom_id" ,
Type : "bar" ,
Payload : nil ,
UniqueKey : "unique_key_two" ,
}
processAt := time . Now ( ) . Add ( 30 * time . Second )
tests := [ ] struct {
firstMsg * base . TaskMessage
secondMsg * base . TaskMessage
} {
{ firstMsg : & m1 , secondMsg : & m2 } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case.
2021-11-16 08:34:26 +08:00
if err := r . Schedule ( context . Background ( ) , tc . firstMsg , processAt ) ; err != nil {
2021-09-11 07:47:00 +08:00
t . Errorf ( "First message: Schedule failed: %v" , err )
continue
}
2021-11-16 08:34:26 +08:00
if err := r . Schedule ( context . Background ( ) , tc . secondMsg , processAt ) ; ! errors . Is ( err , errors . ErrTaskIdConflict ) {
2021-09-11 07:47:00 +08:00
t . Errorf ( "Second message: Schedule returned %v, want %v" , err , errors . ErrTaskIdConflict )
continue
}
}
}
2020-03-18 21:49:39 +08:00
func TestScheduleUnique ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-03-18 21:49:39 +08:00
m1 := base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-03-18 21:49:39 +08:00
Type : "email" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "user_id" : 123 } ) ,
2020-03-18 21:49:39 +08:00
Queue : base . DefaultQueueName ,
2021-03-21 04:42:13 +08:00
UniqueKey : base . UniqueKey ( base . DefaultQueueName , "email" , h . JSON ( map [ string ] interface { } { "user_id" : 123 } ) ) ,
2020-03-18 21:49:39 +08:00
}
tests := [ ] struct {
msg * base . TaskMessage
processAt time . Time
ttl time . Duration // uniqueness lock ttl
} {
{ & m1 , time . Now ( ) . Add ( 15 * time . Minute ) , time . Minute } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case
2021-05-09 22:39:01 +08:00
desc := "(*RDB).ScheduleUnique(msg, processAt, ttl)"
2021-11-16 08:34:26 +08:00
err := r . ScheduleUnique ( context . Background ( ) , tc . msg , tc . processAt , tc . ttl )
2020-03-18 21:49:39 +08:00
if err != nil {
t . Errorf ( "Frist task: %s = %v, want nil" , desc , err )
continue
}
2021-04-25 11:41:44 +08:00
// Check Scheduled zset has task ID.
scheduledKey := base . ScheduledKey ( tc . msg . Queue )
2021-09-02 20:56:02 +08:00
zs := r . client . ZRangeWithScores ( context . Background ( ) , scheduledKey , 0 , - 1 ) . Val ( )
2021-04-25 11:41:44 +08:00
if n := len ( zs ) ; n != 1 {
t . Errorf ( "Redis ZSET %q contains %d elements, want 1" ,
scheduledKey , n )
2020-03-18 21:49:39 +08:00
continue
}
2021-09-10 21:29:37 +08:00
if got := zs [ 0 ] . Member . ( string ) ; got != tc . msg . ID {
2021-04-25 11:41:44 +08:00
t . Errorf ( "Redis ZSET %q member: got %v, want %v" ,
2021-09-10 21:29:37 +08:00
scheduledKey , got , tc . msg . ID )
2021-04-25 11:41:44 +08:00
continue
}
if got := int64 ( zs [ 0 ] . Score ) ; got != tc . processAt . Unix ( ) {
t . Errorf ( "Redis ZSET %q score: got %d, want %d" ,
scheduledKey , got , tc . processAt . Unix ( ) )
2020-03-18 21:49:39 +08:00
continue
}
2021-04-25 11:41:44 +08:00
// Check the values under the task key.
2021-09-10 21:29:37 +08:00
taskKey := base . TaskKey ( tc . msg . Queue , tc . msg . ID )
2021-09-02 20:56:02 +08:00
encoded := r . client . HGet ( context . Background ( ) , taskKey , "msg" ) . Val ( ) // "msg" field
2021-04-25 11:41:44 +08:00
decoded := h . MustUnmarshal ( t , encoded )
if diff := cmp . Diff ( tc . msg , decoded ) ; diff != "" {
t . Errorf ( "persisted message was %v, want %v; (-want, +got)\n%s" ,
decoded , tc . msg , diff )
}
2021-09-02 20:56:02 +08:00
state := r . client . HGet ( context . Background ( ) , taskKey , "state" ) . Val ( ) // "state" field
2021-04-25 11:41:44 +08:00
if want := "scheduled" ; state != want {
t . Errorf ( "state field under task-key is set to %q, want %q" ,
state , want )
}
2021-09-02 20:56:02 +08:00
timeout := r . client . HGet ( context . Background ( ) , taskKey , "timeout" ) . Val ( ) // "timeout" field
2021-04-25 11:41:44 +08:00
if want := strconv . Itoa ( int ( tc . msg . Timeout ) ) ; timeout != want {
t . Errorf ( "timeout field under task-key is set to %v, want %v" , timeout , want )
}
2021-09-02 20:56:02 +08:00
deadline := r . client . HGet ( context . Background ( ) , taskKey , "deadline" ) . Val ( ) // "deadline" field
2021-04-25 11:41:44 +08:00
if want := strconv . Itoa ( int ( tc . msg . Deadline ) ) ; deadline != want {
2021-06-09 21:06:43 +08:00
t . Errorf ( "deadline field under task-key is set to %v, want %v" , deadline , want )
}
2021-09-02 20:56:02 +08:00
uniqueKey := r . client . HGet ( context . Background ( ) , taskKey , "unique_key" ) . Val ( ) // "unique_key" field
2021-06-09 21:06:43 +08:00
if uniqueKey != tc . msg . UniqueKey {
t . Errorf ( "uniqueue_key field under task key is set to %q, want %q" , uniqueKey , tc . msg . UniqueKey )
2021-04-25 11:41:44 +08:00
}
// Check queue is in the AllQueues set.
2021-09-02 20:56:02 +08:00
if ! r . client . SIsMember ( context . Background ( ) , base . AllQueues , tc . msg . Queue ) . Val ( ) {
2021-04-25 11:41:44 +08:00
t . Errorf ( "%q is not a member of SET %q" , tc . msg . Queue , base . AllQueues )
}
// Enqueue the second message, should fail.
2021-11-16 08:34:26 +08:00
got := r . ScheduleUnique ( context . Background ( ) , tc . msg , tc . processAt , tc . ttl )
2021-05-09 22:39:01 +08:00
if ! errors . Is ( got , errors . ErrDuplicateTask ) {
t . Errorf ( "Second task: %s = %v, want %v" , desc , got , errors . ErrDuplicateTask )
2021-04-25 11:41:44 +08:00
continue
2020-03-18 21:49:39 +08:00
}
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , tc . msg . UniqueKey ) . Val ( )
2020-03-18 21:49:39 +08:00
if ! cmp . Equal ( tc . ttl . Seconds ( ) , gotTTL . Seconds ( ) , cmpopts . EquateApprox ( 0 , 1 ) ) {
t . Errorf ( "TTL %q = %v, want %v" , tc . msg . UniqueKey , gotTTL , tc . ttl )
continue
}
}
}
2021-09-11 07:47:00 +08:00
func TestScheduleUniqueTaskIdConflictError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
m1 := base . TaskMessage {
ID : "custom_id" ,
Type : "foo" ,
Payload : nil ,
UniqueKey : "unique_key_one" ,
}
m2 := base . TaskMessage {
ID : "custom_id" ,
Type : "bar" ,
Payload : nil ,
UniqueKey : "unique_key_two" ,
}
const ttl = 30 * time . Second
processAt := time . Now ( ) . Add ( 30 * time . Second )
tests := [ ] struct {
firstMsg * base . TaskMessage
secondMsg * base . TaskMessage
} {
{ firstMsg : & m1 , secondMsg : & m2 } ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client ) // clean up db before each test case.
2021-11-16 08:34:26 +08:00
if err := r . ScheduleUnique ( context . Background ( ) , tc . firstMsg , processAt , ttl ) ; err != nil {
2021-09-11 07:47:00 +08:00
t . Errorf ( "First message: ScheduleUnique failed: %v" , err )
continue
}
2021-11-16 08:34:26 +08:00
if err := r . ScheduleUnique ( context . Background ( ) , tc . secondMsg , processAt , ttl ) ; ! errors . Is ( err , errors . ErrTaskIdConflict ) {
2021-09-11 07:47:00 +08:00
t . Errorf ( "Second message: ScheduleUnique returned %v, want %v" , err , errors . ErrTaskIdConflict )
continue
}
}
}
2019-12-28 12:37:15 +08:00
func TestRetry ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-19 01:25:01 +08:00
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:25:01 +08:00
Type : "send_email" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "subject" : "Hola!" } ) ,
2020-06-23 21:34:59 +08:00
Retried : 10 ,
2020-06-19 01:25:01 +08:00
Timeout : 1800 ,
2020-08-09 03:17:33 +08:00
Queue : "default" ,
2020-06-19 01:25:01 +08:00
}
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:25:01 +08:00
Type : "gen_thumbnail" ,
2021-03-21 04:42:13 +08:00
Payload : h . JSON ( map [ string ] interface { } { "path" : "some/path/to/image.jpg" } ) ,
2020-06-19 01:25:01 +08:00
Timeout : 3000 ,
2020-08-09 03:17:33 +08:00
Queue : "default" ,
2020-06-19 01:25:01 +08:00
}
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:25:01 +08:00
Type : "reindex" ,
Payload : nil ,
Timeout : 60 ,
2020-08-09 03:17:33 +08:00
Queue : "default" ,
}
t4 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-08-09 03:17:33 +08:00
Type : "send_notification" ,
Payload : nil ,
Timeout : 1800 ,
Queue : "custom" ,
2020-06-19 01:25:01 +08:00
}
2020-08-09 03:17:33 +08:00
t1Deadline := now . Unix ( ) + t1 . Timeout
t2Deadline := now . Unix ( ) + t2 . Timeout
t4Deadline := now . Unix ( ) + t4 . Timeout
2019-12-28 12:37:15 +08:00
errMsg := "SMTP server is not responding"
tests := [ ] struct {
2021-03-13 08:23:08 +08:00
active map [ string ] [ ] * base . TaskMessage
2020-09-06 03:43:15 +08:00
deadlines map [ string ] [ ] base . Z
retry map [ string ] [ ] base . Z
msg * base . TaskMessage
processAt time . Time
errMsg string
wantActive map [ string ] [ ] * base . TaskMessage
wantDeadlines map [ string ] [ ] base . Z
2021-06-03 21:58:07 +08:00
getWantRetry func ( failedAt time . Time ) map [ string ] [ ] base . Z
2019-12-28 12:37:15 +08:00
} {
{
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:17:33 +08:00
"default" : { t1 , t2 } ,
2020-06-19 01:25:01 +08:00
} ,
2020-08-09 03:17:33 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
2019-12-28 12:37:15 +08:00
} ,
2020-08-09 03:17:33 +08:00
retry : map [ string ] [ ] base . Z {
"default" : { { Message : t3 , Score : now . Add ( time . Minute ) . Unix ( ) } } ,
2020-06-19 01:25:01 +08:00
} ,
2020-08-09 03:17:33 +08:00
msg : t1 ,
processAt : now . Add ( 5 * time . Minute ) ,
errMsg : errMsg ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:17:33 +08:00
"default" : { t2 } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t2 , Score : t2Deadline } } ,
} ,
2021-06-03 21:58:07 +08:00
getWantRetry : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : {
{ Message : h . TaskMessageAfterRetry ( * t1 , errMsg , failedAt ) , Score : now . Add ( 5 * time . Minute ) . Unix ( ) } ,
{ Message : t3 , Score : now . Add ( time . Minute ) . Unix ( ) } ,
} ,
}
2020-08-09 03:17:33 +08:00
} ,
} ,
{
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:17:33 +08:00
"default" : { t1 , t2 } ,
"custom" : { t4 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
"custom" : { { Message : t4 , Score : t4Deadline } } ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : { } ,
} ,
msg : t4 ,
processAt : now . Add ( 5 * time . Minute ) ,
errMsg : errMsg ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:17:33 +08:00
"default" : { t1 , t2 } ,
"custom" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
"custom" : { } ,
} ,
2021-06-03 21:58:07 +08:00
getWantRetry : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : {
{ Message : h . TaskMessageAfterRetry ( * t4 , errMsg , failedAt ) , Score : now . Add ( 5 * time . Minute ) . Unix ( ) } ,
} ,
}
2019-12-28 12:37:15 +08:00
} ,
} ,
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client )
2021-03-13 08:23:08 +08:00
h . SeedAllActiveQueues ( t , r . client , tc . active )
2020-08-09 03:17:33 +08:00
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
h . SeedAllRetryQueues ( t , r . client , tc . retry )
2019-12-28 12:37:15 +08:00
2021-06-03 21:58:07 +08:00
callTime := time . Now ( ) // time when method was called
2021-09-01 21:00:54 +08:00
err := r . Retry ( tc . msg , tc . processAt , tc . errMsg , true /*isFailure*/ )
2019-12-28 12:37:15 +08:00
if err != nil {
t . Errorf ( "(*RDB).Retry = %v, want nil" , err )
continue
}
2020-09-06 03:43:15 +08:00
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . ActiveKey ( queue ) , diff )
2020-08-09 03:17:33 +08:00
}
2019-12-28 12:37:15 +08:00
}
2020-08-09 03:17:33 +08:00
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . DeadlinesKey ( queue ) , diff )
}
2020-06-19 01:25:01 +08:00
}
2021-05-19 12:00:53 +08:00
cmpOpts := [ ] cmp . Option {
h . SortZSetEntryOpt ,
cmpopts . EquateApproxTime ( 5 * time . Second ) , // for LastFailedAt field
}
2021-06-03 21:58:07 +08:00
wantRetry := tc . getWantRetry ( callTime )
for queue , want := range wantRetry {
2020-08-09 03:17:33 +08:00
gotRetry := h . GetRetryEntries ( t , r . client , queue )
2021-05-19 12:00:53 +08:00
if diff := cmp . Diff ( want , gotRetry , cmpOpts ... ) ; diff != "" {
2020-08-09 03:17:33 +08:00
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . RetryKey ( queue ) , diff )
}
2019-12-28 12:37:15 +08:00
}
2020-08-09 03:17:33 +08:00
processedKey := base . ProcessedKey ( tc . msg . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotProcessed := r . client . Get ( context . Background ( ) , processedKey ) . Val ( )
2019-12-28 12:37:15 +08:00
if gotProcessed != "1" {
t . Errorf ( "GET %q = %q, want 1" , processedKey , gotProcessed )
}
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , processedKey ) . Val ( )
2019-12-28 12:37:15 +08:00
if gotTTL > statsTTL {
t . Errorf ( "TTL %q = %v, want less than or equal to %v" , processedKey , gotTTL , statsTTL )
}
2020-08-09 03:17:33 +08:00
failedKey := base . FailedKey ( tc . msg . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotFailed := r . client . Get ( context . Background ( ) , failedKey ) . Val ( )
2020-08-09 03:44:08 +08:00
if gotFailed != "1" {
t . Errorf ( "GET %q = %q, want 1" , failedKey , gotFailed )
2019-12-28 12:37:15 +08:00
}
2021-09-02 20:56:02 +08:00
gotTTL = r . client . TTL ( context . Background ( ) , failedKey ) . Val ( )
2019-12-28 12:37:15 +08:00
if gotTTL > statsTTL {
2020-08-09 03:17:33 +08:00
t . Errorf ( "TTL %q = %v, want less than or equal to %v" , failedKey , gotTTL , statsTTL )
2019-12-28 12:37:15 +08:00
}
}
}
2019-12-31 01:22:25 +08:00
2021-09-01 21:00:54 +08:00
func TestRetryWithNonFailureError ( t * testing . T ) {
r := setup ( t )
defer r . Close ( )
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2021-09-01 21:00:54 +08:00
Type : "send_email" ,
Payload : h . JSON ( map [ string ] interface { } { "subject" : "Hola!" } ) ,
Retried : 10 ,
Timeout : 1800 ,
Queue : "default" ,
}
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2021-09-01 21:00:54 +08:00
Type : "gen_thumbnail" ,
Payload : h . JSON ( map [ string ] interface { } { "path" : "some/path/to/image.jpg" } ) ,
Timeout : 3000 ,
Queue : "default" ,
}
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2021-09-01 21:00:54 +08:00
Type : "reindex" ,
Payload : nil ,
Timeout : 60 ,
Queue : "default" ,
}
t4 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2021-09-01 21:00:54 +08:00
Type : "send_notification" ,
Payload : nil ,
Timeout : 1800 ,
Queue : "custom" ,
}
t1Deadline := now . Unix ( ) + t1 . Timeout
t2Deadline := now . Unix ( ) + t2 . Timeout
t4Deadline := now . Unix ( ) + t4 . Timeout
errMsg := "SMTP server is not responding"
tests := [ ] struct {
active map [ string ] [ ] * base . TaskMessage
deadlines map [ string ] [ ] base . Z
retry map [ string ] [ ] base . Z
msg * base . TaskMessage
processAt time . Time
errMsg string
wantActive map [ string ] [ ] * base . TaskMessage
wantDeadlines map [ string ] [ ] base . Z
getWantRetry func ( failedAt time . Time ) map [ string ] [ ] base . Z
} {
{
active : map [ string ] [ ] * base . TaskMessage {
"default" : { t1 , t2 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { { Message : t3 , Score : now . Add ( time . Minute ) . Unix ( ) } } ,
} ,
msg : t1 ,
processAt : now . Add ( 5 * time . Minute ) ,
errMsg : errMsg ,
wantActive : map [ string ] [ ] * base . TaskMessage {
"default" : { t2 } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t2 , Score : t2Deadline } } ,
} ,
getWantRetry : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : {
// Task message should include the error message but without incrementing the retry count.
{ Message : h . TaskMessageWithError ( * t1 , errMsg , failedAt ) , Score : now . Add ( 5 * time . Minute ) . Unix ( ) } ,
{ Message : t3 , Score : now . Add ( time . Minute ) . Unix ( ) } ,
} ,
}
} ,
} ,
{
active : map [ string ] [ ] * base . TaskMessage {
"default" : { t1 , t2 } ,
"custom" : { t4 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
"custom" : { { Message : t4 , Score : t4Deadline } } ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : { } ,
} ,
msg : t4 ,
processAt : now . Add ( 5 * time . Minute ) ,
errMsg : errMsg ,
wantActive : map [ string ] [ ] * base . TaskMessage {
"default" : { t1 , t2 } ,
"custom" : { } ,
} ,
wantDeadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : t1Deadline } , { Message : t2 , Score : t2Deadline } } ,
"custom" : { } ,
} ,
getWantRetry : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : {
// Task message should include the error message but without incrementing the retry count.
{ Message : h . TaskMessageWithError ( * t4 , errMsg , failedAt ) , Score : now . Add ( 5 * time . Minute ) . Unix ( ) } ,
} ,
}
} ,
} ,
}
for _ , tc := range tests {
h . FlushDB ( t , r . client )
h . SeedAllActiveQueues ( t , r . client , tc . active )
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
h . SeedAllRetryQueues ( t , r . client , tc . retry )
callTime := time . Now ( ) // time when method was called
err := r . Retry ( tc . msg , tc . processAt , tc . errMsg , false /*isFailure*/ )
if err != nil {
t . Errorf ( "(*RDB).Retry = %v, want nil" , err )
continue
}
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . ActiveKey ( queue ) , diff )
}
}
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . DeadlinesKey ( queue ) , diff )
}
}
cmpOpts := [ ] cmp . Option {
h . SortZSetEntryOpt ,
cmpopts . EquateApproxTime ( 5 * time . Second ) , // for LastFailedAt field
}
wantRetry := tc . getWantRetry ( callTime )
for queue , want := range wantRetry {
gotRetry := h . GetRetryEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotRetry , cmpOpts ... ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . RetryKey ( queue ) , diff )
}
}
// If isFailure is set to false, no stats should be recorded to avoid skewing the error rate.
processedKey := base . ProcessedKey ( tc . msg . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotProcessed := r . client . Get ( context . Background ( ) , processedKey ) . Val ( )
2021-09-01 21:00:54 +08:00
if gotProcessed != "" {
t . Errorf ( "GET %q = %q, want empty" , processedKey , gotProcessed )
}
// If isFailure is set to false, no stats should be recorded to avoid skewing the error rate.
failedKey := base . FailedKey ( tc . msg . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotFailed := r . client . Get ( context . Background ( ) , failedKey ) . Val ( )
2021-09-01 21:00:54 +08:00
if gotFailed != "" {
t . Errorf ( "GET %q = %q, want empty" , failedKey , gotFailed )
}
}
}
2021-01-13 03:01:21 +08:00
func TestArchive ( t * testing . T ) {
2019-11-28 23:47:12 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-19 01:53:58 +08:00
now := time . Now ( )
t1 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:53:58 +08:00
Type : "send_email" ,
Payload : nil ,
Queue : "default" ,
Retry : 25 ,
2020-08-09 03:44:08 +08:00
Retried : 25 ,
2020-06-19 01:53:58 +08:00
Timeout : 1800 ,
}
2020-06-22 23:33:58 +08:00
t1Deadline := now . Unix ( ) + t1 . Timeout
2020-06-19 01:53:58 +08:00
t2 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:53:58 +08:00
Type : "reindex" ,
Payload : nil ,
Queue : "default" ,
Retry : 25 ,
Retried : 0 ,
Timeout : 3000 ,
}
2020-06-22 23:33:58 +08:00
t2Deadline := now . Unix ( ) + t2 . Timeout
2020-06-19 01:53:58 +08:00
t3 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-06-19 01:53:58 +08:00
Type : "generate_csv" ,
Payload : nil ,
Queue : "default" ,
Retry : 25 ,
Retried : 0 ,
Timeout : 60 ,
}
2020-06-22 23:33:58 +08:00
t3Deadline := now . Unix ( ) + t3 . Timeout
2020-08-09 03:44:08 +08:00
t4 := & base . TaskMessage {
2021-09-10 21:29:37 +08:00
ID : uuid . NewString ( ) ,
2020-08-09 03:44:08 +08:00
Type : "send_email" ,
Payload : nil ,
Queue : "custom" ,
Retry : 25 ,
Retried : 25 ,
Timeout : 1800 ,
}
t4Deadline := now . Unix ( ) + t4 . Timeout
2019-12-16 09:16:13 +08:00
errMsg := "SMTP server not responding"
2019-11-28 23:47:12 +08:00
// TODO(hibiken): add test cases for trimming
tests := [ ] struct {
2021-06-03 21:58:07 +08:00
active map [ string ] [ ] * base . TaskMessage
deadlines map [ string ] [ ] base . Z
archived map [ string ] [ ] base . Z
target * base . TaskMessage // task to archive
wantActive map [ string ] [ ] * base . TaskMessage
wantDeadlines map [ string ] [ ] base . Z
getWantArchived func ( failedAt time . Time ) map [ string ] [ ] base . Z
2019-11-28 23:47:12 +08:00
} {
{
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t1 , t2 } ,
2020-06-19 01:53:58 +08:00
} ,
2020-08-09 03:44:08 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : t1Deadline } ,
{ Message : t2 , Score : t2Deadline } ,
2019-12-29 12:12:14 +08:00
} ,
2019-12-16 09:16:13 +08:00
} ,
2021-01-13 03:01:21 +08:00
archived : map [ string ] [ ] base . Z {
2020-08-09 03:44:08 +08:00
"default" : {
{ Message : t3 , Score : now . Add ( - time . Hour ) . Unix ( ) } ,
} ,
} ,
target : t1 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t2 } ,
} ,
2020-08-11 12:49:12 +08:00
wantDeadlines : map [ string ] [ ] base . Z {
2020-08-09 03:44:08 +08:00
"default" : { { Message : t2 , Score : t2Deadline } } ,
} ,
2021-06-03 21:58:07 +08:00
getWantArchived : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : {
{ Message : h . TaskMessageWithError ( * t1 , errMsg , failedAt ) , Score : failedAt . Unix ( ) } ,
{ Message : t3 , Score : now . Add ( - time . Hour ) . Unix ( ) } ,
} ,
}
2020-08-09 03:44:08 +08:00
} ,
} ,
{
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t1 , t2 , t3 } ,
} ,
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : t1Deadline } ,
{ Message : t2 , Score : t2Deadline } ,
{ Message : t3 , Score : t3Deadline } ,
} ,
} ,
2021-01-13 03:01:21 +08:00
archived : map [ string ] [ ] base . Z {
2020-08-09 03:44:08 +08:00
"default" : { } ,
} ,
target : t1 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t2 , t3 } ,
2020-06-19 01:53:58 +08:00
} ,
2020-08-09 03:44:08 +08:00
wantDeadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t2 , Score : t2Deadline } ,
{ Message : t3 , Score : t3Deadline } ,
2019-12-29 12:12:14 +08:00
} ,
2020-08-09 03:44:08 +08:00
} ,
2021-06-03 21:58:07 +08:00
getWantArchived : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : {
{ Message : h . TaskMessageWithError ( * t1 , errMsg , failedAt ) , Score : failedAt . Unix ( ) } ,
} ,
}
2019-12-16 09:16:13 +08:00
} ,
} ,
{
2021-03-13 08:23:08 +08:00
active : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t1 } ,
"custom" : { t4 } ,
2020-06-19 01:53:58 +08:00
} ,
2020-08-09 03:44:08 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : t1Deadline } ,
} ,
"custom" : {
{ Message : t4 , Score : t4Deadline } ,
} ,
} ,
2021-01-13 03:01:21 +08:00
archived : map [ string ] [ ] base . Z {
2020-08-09 03:44:08 +08:00
"default" : { } ,
"custom" : { } ,
} ,
target : t4 ,
2020-09-06 03:43:15 +08:00
wantActive : map [ string ] [ ] * base . TaskMessage {
2020-08-09 03:44:08 +08:00
"default" : { t1 } ,
"custom" : { } ,
} ,
2020-08-11 12:49:12 +08:00
wantDeadlines : map [ string ] [ ] base . Z {
2020-08-09 03:44:08 +08:00
"default" : { { Message : t1 , Score : t1Deadline } } ,
"custom" : { } ,
2020-06-19 01:53:58 +08:00
} ,
2021-06-03 21:58:07 +08:00
getWantArchived : func ( failedAt time . Time ) map [ string ] [ ] base . Z {
return map [ string ] [ ] base . Z {
"default" : { } ,
"custom" : {
{ Message : h . TaskMessageWithError ( * t4 , errMsg , failedAt ) , Score : failedAt . Unix ( ) } ,
} ,
}
2019-12-16 09:16:13 +08:00
} ,
2019-11-28 23:47:12 +08:00
} ,
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2021-03-13 08:23:08 +08:00
h . SeedAllActiveQueues ( t , r . client , tc . active )
2020-08-09 03:44:08 +08:00
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
2021-01-13 03:01:21 +08:00
h . SeedAllArchivedQueues ( t , r . client , tc . archived )
2019-11-28 23:47:12 +08:00
2021-06-03 21:58:07 +08:00
callTime := time . Now ( ) // record time `Archive` was called
2021-01-13 03:01:21 +08:00
err := r . Archive ( tc . target , errMsg )
2019-11-28 23:47:12 +08:00
if err != nil {
2021-01-13 03:01:21 +08:00
t . Errorf ( "(*RDB).Archive(%v, %v) = %v, want nil" , tc . target , errMsg , err )
2019-11-28 23:47:12 +08:00
continue
}
2020-09-06 03:43:15 +08:00
for queue , want := range tc . wantActive {
gotActive := h . GetActiveMessages ( t , r . client , queue )
if diff := cmp . Diff ( want , gotActive , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q: (-want, +got)\n%s" , base . ActiveKey ( queue ) , diff )
2020-08-09 03:44:08 +08:00
}
2019-12-16 09:16:13 +08:00
}
2020-08-09 03:44:08 +08:00
for queue , want := range tc . wantDeadlines {
gotDeadlines := h . GetDeadlinesEntries ( t , r . client , queue )
if diff := cmp . Diff ( want , gotDeadlines , h . SortZSetEntryOpt ) ; diff != "" {
2021-01-13 03:01:21 +08:00
t . Errorf ( "mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s" , base . DeadlinesKey ( queue ) , diff )
2020-08-09 03:44:08 +08:00
}
2020-06-19 01:53:58 +08:00
}
2021-06-03 21:58:07 +08:00
for queue , want := range tc . getWantArchived ( callTime ) {
2021-01-13 03:01:21 +08:00
gotArchived := h . GetArchivedEntries ( t , r . client , queue )
2021-05-19 12:00:53 +08:00
if diff := cmp . Diff ( want , gotArchived , h . SortZSetEntryOpt , zScoreCmpOpt , timeCmpOpt ) ; diff != "" {
2021-01-13 03:01:21 +08:00
t . Errorf ( "mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s" , base . ArchivedKey ( queue ) , diff )
2020-08-09 03:44:08 +08:00
}
2019-11-28 22:50:05 +08:00
}
2019-12-23 21:33:48 +08:00
2020-08-09 03:44:08 +08:00
processedKey := base . ProcessedKey ( tc . target . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotProcessed := r . client . Get ( context . Background ( ) , processedKey ) . Val ( )
2019-12-23 21:33:48 +08:00
if gotProcessed != "1" {
t . Errorf ( "GET %q = %q, want 1" , processedKey , gotProcessed )
}
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , processedKey ) . Val ( )
2019-12-23 21:33:48 +08:00
if gotTTL > statsTTL {
t . Errorf ( "TTL %q = %v, want less than or equal to %v" , processedKey , gotTTL , statsTTL )
}
2020-08-09 03:44:08 +08:00
failedKey := base . FailedKey ( tc . target . Queue , time . Now ( ) )
2021-09-02 20:56:02 +08:00
gotFailed := r . client . Get ( context . Background ( ) , failedKey ) . Val ( )
2020-08-09 03:44:08 +08:00
if gotFailed != "1" {
t . Errorf ( "GET %q = %q, want 1" , failedKey , gotFailed )
2019-12-23 21:33:48 +08:00
}
2021-09-02 20:56:02 +08:00
gotTTL = r . client . TTL ( context . Background ( ) , processedKey ) . Val ( )
2019-12-23 21:33:48 +08:00
if gotTTL > statsTTL {
2020-08-09 03:44:08 +08:00
t . Errorf ( "TTL %q = %v, want less than or equal to %v" , failedKey , gotTTL , statsTTL )
2019-12-23 21:33:48 +08:00
}
2019-11-28 22:50:05 +08:00
}
}
2021-03-13 08:23:08 +08:00
func TestForwardIfReady ( t * testing . T ) {
2019-11-26 10:55:17 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2019-12-29 12:12:14 +08:00
t1 := h . NewTaskMessage ( "send_email" , nil )
t2 := h . NewTaskMessage ( "generate_csv" , nil )
t3 := h . NewTaskMessage ( "gen_thumbnail" , nil )
2020-08-09 21:26:14 +08:00
t4 := h . NewTaskMessageWithQueue ( "important_task" , nil , "critical" )
t5 := h . NewTaskMessageWithQueue ( "minor_task" , nil , "low" )
2019-11-26 22:38:11 +08:00
secondAgo := time . Now ( ) . Add ( - time . Second )
hourFromNow := time . Now ( ) . Add ( time . Hour )
tests := [ ] struct {
2020-08-09 21:26:14 +08:00
scheduled map [ string ] [ ] base . Z
retry map [ string ] [ ] base . Z
qnames [ ] string
2020-09-05 22:03:43 +08:00
wantPending map [ string ] [ ] * base . TaskMessage
2020-08-09 21:26:14 +08:00
wantScheduled map [ string ] [ ] * base . TaskMessage
wantRetry map [ string ] [ ] * base . TaskMessage
2019-11-26 22:38:11 +08:00
} {
{
2020-08-09 21:26:14 +08:00
scheduled : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : secondAgo . Unix ( ) } ,
{ Message : t2 , Score : secondAgo . Unix ( ) } ,
} ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { { Message : t3 , Score : secondAgo . Unix ( ) } } ,
2019-12-29 12:12:14 +08:00
} ,
2020-08-09 21:26:14 +08:00
qnames : [ ] string { "default" } ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 13:27:51 +08:00
"default" : { t1 , t2 , t3 } ,
} ,
2020-08-09 21:26:14 +08:00
wantScheduled : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
wantRetry : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
2019-11-26 22:38:11 +08:00
} ,
{
2020-08-09 21:26:14 +08:00
scheduled : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : hourFromNow . Unix ( ) } ,
{ Message : t2 , Score : secondAgo . Unix ( ) } ,
} ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { { Message : t3 , Score : secondAgo . Unix ( ) } } ,
} ,
qnames : [ ] string { "default" } ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 13:27:51 +08:00
"default" : { t2 , t3 } ,
} ,
2020-08-09 21:26:14 +08:00
wantScheduled : map [ string ] [ ] * base . TaskMessage {
"default" : { t1 } ,
} ,
wantRetry : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
} ,
2019-11-26 22:38:11 +08:00
} ,
{
2020-08-09 21:26:14 +08:00
scheduled : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : hourFromNow . Unix ( ) } ,
{ Message : t2 , Score : hourFromNow . Unix ( ) } ,
} ,
} ,
retry : map [ string ] [ ] base . Z {
"default" : { { Message : t3 , Score : hourFromNow . Unix ( ) } } ,
} ,
qnames : [ ] string { "default" } ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 13:27:51 +08:00
"default" : { } ,
} ,
2020-08-09 21:26:14 +08:00
wantScheduled : map [ string ] [ ] * base . TaskMessage {
"default" : { t1 , t2 } ,
} ,
wantRetry : map [ string ] [ ] * base . TaskMessage {
"default" : { t3 } ,
} ,
2019-11-26 22:38:11 +08:00
} ,
2020-01-07 13:27:51 +08:00
{
2020-08-09 21:26:14 +08:00
scheduled : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : secondAgo . Unix ( ) } } ,
"critical" : { { Message : t4 , Score : secondAgo . Unix ( ) } } ,
"low" : { } ,
2020-01-07 13:27:51 +08:00
} ,
2020-08-09 21:26:14 +08:00
retry : map [ string ] [ ] base . Z {
"default" : { } ,
"critical" : { } ,
"low" : { { Message : t5 , Score : secondAgo . Unix ( ) } } ,
} ,
qnames : [ ] string { "default" , "critical" , "low" } ,
2020-09-05 22:03:43 +08:00
wantPending : map [ string ] [ ] * base . TaskMessage {
2020-01-07 13:27:51 +08:00
"default" : { t1 } ,
"critical" : { t4 } ,
"low" : { t5 } ,
} ,
2020-08-09 21:26:14 +08:00
wantScheduled : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
wantRetry : map [ string ] [ ] * base . TaskMessage {
"default" : { } ,
"critical" : { } ,
"low" : { } ,
} ,
2020-01-07 13:27:51 +08:00
} ,
2019-11-26 22:38:11 +08:00
}
for _ , tc := range tests {
2019-12-29 12:12:14 +08:00
h . FlushDB ( t , r . client ) // clean up db before each test case
2020-08-09 21:26:14 +08:00
h . SeedAllScheduledQueues ( t , r . client , tc . scheduled )
h . SeedAllRetryQueues ( t , r . client , tc . retry )
2019-11-25 23:09:39 +08:00
2021-12-11 01:07:41 +08:00
now := time . Now ( )
r . SetClock ( timeutil . NewSimulatedClock ( now ) )
2021-03-13 08:23:08 +08:00
err := r . ForwardIfReady ( tc . qnames ... )
2019-11-26 22:38:11 +08:00
if err != nil {
2020-08-09 21:26:14 +08:00
t . Errorf ( "(*RDB).CheckScheduled(%v) = %v, want nil" , tc . qnames , err )
2019-11-26 22:38:11 +08:00
continue
}
2019-12-29 12:12:14 +08:00
2020-09-05 22:03:43 +08:00
for qname , want := range tc . wantPending {
gotPending := h . GetPendingMessages ( t , r . client , qname )
if diff := cmp . Diff ( want , gotPending , h . SortMsgOpt ) ; diff != "" {
2021-03-13 08:23:08 +08:00
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . PendingKey ( qname ) , diff )
2020-01-07 13:27:51 +08:00
}
2021-12-09 22:37:18 +08:00
// Make sure "pending_since" field is set
for _ , msg := range gotPending {
pendingSince := r . client . HGet ( context . Background ( ) , base . TaskKey ( msg . Queue , msg . ID ) , "pending_since" ) . Val ( )
2021-12-11 22:27:44 +08:00
if want := strconv . Itoa ( int ( now . UnixNano ( ) ) ) ; pendingSince != want {
2021-12-09 22:37:18 +08:00
t . Error ( "pending_since field is not set for newly pending message" )
}
}
2019-11-26 22:38:11 +08:00
}
2020-08-09 21:26:14 +08:00
for qname , want := range tc . wantScheduled {
gotScheduled := h . GetScheduledMessages ( t , r . client , qname )
if diff := cmp . Diff ( want , gotScheduled , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . ScheduledKey ( qname ) , diff )
}
2019-12-13 11:49:41 +08:00
}
2020-08-09 21:26:14 +08:00
for qname , want := range tc . wantRetry {
gotRetry := h . GetRetryMessages ( t , r . client , qname )
if diff := cmp . Diff ( want , gotRetry , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "mismatch found in %q; (-want, +got)\n%s" , base . RetryKey ( qname ) , diff )
}
2019-11-26 22:38:11 +08:00
}
2019-11-25 23:09:39 +08:00
}
}
2020-01-31 22:48:58 +08:00
2021-11-06 07:52:54 +08:00
// newCompletedTask builds a task message for queue qname with the given type
// name and payload, and stamps its CompletedAt field with completedAt
// expressed in unix seconds.
func newCompletedTask(qname, typename string, payload []byte, completedAt time.Time) *base.TaskMessage {
	task := h.NewTaskMessageWithQueue(typename, payload, qname)
	task.CompletedAt = completedAt.Unix()
	return task
}
// TestDeleteExpiredCompletedTasks seeds the completed sets of one or more
// queues, calls (*RDB).DeleteExpiredCompletedTasks for a single queue, and
// compares what remains in every listed queue against the expectation.
func TestDeleteExpiredCompletedTasks(t *testing.T) {
	r := setup(t)
	defer r.Close()

	now := time.Now()
	var (
		secondAgo   = now.Add(-time.Second)
		hourFromNow = now.Add(time.Hour)
		hourAgo     = now.Add(-time.Hour)
		minuteAgo   = now.Add(-time.Minute)
	)

	defaultOld := newCompletedTask("default", "task1", nil, hourAgo)
	defaultMid := newCompletedTask("default", "task2", nil, minuteAgo)
	defaultNew := newCompletedTask("default", "task3", nil, secondAgo)
	criticalOld := newCompletedTask("critical", "critical_task", nil, hourAgo)
	lowOld := newCompletedTask("low", "low_priority_task", nil, hourAgo)

	type testCase struct {
		desc          string
		completed     map[string][]base.Z
		qname         string
		wantCompleted map[string][]base.Z
	}

	cases := []testCase{
		{
			desc: "deletes expired task from default queue",
			completed: map[string][]base.Z{
				"default": {
					{Message: defaultOld, Score: secondAgo.Unix()},
					{Message: defaultMid, Score: hourFromNow.Unix()},
					{Message: defaultNew, Score: now.Unix()},
				},
			},
			qname: "default",
			wantCompleted: map[string][]base.Z{
				"default": {
					{Message: defaultMid, Score: hourFromNow.Unix()},
				},
			},
		},
		{
			desc: "deletes expired task from specified queue",
			completed: map[string][]base.Z{
				"default": {
					{Message: defaultMid, Score: secondAgo.Unix()},
				},
				"critical": {
					{Message: criticalOld, Score: secondAgo.Unix()},
				},
				"low": {
					{Message: lowOld, Score: now.Unix()},
				},
			},
			qname: "critical",
			// Only the "critical" queue was targeted; the others stay as seeded.
			wantCompleted: map[string][]base.Z{
				"default": {
					{Message: defaultMid, Score: secondAgo.Unix()},
				},
				"critical": {},
				"low": {
					{Message: lowOld, Score: now.Unix()},
				},
			},
		},
	}

	for _, tc := range cases {
		h.FlushDB(t, r.client)
		h.SeedAllCompletedQueues(t, r.client, tc.completed)

		err := r.DeleteExpiredCompletedTasks(tc.qname)
		if err != nil {
			t.Errorf("DeleteExpiredCompletedTasks(%q) failed: %v", tc.qname, err)
			continue
		}

		for queue, wantEntries := range tc.wantCompleted {
			gotEntries := h.GetCompletedEntries(t, r.client, queue)
			diff := cmp.Diff(wantEntries, gotEntries, h.SortZSetEntryOpt)
			if diff == "" {
				continue
			}
			t.Errorf("%s: diff found in %q completed set: want=%v, got=%v\n%s", tc.desc, queue, wantEntries, gotEntries, diff)
		}
	}
}
2020-06-20 21:29:58 +08:00
func TestListDeadlineExceeded ( t * testing . T ) {
2020-08-10 20:37:49 +08:00
t1 := h . NewTaskMessageWithQueue ( "task1" , nil , "default" )
t2 := h . NewTaskMessageWithQueue ( "task2" , nil , "default" )
2020-06-20 21:29:58 +08:00
t3 := h . NewTaskMessageWithQueue ( "task3" , nil , "critical" )
now := time . Now ( )
oneHourFromNow := now . Add ( 1 * time . Hour )
fiveMinutesFromNow := now . Add ( 5 * time . Minute )
fiveMinutesAgo := now . Add ( - 5 * time . Minute )
oneHourAgo := now . Add ( - 1 * time . Hour )
tests := [ ] struct {
desc string
2020-08-10 20:37:49 +08:00
deadlines map [ string ] [ ] base . Z
qnames [ ] string
2020-06-20 21:29:58 +08:00
t time . Time
want [ ] * base . TaskMessage
} {
{
2020-09-06 03:43:15 +08:00
desc : "with a single active task" ,
2020-08-10 20:37:49 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : { { Message : t1 , Score : fiveMinutesAgo . Unix ( ) } } ,
2020-06-20 21:29:58 +08:00
} ,
2020-08-10 20:37:49 +08:00
qnames : [ ] string { "default" } ,
t : time . Now ( ) ,
want : [ ] * base . TaskMessage { t1 } ,
2020-06-20 21:29:58 +08:00
} ,
{
2020-09-06 03:43:15 +08:00
desc : "with multiple active tasks, and one expired" ,
2020-08-10 20:37:49 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : oneHourAgo . Unix ( ) } ,
{ Message : t2 , Score : fiveMinutesFromNow . Unix ( ) } ,
} ,
"critical" : {
{ Message : t3 , Score : oneHourFromNow . Unix ( ) } ,
} ,
2020-06-20 21:29:58 +08:00
} ,
2020-08-10 20:37:49 +08:00
qnames : [ ] string { "default" , "critical" } ,
t : time . Now ( ) ,
want : [ ] * base . TaskMessage { t1 } ,
2020-06-20 21:29:58 +08:00
} ,
{
2020-09-06 03:43:15 +08:00
desc : "with multiple expired active tasks" ,
2020-08-10 20:37:49 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : {
{ Message : t1 , Score : oneHourAgo . Unix ( ) } ,
{ Message : t2 , Score : oneHourFromNow . Unix ( ) } ,
} ,
"critical" : {
{ Message : t3 , Score : fiveMinutesAgo . Unix ( ) } ,
} ,
2020-06-20 21:29:58 +08:00
} ,
2020-08-10 20:37:49 +08:00
qnames : [ ] string { "default" , "critical" } ,
t : time . Now ( ) ,
2020-08-11 12:49:12 +08:00
want : [ ] * base . TaskMessage { t1 , t3 } ,
2020-06-20 21:29:58 +08:00
} ,
{
2020-09-06 03:43:15 +08:00
desc : "with empty active queue" ,
2020-08-10 20:37:49 +08:00
deadlines : map [ string ] [ ] base . Z {
"default" : { } ,
"critical" : { } ,
} ,
qnames : [ ] string { "default" , "critical" } ,
t : time . Now ( ) ,
want : [ ] * base . TaskMessage { } ,
2020-06-20 21:29:58 +08:00
} ,
}
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-06-20 21:29:58 +08:00
for _ , tc := range tests {
h . FlushDB ( t , r . client )
2020-08-10 20:37:49 +08:00
h . SeedAllDeadlines ( t , r . client , tc . deadlines )
2020-06-20 21:29:58 +08:00
2020-08-10 20:37:49 +08:00
got , err := r . ListDeadlineExceeded ( tc . t , tc . qnames ... )
2020-06-20 21:29:58 +08:00
if err != nil {
t . Errorf ( "%s; ListDeadlineExceeded(%v) returned error: %v" , tc . desc , tc . t , err )
continue
}
if diff := cmp . Diff ( tc . want , got , h . SortMsgOpt ) ; diff != "" {
t . Errorf ( "%s; ListDeadlineExceeded(%v) returned %v, want %v;(-want,+got)\n%s" ,
tc . desc , tc . t , got , tc . want , diff )
}
}
}
2020-04-13 07:42:11 +08:00
func TestWriteServerState ( t * testing . T ) {
2020-01-31 22:48:58 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-02-21 23:18:22 +08:00
2020-05-19 11:47:35 +08:00
var (
host = "localhost"
pid = 4242
serverID = "server123"
2020-02-21 23:18:22 +08:00
2020-05-19 11:47:35 +08:00
ttl = 5 * time . Second
)
2020-02-21 23:18:22 +08:00
2020-05-19 11:47:35 +08:00
info := base . ServerInfo {
Host : host ,
PID : pid ,
ServerID : serverID ,
Concurrency : 10 ,
Queues : map [ string ] int { "default" : 2 , "email" : 5 , "low" : 1 } ,
StrictPriority : false ,
2021-03-13 08:23:08 +08:00
Started : time . Now ( ) . UTC ( ) ,
2021-03-23 21:20:54 +08:00
Status : "active" ,
2020-05-19 11:47:35 +08:00
ActiveWorkerCount : 0 ,
}
err := r . WriteServerState ( & info , nil /* workers */ , ttl )
2020-02-21 23:18:22 +08:00
if err != nil {
2020-04-13 07:42:11 +08:00
t . Errorf ( "r.WriteServerState returned an error: %v" , err )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Check ServerInfo was written correctly.
skey := base . ServerInfoKey ( host , pid , serverID )
2021-09-02 20:56:02 +08:00
data := r . client . Get ( context . Background ( ) , skey ) . Val ( )
2021-03-13 08:23:08 +08:00
got , err := base . DecodeServerInfo ( [ ] byte ( data ) )
2020-02-21 23:18:22 +08:00
if err != nil {
2021-03-13 08:23:08 +08:00
t . Fatalf ( "could not decode server info: %v" , err )
2020-02-21 23:18:22 +08:00
}
2021-03-13 08:23:08 +08:00
if diff := cmp . Diff ( info , * got ) ; diff != "" {
2020-04-13 07:42:11 +08:00
t . Errorf ( "persisted ServerInfo was %v, want %v; (-want,+got)\n%s" ,
2020-05-19 11:47:35 +08:00
got , info , diff )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Check ServerInfo TTL was set correctly.
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , skey ) . Val ( )
2020-03-18 21:49:39 +08:00
if ! cmp . Equal ( ttl . Seconds ( ) , gotTTL . Seconds ( ) , cmpopts . EquateApprox ( 0 , 1 ) ) {
2020-04-13 07:42:11 +08:00
t . Errorf ( "TTL of %q was %v, want %v" , skey , gotTTL , ttl )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Check ServerInfo key was added to the set all server keys correctly.
2021-09-02 20:56:02 +08:00
gotServerKeys := r . client . ZRange ( context . Background ( ) , base . AllServers , 0 , - 1 ) . Val ( )
2020-05-19 11:47:35 +08:00
wantServerKeys := [ ] string { skey }
if diff := cmp . Diff ( wantServerKeys , gotServerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllServers , gotServerKeys , wantServerKeys )
2020-01-31 22:48:58 +08:00
}
2020-05-19 11:47:35 +08:00
// Check WorkersInfo was written correctly.
wkey := base . WorkersKey ( host , pid , serverID )
2021-09-02 20:56:02 +08:00
workerExist := r . client . Exists ( context . Background ( ) , wkey ) . Val ( )
2020-02-21 23:18:22 +08:00
if workerExist != 0 {
t . Errorf ( "%q key exists" , wkey )
}
2020-05-19 11:47:35 +08:00
// Check WorkersInfo key was added to the set correctly.
2021-09-02 20:56:02 +08:00
gotWorkerKeys := r . client . ZRange ( context . Background ( ) , base . AllWorkers , 0 , - 1 ) . Val ( )
2020-02-21 23:18:22 +08:00
wantWorkerKeys := [ ] string { wkey }
if diff := cmp . Diff ( wantWorkerKeys , gotWorkerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllWorkers , gotWorkerKeys , wantWorkerKeys )
2020-01-31 22:48:58 +08:00
}
2020-02-21 23:18:22 +08:00
}
2020-01-31 22:48:58 +08:00
2020-04-13 07:42:11 +08:00
func TestWriteServerStateWithWorkers ( t * testing . T ) {
2020-02-21 23:18:22 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-01-31 22:48:58 +08:00
2020-05-19 11:47:35 +08:00
var (
host = "127.0.0.1"
pid = 4242
serverID = "server123"
2021-03-21 04:42:13 +08:00
msg1 = h . NewTaskMessage ( "send_email" , h . JSON ( map [ string ] interface { } { "user_id" : "123" } ) )
msg2 = h . NewTaskMessage ( "gen_thumbnail" , h . JSON ( map [ string ] interface { } { "path" : "some/path/to/imgfile" } ) )
2020-05-19 11:47:35 +08:00
ttl = 5 * time . Second
)
workers := [ ] * base . WorkerInfo {
{
Host : host ,
PID : pid ,
2021-09-10 21:29:37 +08:00
ID : msg1 . ID ,
2020-05-19 11:47:35 +08:00
Type : msg1 . Type ,
Queue : msg1 . Queue ,
Payload : msg1 . Payload ,
Started : time . Now ( ) . Add ( - 10 * time . Second ) ,
} ,
{
Host : host ,
PID : pid ,
2021-09-10 21:29:37 +08:00
ID : msg2 . ID ,
2020-05-19 11:47:35 +08:00
Type : msg2 . Type ,
Queue : msg2 . Queue ,
Payload : msg2 . Payload ,
Started : time . Now ( ) . Add ( - 2 * time . Minute ) ,
} ,
}
serverInfo := base . ServerInfo {
Host : host ,
PID : pid ,
ServerID : serverID ,
Concurrency : 10 ,
Queues : map [ string ] int { "default" : 2 , "email" : 5 , "low" : 1 } ,
StrictPriority : false ,
2021-03-13 08:23:08 +08:00
Started : time . Now ( ) . Add ( - 10 * time . Minute ) . UTC ( ) ,
2021-03-23 21:20:54 +08:00
Status : "active" ,
2020-05-19 11:47:35 +08:00
ActiveWorkerCount : len ( workers ) ,
}
2020-01-31 22:48:58 +08:00
2020-05-19 11:47:35 +08:00
err := r . WriteServerState ( & serverInfo , workers , ttl )
2020-02-21 23:18:22 +08:00
if err != nil {
2020-05-19 11:47:35 +08:00
t . Fatalf ( "r.WriteServerState returned an error: %v" , err )
2020-02-21 23:18:22 +08:00
}
2020-01-31 22:48:58 +08:00
2020-05-19 11:47:35 +08:00
// Check ServerInfo was written correctly.
skey := base . ServerInfoKey ( host , pid , serverID )
2021-09-02 20:56:02 +08:00
data := r . client . Get ( context . Background ( ) , skey ) . Val ( )
2021-03-13 08:23:08 +08:00
got , err := base . DecodeServerInfo ( [ ] byte ( data ) )
2020-02-21 23:18:22 +08:00
if err != nil {
2021-03-13 08:23:08 +08:00
t . Fatalf ( "could not decode server info: %v" , err )
2020-02-21 23:18:22 +08:00
}
2021-03-13 08:23:08 +08:00
if diff := cmp . Diff ( serverInfo , * got ) ; diff != "" {
2020-04-13 07:42:11 +08:00
t . Errorf ( "persisted ServerInfo was %v, want %v; (-want,+got)\n%s" ,
2020-05-19 11:47:35 +08:00
got , serverInfo , diff )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Check ServerInfo TTL was set correctly.
2021-09-02 20:56:02 +08:00
gotTTL := r . client . TTL ( context . Background ( ) , skey ) . Val ( )
2020-03-18 21:49:39 +08:00
if ! cmp . Equal ( ttl . Seconds ( ) , gotTTL . Seconds ( ) , cmpopts . EquateApprox ( 0 , 1 ) ) {
2020-04-13 07:42:11 +08:00
t . Errorf ( "TTL of %q was %v, want %v" , skey , gotTTL , ttl )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Check ServerInfo key was added to the set correctly.
2021-09-02 20:56:02 +08:00
gotServerKeys := r . client . ZRange ( context . Background ( ) , base . AllServers , 0 , - 1 ) . Val ( )
2020-05-19 11:47:35 +08:00
wantServerKeys := [ ] string { skey }
if diff := cmp . Diff ( wantServerKeys , gotServerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllServers , gotServerKeys , wantServerKeys )
2020-02-21 23:18:22 +08:00
}
2020-01-31 22:48:58 +08:00
2020-05-19 11:47:35 +08:00
// Check WorkersInfo was written correctly.
wkey := base . WorkersKey ( host , pid , serverID )
2021-09-02 20:56:02 +08:00
wdata := r . client . HGetAll ( context . Background ( ) , wkey ) . Val ( )
2020-02-21 23:18:22 +08:00
if len ( wdata ) != 2 {
t . Fatalf ( "HGETALL %q returned a hash of size %d, want 2" , wkey , len ( wdata ) )
}
2020-05-19 11:47:35 +08:00
var gotWorkers [ ] * base . WorkerInfo
for _ , val := range wdata {
2021-03-13 08:23:08 +08:00
w , err := base . DecodeWorkerInfo ( [ ] byte ( val ) )
if err != nil {
2020-02-21 23:18:22 +08:00
t . Fatalf ( "could not unmarshal worker's data: %v" , err )
2020-01-31 22:48:58 +08:00
}
2021-03-13 08:23:08 +08:00
gotWorkers = append ( gotWorkers , w )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
if diff := cmp . Diff ( workers , gotWorkers , h . SortWorkerInfoOpt ) ; diff != "" {
2020-02-21 23:18:22 +08:00
t . Errorf ( "persisted workers info was %v, want %v; (-want,+got)\n%s" ,
2020-05-19 11:47:35 +08:00
gotWorkers , workers , diff )
2020-02-21 23:18:22 +08:00
}
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
// Check WorkersInfo TTL was set correctly.
2021-09-02 20:56:02 +08:00
gotTTL = r . client . TTL ( context . Background ( ) , wkey ) . Val ( )
2020-05-19 11:47:35 +08:00
if ! cmp . Equal ( ttl . Seconds ( ) , gotTTL . Seconds ( ) , cmpopts . EquateApprox ( 0 , 1 ) ) {
2020-02-21 23:18:22 +08:00
t . Errorf ( "TTL of %q was %v, want %v" , wkey , gotTTL , ttl )
}
2020-05-19 11:47:35 +08:00
// Check WorkersInfo key was added to the set correctly.
2021-09-02 20:56:02 +08:00
gotWorkerKeys := r . client . ZRange ( context . Background ( ) , base . AllWorkers , 0 , - 1 ) . Val ( )
2020-02-21 23:18:22 +08:00
wantWorkerKeys := [ ] string { wkey }
if diff := cmp . Diff ( wantWorkerKeys , gotWorkerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllWorkers , gotWorkerKeys , wantWorkerKeys )
}
}
2020-02-02 14:22:48 +08:00
2020-04-13 07:42:11 +08:00
func TestClearServerState ( t * testing . T ) {
2020-02-21 23:18:22 +08:00
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
var (
host = "127.0.0.1"
pid = 1234
serverID = "server123"
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
otherHost = "127.0.0.2"
otherPID = 9876
otherServerID = "server987"
2021-03-21 04:42:13 +08:00
msg1 = h . NewTaskMessage ( "send_email" , h . JSON ( map [ string ] interface { } { "user_id" : "123" } ) )
msg2 = h . NewTaskMessage ( "gen_thumbnail" , h . JSON ( map [ string ] interface { } { "path" : "some/path/to/imgfile" } ) )
2020-05-19 11:47:35 +08:00
ttl = 5 * time . Second
)
workers1 := [ ] * base . WorkerInfo {
{
Host : host ,
PID : pid ,
2021-09-10 21:29:37 +08:00
ID : msg1 . ID ,
2020-05-19 11:47:35 +08:00
Type : msg1 . Type ,
Queue : msg1 . Queue ,
Payload : msg1 . Payload ,
Started : time . Now ( ) . Add ( - 10 * time . Second ) ,
} ,
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
serverInfo1 := base . ServerInfo {
Host : host ,
PID : pid ,
ServerID : serverID ,
Concurrency : 10 ,
Queues : map [ string ] int { "default" : 2 , "email" : 5 , "low" : 1 } ,
StrictPriority : false ,
Started : time . Now ( ) . Add ( - 10 * time . Minute ) ,
2021-03-23 21:20:54 +08:00
Status : "active" ,
2020-05-19 11:47:35 +08:00
ActiveWorkerCount : len ( workers1 ) ,
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
workers2 := [ ] * base . WorkerInfo {
{
Host : otherHost ,
PID : otherPID ,
2021-09-10 21:29:37 +08:00
ID : msg2 . ID ,
2020-05-19 11:47:35 +08:00
Type : msg2 . Type ,
Queue : msg2 . Queue ,
Payload : msg2 . Payload ,
Started : time . Now ( ) . Add ( - 30 * time . Second ) ,
} ,
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
serverInfo2 := base . ServerInfo {
Host : otherHost ,
PID : otherPID ,
ServerID : otherServerID ,
Concurrency : 10 ,
Queues : map [ string ] int { "default" : 2 , "email" : 5 , "low" : 1 } ,
StrictPriority : false ,
Started : time . Now ( ) . Add ( - 15 * time . Minute ) ,
2021-03-23 21:20:54 +08:00
Status : "active" ,
2020-05-19 11:47:35 +08:00
ActiveWorkerCount : len ( workers2 ) ,
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
// Write server and workers data.
if err := r . WriteServerState ( & serverInfo1 , workers1 , ttl ) ; err != nil {
t . Fatalf ( "could not write server state: %v" , err )
2020-02-21 23:18:22 +08:00
}
2020-05-19 11:47:35 +08:00
if err := r . WriteServerState ( & serverInfo2 , workers2 , ttl ) ; err != nil {
t . Fatalf ( "could not write server state: %v" , err )
2020-02-21 23:18:22 +08:00
}
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
err := r . ClearServerState ( host , pid , serverID )
2020-02-21 23:18:22 +08:00
if err != nil {
2020-04-13 07:42:11 +08:00
t . Fatalf ( "(*RDB).ClearServerState failed: %v" , err )
2020-02-21 23:18:22 +08:00
}
2020-02-02 14:22:48 +08:00
2020-05-19 11:47:35 +08:00
skey := base . ServerInfoKey ( host , pid , serverID )
wkey := base . WorkersKey ( host , pid , serverID )
otherSKey := base . ServerInfoKey ( otherHost , otherPID , otherServerID )
otherWKey := base . WorkersKey ( otherHost , otherPID , otherServerID )
// Check all keys are cleared.
2021-09-02 20:56:02 +08:00
if r . client . Exists ( context . Background ( ) , skey ) . Val ( ) != 0 {
2020-04-13 07:42:11 +08:00
t . Errorf ( "Redis key %q exists" , skey )
2020-02-21 23:18:22 +08:00
}
2021-09-02 20:56:02 +08:00
if r . client . Exists ( context . Background ( ) , wkey ) . Val ( ) != 0 {
2020-02-21 23:18:22 +08:00
t . Errorf ( "Redis key %q exists" , wkey )
}
2021-09-02 20:56:02 +08:00
gotServerKeys := r . client . ZRange ( context . Background ( ) , base . AllServers , 0 , - 1 ) . Val ( )
2020-05-19 11:47:35 +08:00
wantServerKeys := [ ] string { otherSKey }
if diff := cmp . Diff ( wantServerKeys , gotServerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllServers , gotServerKeys , wantServerKeys )
2020-02-21 23:18:22 +08:00
}
2021-09-02 20:56:02 +08:00
gotWorkerKeys := r . client . ZRange ( context . Background ( ) , base . AllWorkers , 0 , - 1 ) . Val ( )
2020-02-21 23:18:22 +08:00
wantWorkerKeys := [ ] string { otherWKey }
if diff := cmp . Diff ( wantWorkerKeys , gotWorkerKeys ) ; diff != "" {
t . Errorf ( "%q contained %v, want %v" , base . AllWorkers , gotWorkerKeys , wantWorkerKeys )
2020-01-31 22:48:58 +08:00
}
}
2020-02-23 06:30:24 +08:00
func TestCancelationPubSub ( t * testing . T ) {
r := setup ( t )
2020-09-08 21:51:01 +08:00
defer r . Close ( )
2020-02-23 06:30:24 +08:00
pubsub , err := r . CancelationPubSub ( )
if err != nil {
t . Fatalf ( "(*RDB).CancelationPubSub() returned an error: %v" , err )
}
cancelCh := pubsub . Channel ( )
var (
mu sync . Mutex
received [ ] string
)
go func ( ) {
for msg := range cancelCh {
mu . Lock ( )
received = append ( received , msg . Payload )
mu . Unlock ( )
}
} ( )
publish := [ ] string { "one" , "two" , "three" }
for _ , msg := range publish {
r . PublishCancelation ( msg )
}
// allow for message to reach subscribers.
time . Sleep ( time . Second )
pubsub . Close ( )
mu . Lock ( )
if diff := cmp . Diff ( publish , received , h . SortStringSliceOpt ) ; diff != "" {
t . Errorf ( "subscriber received %v, want %v; (-want,+got)\n%s" , received , publish , diff )
}
mu . Unlock ( )
}
2021-11-06 07:52:54 +08:00
// TestWriteResult calls WriteResult for each case and verifies that the
// reported byte count equals the length of the data and that the data ends up
// in the `result` field of the task's hash.
func TestWriteResult(t *testing.T) {
	r := setup(t)
	defer r.Close()

	type resultCase struct {
		qname  string
		taskID string
		data   []byte
	}
	cases := []resultCase{
		{qname: "default", taskID: uuid.NewString(), data: []byte("hello")},
	}

	ctx := context.Background()
	for _, tc := range cases {
		// Start every case from an empty database.
		h.FlushDB(t, r.client)

		written, err := r.WriteResult(tc.qname, tc.taskID, tc.data)
		if err != nil {
			t.Errorf("WriteResult failed: %v", err)
			continue
		}
		if want := len(tc.data); written != want {
			t.Errorf("WriteResult returned %d, want %d", written, want)
		}

		taskKey := base.TaskKey(tc.qname, tc.taskID)
		stored := r.client.HGet(ctx, taskKey, "result").Val()
		if want := string(tc.data); stored != want {
			t.Errorf("`result` field under %q key is set to %q, want %q", taskKey, stored, want)
		}
	}
}