2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Add (*rdb).forward method using lua script

This commit is contained in:
Ken Hibino 2019-11-25 07:09:39 -08:00
parent f91004e6aa
commit 4aa5078dc4
2 changed files with 55 additions and 0 deletions

18
rdb.go
View File

@ -185,3 +185,21 @@ func (r *rdb) moveAll(src, dst string) error {
} }
return r.client.Watch(txf, src) return r.client.Watch(txf, src)
} }
// forward moves all tasks with a score less than the current unix time
// from the given zset to the default queue.
func (r *rdb) forward(from string) error {
script := redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("ZREM", KEYS[1], msg)
redis.call("SADD", KEYS[2], KEYS[3])
redis.call("LPUSH", KEYS[3], msg)
end
return msgs
`)
now := float64(time.Now().Unix())
res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result()
fmt.Printf("[DEBUGGING LUA} %v, %v\n", res, err)
return err
}

View File

@ -126,3 +126,40 @@ func TestMoveAll(t *testing.T) {
t.Errorf("LLEN %q = %d, want %d", defaultQueue, l, len(seed)) t.Errorf("LLEN %q = %d, want %d", defaultQueue, l, len(seed))
} }
} }
func TestForward(t *testing.T) {
r := setup()
t1 := randomTask("send_email", defaultQueue)
t2 := randomTask("generate_csv", defaultQueue)
secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time
json1, err := json.Marshal(t1)
if err != nil {
t.Fatalf("json.Marshal() failed: %v", err)
}
json2, err := json.Marshal(t2)
if err != nil {
t.Fatalf("json.Marshal() failed: %v", err)
}
client.ZAdd(scheduled, &redis.Z{
Member: string(json1),
Score: float64(secondAgo.Unix()),
}, &redis.Z{
Member: string(json2),
Score: float64(secondAgo.Unix()),
})
err = r.forward(scheduled)
if err != nil {
t.Fatalf("r.forward() failed: %v", err)
}
if c := client.ZCard(scheduled).Val(); c != 0 {
t.Errorf("ZCARD %q = %d, want 0", scheduled, c)
}
if l := client.LLen(defaultQueue).Val(); l != 2 {
t.Errorf("LLEN %q = %d, want 2", defaultQueue, l)
}
if c := client.SCard(allQueues).Val(); c != 1 {
t.Errorf("SCARD %q = %d, want 1", allQueues, c)
}
}