mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Add helper functions to generate group key
This commit is contained in:
parent
2ce71e83b0
commit
e3d2939a4c
@ -200,6 +200,21 @@ func UniqueKey(qname, tasktype string, payload []byte) string {
|
|||||||
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
|
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:]))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GroupKey returns a redis key used to group tasks belong in the same group.
|
||||||
|
func GroupKey(qname, gkey string) string {
|
||||||
|
return fmt.Sprintf("%sg:%s", QueueKeyPrefix(qname), gkey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllGroups return a redis key used to store all group keys used in a given queue.
|
||||||
|
func AllGroups(qname string) string {
|
||||||
|
return fmt.Sprintf("%sgroups", QueueKeyPrefix(qname))
|
||||||
|
}
|
||||||
|
|
||||||
|
// AllStagedGroups returns a redis key used to store all groups staged to be aggregated in a given queue.
|
||||||
|
func AllStagedGroups(qname string) string {
|
||||||
|
return fmt.Sprintf("%sstaged_groups", QueueKeyPrefix(qname))
|
||||||
|
}
|
||||||
|
|
||||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||||
// Serialized data of this type gets written to redis.
|
// Serialized data of this type gets written to redis.
|
||||||
type TaskMessage struct {
|
type TaskMessage struct {
|
||||||
|
@ -395,6 +395,78 @@ func TestUniqueKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGroupKey(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
qname string
|
||||||
|
gkey string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
qname: "default",
|
||||||
|
gkey: "mygroup",
|
||||||
|
want: "asynq:{default}:g:mygroup",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
qname: "custom",
|
||||||
|
gkey: "foo",
|
||||||
|
want: "asynq:{custom}:g:foo",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := GroupKey(tc.qname, tc.gkey)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("GroupKey(%q, %q) = %q, want %q", tc.qname, tc.gkey, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAllGroups(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
qname string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
qname: "default",
|
||||||
|
want: "asynq:{default}:groups",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
qname: "custom",
|
||||||
|
want: "asynq:{custom}:groups",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := AllGroups(tc.qname)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("AllGroups(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAllStagedGroups(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
qname string
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
qname: "default",
|
||||||
|
want: "asynq:{default}:staged_groups",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
qname: "custom",
|
||||||
|
want: "asynq:{custom}:staged_groups",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
got := AllStagedGroups(tc.qname)
|
||||||
|
if got != tc.want {
|
||||||
|
t.Errorf("AllStagedGroups(%q) = %q, want %q", tc.qname, got, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestMessageEncoding(t *testing.T) {
|
func TestMessageEncoding(t *testing.T) {
|
||||||
id := uuid.NewString()
|
id := uuid.NewString()
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user