diff --git a/go.mod b/go.mod index 12e3d14..0ae1b01 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,6 @@ go 1.13 require ( github.com/go-redis/redis v6.15.6+incompatible github.com/go-redis/redis/v7 v7.0.0-beta.4 + github.com/google/go-cmp v0.3.1 github.com/google/uuid v1.1.1 ) diff --git a/go.sum b/go.sum index a2a817c..7177735 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/go-redis/redis v6.15.6+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8w github.com/go-redis/redis/v7 v7.0.0-beta.4 h1:p6z7Pde69EGRWvlC++y8aFcaWegyrKHzOBGo0zUACTQ= github.com/go-redis/redis/v7 v7.0.0-beta.4/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/manager.go b/manager.go index dad8bee..1071320 100644 --- a/manager.go +++ b/manager.go @@ -62,6 +62,7 @@ func (m *manager) start() { func (m *manager) processTasks() { // pull message out of the queue and process it // TODO(hibiken): sort the list of queues in order of priority + // NOTE: BLPOP needs to timeout in case a new queue is added. msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...) if err != nil { switch err { diff --git a/rdb.go b/rdb.go index d2371bf..4492a0e 100644 --- a/rdb.go +++ b/rdb.go @@ -55,10 +55,10 @@ func (r *rdb) push(msg *taskMessage) error { return nil } -// bpop blocks until there is a taskMessage available to be processed. -// bpop returns immediately if there are already taskMessages waiting to be processed. +// bpop blocks until there is a taskMessage available to be processed, +// returns immediately if there are already tasks waiting to be processed. func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) { - res, err := r.client.BLPop(5*time.Second, keys...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. + res, err := r.client.BLPop(timeout, keys...).Result() if err != nil { if err != redis.Nil { return nil, fmt.Errorf("command BLPOP %v %v failed: %v", @@ -150,6 +150,12 @@ func (r *rdb) kill(msg *taskMessage) error { } // listQueues returns the list of all queues. +// NOTE: Add default to the slice if empty because +// BLPOP will error out if empty list is passed. func (r *rdb) listQueues() []string { - return r.client.SMembers(allQueues).Val() + queues := r.client.SMembers(allQueues).Val() + if len(queues) == 0 { + queues = append(queues, queuePrefix+"default") + } + return queues } diff --git a/rdb_test.go b/rdb_test.go new file mode 100644 index 0000000..410e5a8 --- /dev/null +++ b/rdb_test.go @@ -0,0 +1,79 @@ +package asynq + +import ( + "encoding/json" + "testing" + "time" + + "github.com/go-redis/redis/v7" + "github.com/google/go-cmp/cmp" +) + +var client *redis.Client + +// setup connects to a redis database and flush all keys +// before returning an instance of rdb. +func setup() *rdb { + client = redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + DB: 15, // use database 15 to separate from other applications + }) + // Start each test with a clean slate. + if err := client.FlushDB().Err(); err != nil { + panic(err) + } + return newRDB(client) +} + +func TestPush(t *testing.T) { + r := setup() + msg := &taskMessage{ + Type: "sendEmail", + Queue: "default", + Retry: 10, + } + + err := r.push(msg) + if err != nil { + t.Fatalf("could not push message to queue: %v", err) + } + + res := client.LRange("asynq:queues:default", 0, -1).Val() + if len(res) != 1 { + t.Fatalf("len(res) = %d, want %d", len(res), 1) + } + bytes, err := json.Marshal(msg) + if err != nil { + t.Fatalf("json.Marshal(msg) failed: %v", err) + } + if res[0] != string(bytes) { + t.Fatalf("res[0] = %s, want %s", res[0], string(bytes)) + } +} + +func TestBPopImmediateReturn(t *testing.T) { + r := setup() + msg := &taskMessage{ + Type: "GenerateCSVExport", + Queue: "csv", + Retry: 10, + } + r.push(msg) + + res, err := r.bpop(time.Second, "asynq:queues:csv") + if err != nil { + t.Fatalf("r.bpop() failed: %v", err) + } + if !cmp.Equal(res, msg) { + t.Errorf("cmp.Equal(res, msg) = %t, want %t", false, true) + } +} + +func TestBPopTimeout(t *testing.T) { + r := setup() + + _, err := r.bpop(time.Second, "asynq:queues:default") + if err != errQueuePopTimeout { + t.Errorf("err = %v, want %v", err, errQueuePopTimeout) + } +}