mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Update RDB.Dequeue to insert task ID to lease set
This commit is contained in:
parent
852af7abd1
commit
d865d89900
@ -423,6 +423,13 @@ func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) [
|
|||||||
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
|
return getMessagesFromZSetWithScores(tb, r, qname, base.DeadlinesKey, base.TaskStateActive)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetLeaseEntries returns all task IDs and its score in the lease set for the given queue.
|
||||||
|
// It also asserts the state field of the task.
|
||||||
|
func GetLeaseEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||||
|
tb.Helper()
|
||||||
|
return getMessagesFromZSetWithScores(tb, r, qname, base.LeaseKey, base.TaskStateActive)
|
||||||
|
}
|
||||||
|
|
||||||
// GetCompletedEntries returns all completed messages and its score in the given queue.
|
// GetCompletedEntries returns all completed messages and its score in the given queue.
|
||||||
// It also asserts the state field of the task.
|
// It also asserts the state field of the task.
|
||||||
func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
func GetCompletedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
|
||||||
|
@ -615,7 +615,7 @@ type Broker interface {
|
|||||||
Ping() error
|
Ping() error
|
||||||
Enqueue(ctx context.Context, msg *TaskMessage) error
|
Enqueue(ctx context.Context, msg *TaskMessage) error
|
||||||
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
EnqueueUnique(ctx context.Context, msg *TaskMessage, ttl time.Duration) error
|
||||||
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
Dequeue(qnames ...string) (*TaskMessage, error)
|
||||||
Done(msg *TaskMessage) error
|
Done(msg *TaskMessage) error
|
||||||
MarkAsComplete(msg *TaskMessage) error
|
MarkAsComplete(msg *TaskMessage) error
|
||||||
Requeue(msg *TaskMessage) error
|
Requeue(msg *TaskMessage) error
|
||||||
|
@ -113,7 +113,7 @@ func BenchmarkDequeueSingleQueue(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if _, _, err := r.Dequeue(base.DefaultQueueName); err != nil {
|
if _, err := r.Dequeue(base.DefaultQueueName); err != nil {
|
||||||
b.Fatalf("Dequeue failed: %v", err)
|
b.Fatalf("Dequeue failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,7 +139,7 @@ func BenchmarkDequeueMultipleQueues(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer()
|
b.StartTimer()
|
||||||
|
|
||||||
if _, _, err := r.Dequeue(qnames...); err != nil {
|
if _, err := r.Dequeue(qnames...); err != nil {
|
||||||
b.Fatalf("Dequeue failed: %v", err)
|
b.Fatalf("Dequeue failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -20,6 +20,8 @@ import (
|
|||||||
|
|
||||||
const statsTTL = 90 * 24 * time.Hour // 90 days
|
const statsTTL = 90 * 24 * time.Hour // 90 days
|
||||||
|
|
||||||
|
const leaseDuration = 30 * time.Second
|
||||||
|
|
||||||
// RDB is a client interface to query and mutate task queues.
|
// RDB is a client interface to query and mutate task queues.
|
||||||
type RDB struct {
|
type RDB struct {
|
||||||
client redis.UniversalClient
|
client redis.UniversalClient
|
||||||
@ -213,20 +215,17 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
|
|||||||
// KEYS[1] -> asynq:{<qname>}:pending
|
// KEYS[1] -> asynq:{<qname>}:pending
|
||||||
// KEYS[2] -> asynq:{<qname>}:paused
|
// KEYS[2] -> asynq:{<qname>}:paused
|
||||||
// KEYS[3] -> asynq:{<qname>}:active
|
// KEYS[3] -> asynq:{<qname>}:active
|
||||||
// KEYS[4] -> asynq:{<qname>}:deadlines
|
// KEYS[4] -> asynq:{<qname>}:lease
|
||||||
// --
|
// --
|
||||||
// ARGV[1] -> current time in Unix time
|
// ARGV[1] -> initial lease expiration Unix time
|
||||||
// ARGV[2] -> task key prefix
|
// ARGV[2] -> task key prefix
|
||||||
//
|
//
|
||||||
// Output:
|
// Output:
|
||||||
// Returns nil if no processable task is found in the given queue.
|
// Returns nil if no processable task is found in the given queue.
|
||||||
// Returns tuple {msg , deadline} if task is found, where `msg` is the encoded
|
// Returns an encoded TaskMessage.
|
||||||
// TaskMessage, and `deadline` is Unix time in seconds.
|
|
||||||
//
|
//
|
||||||
// Note: dequeueCmd checks whether a queue is paused first, before
|
// Note: dequeueCmd checks whether a queue is paused first, before
|
||||||
// calling RPOPLPUSH to pop a task from the queue.
|
// calling RPOPLPUSH to pop a task from the queue.
|
||||||
// It computes the task deadline by inspecting Timout and Deadline fields,
|
|
||||||
// and inserts the task to the deadlines zset with the computed deadline.
|
|
||||||
var dequeueCmd = redis.NewScript(`
|
var dequeueCmd = redis.NewScript(`
|
||||||
if redis.call("EXISTS", KEYS[2]) == 0 then
|
if redis.call("EXISTS", KEYS[2]) == 0 then
|
||||||
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
|
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
|
||||||
@ -234,70 +233,45 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
|
|||||||
local key = ARGV[2] .. id
|
local key = ARGV[2] .. id
|
||||||
redis.call("HSET", key, "state", "active")
|
redis.call("HSET", key, "state", "active")
|
||||||
redis.call("HDEL", key, "pending_since")
|
redis.call("HDEL", key, "pending_since")
|
||||||
local data = redis.call("HMGET", key, "msg", "timeout", "deadline")
|
redis.call("ZADD", KEYS[4], ARGV[1], id)
|
||||||
local msg = data[1]
|
return redis.call("HGET", key, "msg")
|
||||||
local timeout = tonumber(data[2])
|
|
||||||
local deadline = tonumber(data[3])
|
|
||||||
local score
|
|
||||||
if timeout ~= 0 and deadline ~= 0 then
|
|
||||||
score = math.min(ARGV[1]+timeout, deadline)
|
|
||||||
elseif timeout ~= 0 then
|
|
||||||
score = ARGV[1] + timeout
|
|
||||||
elseif deadline ~= 0 then
|
|
||||||
score = deadline
|
|
||||||
else
|
|
||||||
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
|
|
||||||
end
|
|
||||||
redis.call("ZADD", KEYS[4], score, id)
|
|
||||||
return {msg, score}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
return nil`)
|
return nil`)
|
||||||
|
|
||||||
// Dequeue queries given queues in order and pops a task message
|
// Dequeue queries given queues in order and pops a task message
|
||||||
// off a queue if one exists and returns the message and deadline.
|
// off a queue if one exists and returns the message.
|
||||||
// Dequeue skips a queue if the queue is paused.
|
// Dequeue skips a queue if the queue is paused.
|
||||||
// If all queues are empty, ErrNoProcessableTask error is returned.
|
// If all queues are empty, ErrNoProcessableTask error is returned.
|
||||||
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
|
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, err error) {
|
||||||
var op errors.Op = "rdb.Dequeue"
|
var op errors.Op = "rdb.Dequeue"
|
||||||
for _, qname := range qnames {
|
for _, qname := range qnames {
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.PendingKey(qname),
|
base.PendingKey(qname),
|
||||||
base.PausedKey(qname),
|
base.PausedKey(qname),
|
||||||
base.ActiveKey(qname),
|
base.ActiveKey(qname),
|
||||||
base.DeadlinesKey(qname),
|
base.LeaseKey(qname),
|
||||||
}
|
}
|
||||||
argv := []interface{}{
|
argv := []interface{}{
|
||||||
r.clock.Now().Unix(),
|
r.clock.Now().Add(leaseDuration).Unix(),
|
||||||
base.TaskKeyPrefix(qname),
|
base.TaskKeyPrefix(qname),
|
||||||
}
|
}
|
||||||
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
res, err := dequeueCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
|
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
data, err := cast.ToSliceE(res)
|
encoded, err := cast.ToStringE(res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
||||||
}
|
|
||||||
if len(data) != 2 {
|
|
||||||
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("Lua script returned %d values; expected 2", len(data)))
|
|
||||||
}
|
|
||||||
encoded, err := cast.ToStringE(data[0])
|
|
||||||
if err != nil {
|
|
||||||
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
|
||||||
}
|
|
||||||
d, err := cast.ToInt64E(data[1])
|
|
||||||
if err != nil {
|
|
||||||
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
|
||||||
}
|
}
|
||||||
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
|
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
|
||||||
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
|
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
|
||||||
}
|
}
|
||||||
return msg, time.Unix(d, 0), nil
|
return msg, nil
|
||||||
}
|
}
|
||||||
return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
|
return nil, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:active
|
// KEYS[1] -> asynq:{<qname>}:active
|
||||||
|
@ -322,7 +322,6 @@ func TestDequeue(t *testing.T) {
|
|||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
Deadline: 0,
|
Deadline: 0,
|
||||||
}
|
}
|
||||||
t1Deadline := now.Unix() + t1.Timeout
|
|
||||||
t2 := &base.TaskMessage{
|
t2 := &base.TaskMessage{
|
||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Type: "export_csv",
|
Type: "export_csv",
|
||||||
@ -331,7 +330,6 @@ func TestDequeue(t *testing.T) {
|
|||||||
Timeout: 0,
|
Timeout: 0,
|
||||||
Deadline: 1593021600,
|
Deadline: 1593021600,
|
||||||
}
|
}
|
||||||
t2Deadline := t2.Deadline
|
|
||||||
t3 := &base.TaskMessage{
|
t3 := &base.TaskMessage{
|
||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Type: "reindex",
|
Type: "reindex",
|
||||||
@ -343,28 +341,26 @@ func TestDequeue(t *testing.T) {
|
|||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
args []string // list of queues to query
|
qnames []string // list of queues to query
|
||||||
wantMsg *base.TaskMessage
|
wantMsg *base.TaskMessage
|
||||||
wantDeadline time.Time
|
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
wantActive map[string][]*base.TaskMessage
|
wantActive map[string][]*base.TaskMessage
|
||||||
wantDeadlines map[string][]base.Z
|
wantLease map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
pending: map[string][]*base.TaskMessage{
|
pending: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
qnames: []string{"default"},
|
||||||
wantMsg: t1,
|
wantMsg: t1,
|
||||||
wantDeadline: time.Unix(t1Deadline, 0),
|
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
wantActive: map[string][]*base.TaskMessage{
|
wantActive: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
wantDeadlines: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {{Message: t1, Score: t1Deadline}},
|
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
@ -373,9 +369,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
"low": {t3},
|
"low": {t3},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
qnames: []string{"critical", "default", "low"},
|
||||||
wantMsg: t2,
|
wantMsg: t2,
|
||||||
wantDeadline: time.Unix(t2Deadline, 0),
|
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
@ -386,9 +381,9 @@ func TestDequeue(t *testing.T) {
|
|||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
wantDeadlines: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {{Message: t2, Score: t2Deadline}},
|
"critical": {{Message: t2, Score: now.Add(leaseDuration).Unix()}},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -398,9 +393,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {t3},
|
"low": {t3},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
qnames: []string{"critical", "default", "low"},
|
||||||
wantMsg: t1,
|
wantMsg: t1,
|
||||||
wantDeadline: time.Unix(t1Deadline, 0),
|
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
@ -411,8 +405,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
wantDeadlines: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {{Message: t1, Score: t1Deadline}},
|
"default": {{Message: t1, Score: now.Add(leaseDuration).Unix()}},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
@ -423,19 +417,14 @@ func TestDequeue(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||||
|
|
||||||
gotMsg, gotDeadline, err := r.Dequeue(tc.args...)
|
gotMsg, err := r.Dequeue(tc.qnames...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.args, err)
|
t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.qnames, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if !cmp.Equal(gotMsg, tc.wantMsg) {
|
if !cmp.Equal(gotMsg, tc.wantMsg) {
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
|
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
|
||||||
tc.args, gotMsg, tc.wantMsg)
|
tc.qnames, gotMsg, tc.wantMsg)
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !cmp.Equal(gotDeadline, tc.wantDeadline) {
|
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v",
|
|
||||||
tc.args, gotDeadline, tc.wantDeadline)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -451,10 +440,10 @@ func TestDequeue(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for queue, want := range tc.wantDeadlines {
|
for queue, want := range tc.wantLease {
|
||||||
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
|
gotLease := h.GetLeaseEntries(t, r.client, queue)
|
||||||
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff)
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -466,17 +455,17 @@ func TestDequeueError(t *testing.T) {
|
|||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
args []string // list of queues to query
|
qnames []string // list of queues to query
|
||||||
wantErr error
|
wantErr error
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
wantActive map[string][]*base.TaskMessage
|
wantActive map[string][]*base.TaskMessage
|
||||||
wantDeadlines map[string][]base.Z
|
wantLease map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
pending: map[string][]*base.TaskMessage{
|
pending: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
qnames: []string{"default"},
|
||||||
wantErr: errors.ErrNoProcessableTask,
|
wantErr: errors.ErrNoProcessableTask,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -484,7 +473,7 @@ func TestDequeueError(t *testing.T) {
|
|||||||
wantActive: map[string][]*base.TaskMessage{
|
wantActive: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
wantDeadlines: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -494,7 +483,7 @@ func TestDequeueError(t *testing.T) {
|
|||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
qnames: []string{"critical", "default", "low"},
|
||||||
wantErr: errors.ErrNoProcessableTask,
|
wantErr: errors.ErrNoProcessableTask,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -506,7 +495,7 @@ func TestDequeueError(t *testing.T) {
|
|||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
wantDeadlines: map[string][]base.Z{
|
wantLease: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
@ -518,18 +507,14 @@ func TestDequeueError(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||||
|
|
||||||
gotMsg, gotDeadline, gotErr := r.Dequeue(tc.args...)
|
gotMsg, gotErr := r.Dequeue(tc.qnames...)
|
||||||
if !errors.Is(gotErr, tc.wantErr) {
|
if !errors.Is(gotErr, tc.wantErr) {
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v",
|
t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v",
|
||||||
tc.args, gotErr, tc.wantErr)
|
tc.qnames, gotErr, tc.wantErr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if gotMsg != nil {
|
if gotMsg != nil {
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.args, gotMsg)
|
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.qnames, gotMsg)
|
||||||
continue
|
|
||||||
}
|
|
||||||
if !gotDeadline.IsZero() {
|
|
||||||
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, time.Time{})
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -545,10 +530,10 @@ func TestDequeueError(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for queue, want := range tc.wantDeadlines {
|
for queue, want := range tc.wantLease {
|
||||||
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
|
gotLease := h.GetLeaseEntries(t, r.client, queue)
|
||||||
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(want, gotLease, h.SortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff)
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LeaseKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -577,7 +562,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
paused []string // list of paused queues
|
paused []string // list of paused queues
|
||||||
pending map[string][]*base.TaskMessage
|
pending map[string][]*base.TaskMessage
|
||||||
args []string // list of queues to query
|
qnames []string // list of queues to query
|
||||||
wantMsg *base.TaskMessage
|
wantMsg *base.TaskMessage
|
||||||
wantErr error
|
wantErr error
|
||||||
wantPending map[string][]*base.TaskMessage
|
wantPending map[string][]*base.TaskMessage
|
||||||
@ -589,7 +574,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
"default": {t1},
|
"default": {t1},
|
||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
},
|
},
|
||||||
args: []string{"default", "critical"},
|
qnames: []string{"default", "critical"},
|
||||||
wantMsg: t2,
|
wantMsg: t2,
|
||||||
wantErr: nil,
|
wantErr: nil,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
@ -606,7 +591,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
pending: map[string][]*base.TaskMessage{
|
pending: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
qnames: []string{"default"},
|
||||||
wantMsg: nil,
|
wantMsg: nil,
|
||||||
wantErr: errors.ErrNoProcessableTask,
|
wantErr: errors.ErrNoProcessableTask,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
@ -622,7 +607,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
"default": {t1},
|
"default": {t1},
|
||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
},
|
},
|
||||||
args: []string{"default", "critical"},
|
qnames: []string{"default", "critical"},
|
||||||
wantMsg: nil,
|
wantMsg: nil,
|
||||||
wantErr: errors.ErrNoProcessableTask,
|
wantErr: errors.ErrNoProcessableTask,
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
@ -645,10 +630,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
}
|
}
|
||||||
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
h.SeedAllPendingQueues(t, r.client, tc.pending)
|
||||||
|
|
||||||
got, _, err := r.Dequeue(tc.args...)
|
got, err := r.Dequeue(tc.qnames...)
|
||||||
if !cmp.Equal(got, tc.wantMsg) || !errors.Is(err, tc.wantErr) {
|
if !cmp.Equal(got, tc.wantMsg) || !errors.Is(err, tc.wantErr) {
|
||||||
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
|
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
|
||||||
tc.args, got, err, tc.wantMsg, tc.wantErr)
|
tc.qnames, got, err, tc.wantMsg, tc.wantErr)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,11 +64,11 @@ func (tb *TestBroker) EnqueueUnique(ctx context.Context, msg *base.TaskMessage,
|
|||||||
return tb.real.EnqueueUnique(ctx, msg, ttl)
|
return tb.real.EnqueueUnique(ctx, msg, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, time.Time, error) {
|
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
if tb.sleeping {
|
if tb.sleeping {
|
||||||
return nil, time.Time{}, errRedisDown
|
return nil, errRedisDown
|
||||||
}
|
}
|
||||||
return tb.real.Dequeue(qnames...)
|
return tb.real.Dequeue(qnames...)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user