mirror of
https://github.com/hibiken/asynq.git
synced 2025-02-23 12:20:19 +08:00
Refactor (*rdb).forward test
This commit is contained in:
parent
810c40bd54
commit
cff5e67018
3
rdb.go
3
rdb.go
@ -199,7 +199,6 @@ func (r *rdb) forward(from string) error {
|
|||||||
return msgs
|
return msgs
|
||||||
`)
|
`)
|
||||||
now := float64(time.Now().Unix())
|
now := float64(time.Now().Unix())
|
||||||
res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result()
|
_, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result()
|
||||||
fmt.Printf("[DEBUGGING LUA} %v, %v\n", res, err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
71
rdb_test.go
71
rdb_test.go
@ -3,6 +3,7 @@ package asynq
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -149,7 +150,6 @@ func TestForward(t *testing.T) {
|
|||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := randomTask("send_email", defaultQueue, nil)
|
t1 := randomTask("send_email", defaultQueue, nil)
|
||||||
t2 := randomTask("generate_csv", defaultQueue, nil)
|
t2 := randomTask("generate_csv", defaultQueue, nil)
|
||||||
secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time
|
|
||||||
json1, err := json.Marshal(t1)
|
json1, err := json.Marshal(t1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
@ -158,26 +158,67 @@ func TestForward(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
client.ZAdd(scheduled, &redis.Z{
|
secondAgo := time.Now().Add(-time.Second)
|
||||||
Member: string(json1),
|
hourFromNow := time.Now().Add(time.Hour)
|
||||||
Score: float64(secondAgo.Unix()),
|
|
||||||
}, &redis.Z{
|
tests := []struct {
|
||||||
Member: string(json2),
|
tasks []*redis.Z // scheduled tasks with timestamp as a score
|
||||||
Score: float64(secondAgo.Unix()),
|
wantQueued []string // queue after calling forward
|
||||||
|
wantScheduled []string // scheduled queue after calling forward
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
tasks: []*redis.Z{
|
||||||
|
&redis.Z{Member: string(json1), Score: float64(secondAgo.Unix())},
|
||||||
|
&redis.Z{Member: string(json2), Score: float64(secondAgo.Unix())}},
|
||||||
|
wantQueued: []string{string(json1), string(json2)},
|
||||||
|
wantScheduled: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
tasks: []*redis.Z{
|
||||||
|
&redis.Z{Member: string(json1), Score: float64(hourFromNow.Unix())},
|
||||||
|
&redis.Z{Member: string(json2), Score: float64(secondAgo.Unix())}},
|
||||||
|
wantQueued: []string{string(json2)},
|
||||||
|
wantScheduled: []string{string(json1)},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
tasks: []*redis.Z{
|
||||||
|
&redis.Z{Member: string(json1), Score: float64(hourFromNow.Unix())},
|
||||||
|
&redis.Z{Member: string(json2), Score: float64(hourFromNow.Unix())}},
|
||||||
|
wantQueued: []string{},
|
||||||
|
wantScheduled: []string{string(json1), string(json2)},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
sortStrOpt := cmp.Transformer("SortStr", func(in []string) []string {
|
||||||
|
out := append([]string(nil), in...) // Copy input to avoid mutating it
|
||||||
|
sort.Strings(out)
|
||||||
|
return out
|
||||||
})
|
})
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := client.ZAdd(scheduled, tc.tasks...).Err(); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
err = r.forward(scheduled)
|
err = r.forward(scheduled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err)
|
t.Errorf("(*rdb).forward(%q) = %v, want nil", scheduled, err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
gotQueued := client.LRange(defaultQueue, 0, -1).Val()
|
||||||
if c := client.ZCard(scheduled).Val(); c != 0 {
|
if diff := cmp.Diff(tc.wantQueued, gotQueued, sortStrOpt); diff != "" {
|
||||||
t.Errorf("ZCARD %q = %d, want 0", scheduled, c)
|
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", defaultQueue, len(gotQueued), len(tc.wantQueued), diff)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
gotScheduled := client.ZRangeByScore(scheduled, &redis.ZRangeBy{Min: "-inf", Max: "+inf"}).Val()
|
||||||
|
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortStrOpt); diff != "" {
|
||||||
|
t.Errorf("%q has %d tasks, want %d tasks; (-want, +got)\n%s", scheduled, len(gotScheduled), len(tc.wantScheduled), diff)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
if l := client.LLen(defaultQueue).Val(); l != 2 {
|
|
||||||
t.Errorf("LLEN %q = %d, want 2", defaultQueue, l)
|
|
||||||
}
|
|
||||||
if c := client.SCard(allQueues).Val(); c != 1 {
|
|
||||||
t.Errorf("SCARD %q = %d, want 1", allQueues, c)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user