2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Add tests for type rdb

This commit is contained in:
Ken Hibino 2019-11-20 07:01:24 -08:00
parent e9069bfb47
commit dd0b0b358c
5 changed files with 93 additions and 4 deletions

1
go.mod
View File

@ -5,5 +5,6 @@ go 1.13
require ( require (
github.com/go-redis/redis v6.15.6+incompatible github.com/go-redis/redis v6.15.6+incompatible
github.com/go-redis/redis/v7 v7.0.0-beta.4 github.com/go-redis/redis/v7 v7.0.0-beta.4
github.com/google/go-cmp v0.3.1
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
) )

2
go.sum
View File

@ -4,6 +4,8 @@ github.com/go-redis/redis v6.15.6+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8w
github.com/go-redis/redis/v7 v7.0.0-beta.4 h1:p6z7Pde69EGRWvlC++y8aFcaWegyrKHzOBGo0zUACTQ= github.com/go-redis/redis/v7 v7.0.0-beta.4 h1:p6z7Pde69EGRWvlC++y8aFcaWegyrKHzOBGo0zUACTQ=
github.com/go-redis/redis/v7 v7.0.0-beta.4/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s= github.com/go-redis/redis/v7 v7.0.0-beta.4/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=

View File

@ -62,6 +62,7 @@ func (m *manager) start() {
func (m *manager) processTasks() { func (m *manager) processTasks() {
// pull message out of the queue and process it // pull message out of the queue and process it
// TODO(hibiken): sort the list of queues in order of priority // TODO(hibiken): sort the list of queues in order of priority
// NOTE: BLPOP needs to timeout in case a new queue is added.
msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...) msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...)
if err != nil { if err != nil {
switch err { switch err {

14
rdb.go
View File

@ -55,10 +55,10 @@ func (r *rdb) push(msg *taskMessage) error {
return nil return nil
} }
// bpop blocks until there is a taskMessage available to be processed. // bpop blocks until there is a taskMessage available to be processed,
// bpop returns immediately if there are already taskMessages waiting to be processed. // returns immediately if there are already tasks waiting to be processed.
func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) { func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) {
res, err := r.client.BLPop(5*time.Second, keys...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. res, err := r.client.BLPop(timeout, keys...).Result()
if err != nil { if err != nil {
if err != redis.Nil { if err != redis.Nil {
return nil, fmt.Errorf("command BLPOP %v %v failed: %v", return nil, fmt.Errorf("command BLPOP %v %v failed: %v",
@ -150,6 +150,12 @@ func (r *rdb) kill(msg *taskMessage) error {
} }
// listQueues returns the list of all queues. // listQueues returns the list of all queues.
// NOTE: Add default to the slice if empty because
// BLPOP will error out if empty list is passed.
func (r *rdb) listQueues() []string { func (r *rdb) listQueues() []string {
return r.client.SMembers(allQueues).Val() queues := r.client.SMembers(allQueues).Val()
if len(queues) == 0 {
queues = append(queues, queuePrefix+"default")
}
return queues
} }

79
rdb_test.go Normal file
View File

@ -0,0 +1,79 @@
package asynq
import (
"encoding/json"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
)
var client *redis.Client
// setup connects to a redis database and flush all keys
// before returning an instance of rdb.
func setup() *rdb {
client = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DB: 15, // use database 15 to separate from other applications
})
// Start each test with a clean slate.
if err := client.FlushDB().Err(); err != nil {
panic(err)
}
return newRDB(client)
}
func TestPush(t *testing.T) {
r := setup()
msg := &taskMessage{
Type: "sendEmail",
Queue: "default",
Retry: 10,
}
err := r.push(msg)
if err != nil {
t.Fatalf("could not push message to queue: %v", err)
}
res := client.LRange("asynq:queues:default", 0, -1).Val()
if len(res) != 1 {
t.Fatalf("len(res) = %d, want %d", len(res), 1)
}
bytes, err := json.Marshal(msg)
if err != nil {
t.Fatalf("json.Marshal(msg) failed: %v", err)
}
if res[0] != string(bytes) {
t.Fatalf("res[0] = %s, want %s", res[0], string(bytes))
}
}
func TestBPopImmediateReturn(t *testing.T) {
r := setup()
msg := &taskMessage{
Type: "GenerateCSVExport",
Queue: "csv",
Retry: 10,
}
r.push(msg)
res, err := r.bpop(time.Second, "asynq:queues:csv")
if err != nil {
t.Fatalf("r.bpop() failed: %v", err)
}
if !cmp.Equal(res, msg) {
t.Errorf("cmp.Equal(res, msg) = %t, want %t", false, true)
}
}
func TestBPopTimeout(t *testing.T) {
r := setup()
_, err := r.bpop(time.Second, "asynq:queues:default")
if err != errQueuePopTimeout {
t.Errorf("err = %v, want %v", err, errQueuePopTimeout)
}
}