mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 15:52:18 +08:00
Fix DeleteAggregationSet
This commit is contained in:
parent
1c388baf06
commit
e63f41fb24
@ -1138,6 +1138,7 @@ func (r *RDB) ReadAggregationSet(qname, gname, setID string) ([]*base.TaskMessag
|
|||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
|
// KEYS[1] -> asynq:{<qname>}:g:<gname>:<aggregation_set_id>
|
||||||
|
// KEYS[2] -> asynq:{<qname>}:aggregation_sets
|
||||||
// -------
|
// -------
|
||||||
// ARGV[1] -> task key prefix
|
// ARGV[1] -> task key prefix
|
||||||
var deleteAggregationSetCmd = redis.NewScript(`
|
var deleteAggregationSetCmd = redis.NewScript(`
|
||||||
@ -1146,13 +1147,18 @@ for _, id in ipairs(ids) do
|
|||||||
redis.call("DEL", ARGV[1] .. id)
|
redis.call("DEL", ARGV[1] .. id)
|
||||||
end
|
end
|
||||||
redis.call("DEL", KEYS[1])
|
redis.call("DEL", KEYS[1])
|
||||||
|
redis.call("ZREM", KEYS[2], KEYS[1])
|
||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
|
|
||||||
// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
|
// DeleteAggregationSet deletes the aggregation set and its members identified by the parameters.
|
||||||
func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
|
func (r *RDB) DeleteAggregationSet(ctx context.Context, qname, gname, setID string) error {
|
||||||
var op errors.Op = "RDB.DeleteAggregationSet"
|
var op errors.Op = "RDB.DeleteAggregationSet"
|
||||||
return r.runScript(ctx, op, deleteAggregationSetCmd, []string{base.AggregationSetKey(qname, gname, setID)}, base.TaskKeyPrefix(qname))
|
keys := []string{
|
||||||
|
base.AggregationSetKey(qname, gname, setID),
|
||||||
|
base.AllAggregationSets(qname),
|
||||||
|
}
|
||||||
|
return r.runScript(ctx, op, deleteAggregationSetCmd, keys, base.TaskKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:aggregation_sets
|
// KEYS[1] -> asynq:{<qname>}:aggregation_sets
|
||||||
|
@ -3356,6 +3356,7 @@ func TestAggregationCheck(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Rewrite this test with the new pattern of using redis key-value as data.
|
||||||
func TestDeleteAggregationSet(t *testing.T) {
|
func TestDeleteAggregationSet(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
@ -3388,11 +3389,16 @@ func TestDeleteAggregationSet(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
h.SeedAggregationSet(t, r.client, tc.aggregationSet, tc.qname, tc.gname, tc.setID)
|
h.SeedAggregationSet(t, r.client, tc.aggregationSet, tc.qname, tc.gname, tc.setID)
|
||||||
|
key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID)
|
||||||
|
if err := r.client.ZAdd(context.Background(),
|
||||||
|
base.AllAggregationSets(tc.qname),
|
||||||
|
&redis.Z{Member: key, Score: float64(now.Add(aggregationTimeout).Unix())}).Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := r.DeleteAggregationSet(ctx, tc.qname, tc.gname, tc.setID); err != nil {
|
if err := r.DeleteAggregationSet(ctx, tc.qname, tc.gname, tc.setID); err != nil {
|
||||||
t.Fatalf("DeleteAggregationSet returned error: %v", err)
|
t.Fatalf("DeleteAggregationSet returned error: %v", err)
|
||||||
}
|
}
|
||||||
key := base.AggregationSetKey(tc.qname, tc.gname, tc.setID)
|
|
||||||
// Check if the set is deleted.
|
// Check if the set is deleted.
|
||||||
if r.client.Exists(ctx, key).Val() != 0 {
|
if r.client.Exists(ctx, key).Val() != 0 {
|
||||||
t.Errorf("aggregation set key %q still exists", key)
|
t.Errorf("aggregation set key %q still exists", key)
|
||||||
@ -3405,6 +3411,10 @@ func TestDeleteAggregationSet(t *testing.T) {
|
|||||||
t.Errorf("task key %q still exists", taskKey)
|
t.Errorf("task key %q still exists", taskKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, err := r.client.ZScore(ctx, base.AllAggregationSets(tc.qname), key).Result(); err != redis.Nil {
|
||||||
|
t.Errorf("aggregation_set key %q is still in key %q", key, base.AllAggregationSets(tc.qname))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user