mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-03 05:12:01 +08:00
committed by
Mohammed Sohail
parent
53e4ec506d
commit
4db3079c92
@@ -3119,7 +3119,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
desc string
|
||||
// initial data
|
||||
tasks []*h.TaskSeedData
|
||||
groups map[string][]*redis.Z
|
||||
groups map[string][]redis.Z
|
||||
allGroups map[string][]string
|
||||
|
||||
// args
|
||||
@@ -3138,7 +3138,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{
|
||||
desc: "with an empty group",
|
||||
tasks: []*h.TaskSeedData{},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {},
|
||||
},
|
||||
allGroups: map[string][]string{
|
||||
@@ -3165,7 +3165,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3198,7 +3198,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3232,7 +3232,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg2, State: base.TaskStateAggregating},
|
||||
{Msg: msg3, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3263,7 +3263,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3296,7 +3296,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3335,7 +3335,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3368,7 +3368,7 @@ func TestAggregationCheck(t *testing.T) {
|
||||
{Msg: msg4, State: base.TaskStateAggregating},
|
||||
{Msg: msg5, State: base.TaskStateAggregating},
|
||||
},
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "mygroup"): {
|
||||
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
|
||||
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
@@ -3470,8 +3470,8 @@ func TestDeleteAggregationSet(t *testing.T) {
|
||||
desc string
|
||||
// initial data
|
||||
tasks []*h.TaskSeedData
|
||||
aggregationSets map[string][]*redis.Z
|
||||
allAggregationSets map[string][]*redis.Z
|
||||
aggregationSets map[string][]redis.Z
|
||||
allAggregationSets map[string][]redis.Z
|
||||
|
||||
// args
|
||||
ctx context.Context
|
||||
@@ -3491,14 +3491,14 @@ func TestDeleteAggregationSet(t *testing.T) {
|
||||
{Msg: m2, State: base.TaskStateAggregating},
|
||||
{Msg: m3, State: base.TaskStateAggregating},
|
||||
},
|
||||
aggregationSets: map[string][]*redis.Z{
|
||||
aggregationSets: map[string][]redis.Z{
|
||||
base.AggregationSetKey("default", "mygroup", setID): {
|
||||
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
|
||||
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
},
|
||||
},
|
||||
allAggregationSets: map[string][]*redis.Z{
|
||||
allAggregationSets: map[string][]redis.Z{
|
||||
base.AllAggregationSets("default"): {
|
||||
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
|
||||
},
|
||||
@@ -3525,7 +3525,7 @@ func TestDeleteAggregationSet(t *testing.T) {
|
||||
{Msg: m2, State: base.TaskStateAggregating},
|
||||
{Msg: m3, State: base.TaskStateAggregating},
|
||||
},
|
||||
aggregationSets: map[string][]*redis.Z{
|
||||
aggregationSets: map[string][]redis.Z{
|
||||
base.AggregationSetKey("default", "mygroup", setID): {
|
||||
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
},
|
||||
@@ -3534,7 +3534,7 @@ func TestDeleteAggregationSet(t *testing.T) {
|
||||
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
},
|
||||
},
|
||||
allAggregationSets: map[string][]*redis.Z{
|
||||
allAggregationSets: map[string][]redis.Z{
|
||||
base.AllAggregationSets("default"): {
|
||||
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
|
||||
{Member: base.AggregationSetKey("default", "mygroup", otherSetID), Score: float64(now.Add(aggregationTimeout).Unix())},
|
||||
@@ -3599,8 +3599,8 @@ func TestDeleteAggregationSetError(t *testing.T) {
|
||||
desc string
|
||||
// initial data
|
||||
tasks []*h.TaskSeedData
|
||||
aggregationSets map[string][]*redis.Z
|
||||
allAggregationSets map[string][]*redis.Z
|
||||
aggregationSets map[string][]redis.Z
|
||||
allAggregationSets map[string][]redis.Z
|
||||
|
||||
// args
|
||||
ctx context.Context
|
||||
@@ -3619,14 +3619,14 @@ func TestDeleteAggregationSetError(t *testing.T) {
|
||||
{Msg: m2, State: base.TaskStateAggregating},
|
||||
{Msg: m3, State: base.TaskStateAggregating},
|
||||
},
|
||||
aggregationSets: map[string][]*redis.Z{
|
||||
aggregationSets: map[string][]redis.Z{
|
||||
base.AggregationSetKey("default", "mygroup", setID): {
|
||||
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
|
||||
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
|
||||
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
},
|
||||
},
|
||||
allAggregationSets: map[string][]*redis.Z{
|
||||
allAggregationSets: map[string][]redis.Z{
|
||||
base.AllAggregationSets("default"): {
|
||||
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
|
||||
},
|
||||
@@ -3685,23 +3685,23 @@ func TestReclaimStaleAggregationSets(t *testing.T) {
|
||||
// Note: In this test, we're trying out a new way to test RDB by exactly describing how
|
||||
// keys and values are represented in Redis.
|
||||
tests := []struct {
|
||||
groups map[string][]*redis.Z // map redis-key to redis-zset
|
||||
aggregationSets map[string][]*redis.Z
|
||||
allAggregationSets map[string][]*redis.Z
|
||||
groups map[string][]redis.Z // map redis-key to redis-zset
|
||||
aggregationSets map[string][]redis.Z
|
||||
allAggregationSets map[string][]redis.Z
|
||||
qname string
|
||||
wantGroups map[string][]redis.Z
|
||||
wantAggregationSets map[string][]redis.Z
|
||||
wantAllAggregationSets map[string][]redis.Z
|
||||
}{
|
||||
{
|
||||
groups: map[string][]*redis.Z{
|
||||
groups: map[string][]redis.Z{
|
||||
base.GroupKey("default", "foo"): {},
|
||||
base.GroupKey("default", "bar"): {},
|
||||
base.GroupKey("default", "qux"): {
|
||||
{Member: m4.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
|
||||
},
|
||||
},
|
||||
aggregationSets: map[string][]*redis.Z{
|
||||
aggregationSets: map[string][]redis.Z{
|
||||
base.AggregationSetKey("default", "foo", "set1"): {
|
||||
{Member: m1.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
|
||||
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
|
||||
@@ -3710,7 +3710,7 @@ func TestReclaimStaleAggregationSets(t *testing.T) {
|
||||
{Member: m3.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
|
||||
},
|
||||
},
|
||||
allAggregationSets: map[string][]*redis.Z{
|
||||
allAggregationSets: map[string][]redis.Z{
|
||||
base.AllAggregationSets("default"): {
|
||||
{Member: base.AggregationSetKey("default", "foo", "set1"), Score: float64(now.Add(-10 * time.Second).Unix())}, // set1 is expired
|
||||
{Member: base.AggregationSetKey("default", "bar", "set2"), Score: float64(now.Add(40 * time.Second).Unix())}, // set2 is not expired
|
||||
|
Reference in New Issue
Block a user