From 4aa5078dc45e95bf680685f9109fce92f5d74bbd Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 25 Nov 2019 07:09:39 -0800 Subject: [PATCH] Add (*rdb).forward method using lua script --- rdb.go | 18 ++++++++++++++++++ rdb_test.go | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/rdb.go b/rdb.go index c78ff16..556c246 100644 --- a/rdb.go +++ b/rdb.go @@ -185,3 +185,21 @@ func (r *rdb) moveAll(src, dst string) error { } return r.client.Watch(txf, src) } + +// forward moves all tasks with a score less than the current unix time +// from the given zset to the default queue. +func (r *rdb) forward(from string) error { + script := redis.NewScript(` + local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) + for _, msg in ipairs(msgs) do + redis.call("ZREM", KEYS[1], msg) + redis.call("SADD", KEYS[2], KEYS[3]) + redis.call("LPUSH", KEYS[3], msg) + end + return msgs + `) + now := float64(time.Now().Unix()) + res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result() + fmt.Printf("[DEBUGGING LUA} %v, %v\n", res, err) + return err +} diff --git a/rdb_test.go b/rdb_test.go index cec2de9..48f4463 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -126,3 +126,40 @@ func TestMoveAll(t *testing.T) { t.Errorf("LLEN %q = %d, want %d", defaultQueue, l, len(seed)) } } + +func TestForward(t *testing.T) { + r := setup() + t1 := randomTask("send_email", defaultQueue) + t2 := randomTask("generate_csv", defaultQueue) + secondAgo := time.Now().Add(-time.Second) // use timestamp for the past to avoid advancing time + json1, err := json.Marshal(t1) + if err != nil { + t.Fatalf("json.Marshal() failed: %v", err) + } + json2, err := json.Marshal(t2) + if err != nil { + t.Fatalf("json.Marshal() failed: %v", err) + } + client.ZAdd(scheduled, &redis.Z{ + Member: string(json1), + Score: float64(secondAgo.Unix()), + }, &redis.Z{ + Member: string(json2), + Score: float64(secondAgo.Unix()), + }) + + err = r.forward(scheduled) + if err != nil { + t.Fatalf("r.forward() failed: %v", err) + } + + if c := client.ZCard(scheduled).Val(); c != 0 { + t.Errorf("ZCARD %q = %d, want 0", scheduled, c) + } + if l := client.LLen(defaultQueue).Val(); l != 2 { + t.Errorf("LLEN %q = %d, want 2", defaultQueue, l) + } + if c := client.SCard(allQueues).Val(); c != 1 { + t.Errorf("SCARD %q = %d, want 1", allQueues, c) + } +}