diff --git a/internal/base/base.go b/internal/base/base.go index 0cda45f..98694ff 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -200,6 +200,21 @@ func UniqueKey(qname, tasktype string, payload []byte) string { return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) } +// GroupKey returns a redis key used to group tasks belong in the same group. +func GroupKey(qname, gkey string) string { + return fmt.Sprintf("%sg:%s", QueueKeyPrefix(qname), gkey) +} + +// AllGroups return a redis key used to store all group keys used in a given queue. +func AllGroups(qname string) string { + return fmt.Sprintf("%sgroups", QueueKeyPrefix(qname)) +} + +// AllStagedGroups returns a redis key used to store all groups staged to be aggregated in a given queue. +func AllStagedGroups(qname string) string { + return fmt.Sprintf("%sstaged_groups", QueueKeyPrefix(qname)) +} + // TaskMessage is the internal representation of a task with additional metadata fields. // Serialized data of this type gets written to redis. type TaskMessage struct { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 27bc9af..f4eefb0 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -395,6 +395,78 @@ func TestUniqueKey(t *testing.T) { } } +func TestGroupKey(t *testing.T) { + tests := []struct { + qname string + gkey string + want string + }{ + { + qname: "default", + gkey: "mygroup", + want: "asynq:{default}:g:mygroup", + }, + { + qname: "custom", + gkey: "foo", + want: "asynq:{custom}:g:foo", + }, + } + + for _, tc := range tests { + got := GroupKey(tc.qname, tc.gkey) + if got != tc.want { + t.Errorf("GroupKey(%q, %q) = %q, want %q", tc.qname, tc.gkey, got, tc.want) + } + } +} + +func TestAllGroups(t *testing.T) { + tests := []struct { + qname string + want string + }{ + { + qname: "default", + want: "asynq:{default}:groups", + }, + { + qname: "custom", + want: "asynq:{custom}:groups", + }, + } + + for _, tc := range tests { + got := AllGroups(tc.qname) + if got != tc.want { + t.Errorf("AllGroups(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + +func TestAllStagedGroups(t *testing.T) { + tests := []struct { + qname string + want string + }{ + { + qname: "default", + want: "asynq:{default}:staged_groups", + }, + { + qname: "custom", + want: "asynq:{custom}:staged_groups", + }, + } + + for _, tc := range tests { + got := AllStagedGroups(tc.qname) + if got != tc.want { + t.Errorf("AllStagedGroups(%q) = %q, want %q", tc.qname, got, tc.want) + } + } +} + func TestMessageEncoding(t *testing.T) { id := uuid.NewString() tests := []struct {