mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Implement RDB.DeleteAggregationSet
This commit is contained in:
parent
99c00bffeb
commit
7849b1114c
@ -252,6 +252,12 @@ func SeedGroup(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname,
|
|||||||
seedRedisZSet(tb, r, base.GroupKey(qname, gname), entries, base.TaskStateAggregating)
|
seedRedisZSet(tb, r, base.GroupKey(qname, gname), entries, base.TaskStateAggregating)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SeedAggregationSet(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname, gname, setID string) {
|
||||||
|
tb.Helper()
|
||||||
|
r.SAdd(context.Background(), base.AllQueues, qname)
|
||||||
|
seedRedisSet(tb, r, base.AggregationSetKey(qname, gname, setID), msgs, base.TaskStateAggregating)
|
||||||
|
}
|
||||||
|
|
||||||
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
// SeedAllPendingQueues initializes all of the specified queues with the given messages.
|
||||||
//
|
//
|
||||||
// pending maps a queue name to a list of messages.
|
// pending maps a queue name to a list of messages.
|
||||||
@ -330,14 +336,14 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string,
|
|||||||
if err := c.LPush(context.Background(), key, msg.ID).Err(); err != nil {
|
if err := c.LPush(context.Background(), key, msg.ID).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
}
|
}
|
||||||
key := base.TaskKey(msg.Queue, msg.ID)
|
taskKey := base.TaskKey(msg.Queue, msg.ID)
|
||||||
data := map[string]interface{}{
|
data := map[string]interface{}{
|
||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"state": state.String(),
|
"state": state.String(),
|
||||||
"unique_key": msg.UniqueKey,
|
"unique_key": msg.UniqueKey,
|
||||||
"group": msg.GroupKey,
|
"group": msg.GroupKey,
|
||||||
}
|
}
|
||||||
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(msg.UniqueKey) > 0 {
|
if len(msg.UniqueKey) > 0 {
|
||||||
@ -359,14 +365,41 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
|
|||||||
if err := c.ZAdd(context.Background(), key, z).Err(); err != nil {
|
if err := c.ZAdd(context.Background(), key, z).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
}
|
}
|
||||||
key := base.TaskKey(msg.Queue, msg.ID)
|
taskKey := base.TaskKey(msg.Queue, msg.ID)
|
||||||
data := map[string]interface{}{
|
data := map[string]interface{}{
|
||||||
"msg": encoded,
|
"msg": encoded,
|
||||||
"state": state.String(),
|
"state": state.String(),
|
||||||
"unique_key": msg.UniqueKey,
|
"unique_key": msg.UniqueKey,
|
||||||
"group": msg.GroupKey,
|
"group": msg.GroupKey,
|
||||||
}
|
}
|
||||||
if err := c.HSet(context.Background(), key, data).Err(); err != nil {
|
if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
}
|
||||||
|
if len(msg.UniqueKey) > 0 {
|
||||||
|
err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID, 1*time.Minute).Err()
|
||||||
|
if err != nil {
|
||||||
|
tb.Fatalf("Failed to set unique lock in redis: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func seedRedisSet(tb testing.TB, c redis.UniversalClient, key string,
|
||||||
|
msgs []*base.TaskMessage, state base.TaskState) {
|
||||||
|
tb.Helper()
|
||||||
|
for _, msg := range msgs {
|
||||||
|
encoded := MustMarshal(tb, msg)
|
||||||
|
if err := c.SAdd(context.Background(), key, msg.ID).Err(); err != nil {
|
||||||
|
tb.Fatal(err)
|
||||||
|
}
|
||||||
|
taskKey := base.TaskKey(msg.Queue, msg.ID)
|
||||||
|
data := map[string]interface{}{
|
||||||
|
"msg": encoded,
|
||||||
|
"state": state.String(),
|
||||||
|
"unique_key": msg.UniqueKey,
|
||||||
|
"group": msg.GroupKey,
|
||||||
|
}
|
||||||
|
if err := c.HSet(context.Background(), taskKey, data).Err(); err != nil {
|
||||||
tb.Fatal(err)
|
tb.Fatal(err)
|
||||||
}
|
}
|
||||||
if len(msg.UniqueKey) > 0 {
|
if len(msg.UniqueKey) > 0 {
|
||||||
|
@ -1124,9 +1124,22 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag
|
|||||||
return msgs, time.Unix(int64(deadlineUnix), 0), nil
|
return msgs, time.Unix(int64(deadlineUnix), 0), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteAggregationSet deletes the aggregation set identified by the parameters.
|
// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
|
||||||
|
// -------
|
||||||
|
// ARGV[1] -> task key prefix
|
||||||
|
var deleteAggregationSetCmd = redis.NewScript(`
|
||||||
|
local ids = redis.call("SMEMBERS", KEYS[1])
|
||||||
|
for _, id in ipairs(ids) do
|
||||||
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
|
end
|
||||||
|
redis.call("DEL", KEYS[1])
|
||||||
|
return redis.status_reply("OK")
|
||||||
|
`)
|
||||||
|
|
||||||
|
// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
|
||||||
func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
|
func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
|
||||||
return nil
|
var op errors.Op = "RDB.DeleteAggregationSet"
|
||||||
|
return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:completed
|
// KEYS[1] -> asynq:{<qname>}:completed
|
||||||
|
@ -3284,3 +3284,50 @@ func TestAggregationCheck(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteAggregationSet(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
defer r.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
setID := uuid.NewString()
|
||||||
|
msg1 := h.NewTaskMessageBuilder().SetType("foo").SetQueue("default").SetGroup("mygroup").Build()
|
||||||
|
msg2 := h.NewTaskMessageBuilder().SetType("bar").SetQueue("default").SetGroup("mygroup").Build()
|
||||||
|
msg3 := h.NewTaskMessageBuilder().SetType("baz").SetQueue("default").SetGroup("mygroup").Build()
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
aggregationSet []*base.TaskMessage
|
||||||
|
qname string
|
||||||
|
gname string
|
||||||
|
setID string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
aggregationSet: []*base.TaskMessage{msg1, msg2, msg3},
|
||||||
|
qname: "default",
|
||||||
|
gname: "mygroup",
|
||||||
|
setID: setID,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client)
|
||||||
|
h.SeedAggregationSet(t, r.client, tc.aggregationSet, tc.qname, tc.gname, tc.setID)
|
||||||
|
|
||||||
|
if err := r.DeleteAggregationSet(ctx, tc.qname, tc.gname, tc.setID); err != nil {
|
||||||
|
t.Fatalf("DeleteAggregationSet returned error: %v", err)
|
||||||
|
}
|
||||||
|
key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID)
|
||||||
|
// Check if the set is deleted.
|
||||||
|
if r.client.Exists(ctx, key).Val() != 0 {
|
||||||
|
t.Errorf("aggregation set key %q still exists", key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check all tasks in the set are deleted.
|
||||||
|
for _, m := range tc.aggregationSet {
|
||||||
|
taskKey := base.TaskKey(m.Queue, m.ID)
|
||||||
|
if r.client.Exists(ctx, taskKey).Val() != 0 {
|
||||||
|
t.Errorf("task key %q still exists", taskKey)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user