mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Make GroupMaxSize and GroupMaxDelay config optional
This commit is contained in:
parent
196db64d4d
commit
888b5590fb
@ -137,7 +137,7 @@ func (a *aggregator) aggregate(t time.Time) {
|
|||||||
}
|
}
|
||||||
for _, gname := range groups {
|
for _, gname := range groups {
|
||||||
aggregationSetID, err := a.broker.AggregationCheck(
|
aggregationSetID, err := a.broker.AggregationCheck(
|
||||||
qname, gname, t.Add(-a.gracePeriod), t.Add(-a.maxDelay), a.maxSize)
|
qname, gname, t, a.gracePeriod, a.maxDelay, a.maxSize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.logger.Errorf("Failed to run aggregation check: queue=%q group=%q", qname, gname)
|
a.logger.Errorf("Failed to run aggregation check: queue=%q group=%q", qname, gname)
|
||||||
continue
|
continue
|
||||||
|
@ -731,7 +731,7 @@ type Broker interface {
|
|||||||
AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
|
AddToGroup(ctx context.Context, msg *TaskMessage, gname string) error
|
||||||
AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
|
AddToGroupUnique(ctx context.Context, msg *TaskMessage, groupKey string, ttl time.Duration) error
|
||||||
ListGroups(qname string) ([]string, error)
|
ListGroups(qname string) ([]string, error)
|
||||||
AggregationCheck(qname, gname string, gracePeriodStartTime, maxDelayTime time.Time, maxSize int) (aggregationSetID string, err error)
|
AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (aggregationSetID string, err error)
|
||||||
ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
|
ReadAggregationSet(qname, gname, aggregationSetID string) ([]*TaskMessage, time.Time, error)
|
||||||
DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
|
DeleteAggregationSet(ctx context.Context, qname, gname, aggregationSetID string) error
|
||||||
|
|
||||||
|
@ -1003,6 +1003,7 @@ func (r *RDB) ListGroups(qname string) ([]string, error) {
|
|||||||
// ARGV[3] -> start time of the grace period
|
// ARGV[3] -> start time of the grace period
|
||||||
// ARGV[4] -> aggregation set ID
|
// ARGV[4] -> aggregation set ID
|
||||||
// ARGV[5] -> aggregation set expire time
|
// ARGV[5] -> aggregation set expire time
|
||||||
|
// ARGV[6] -> current time in unix time
|
||||||
//
|
//
|
||||||
// Output:
|
// Output:
|
||||||
// Returns 0 if no aggregation set was created
|
// Returns 0 if no aggregation set was created
|
||||||
@ -1013,7 +1014,7 @@ if size == 0 then
|
|||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
local maxSize = tonumber(ARGV[1])
|
local maxSize = tonumber(ARGV[1])
|
||||||
if size >= maxSize then
|
if maxSize ~= 0 and size >= maxSize then
|
||||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
||||||
for _, msg in ipairs(msgs) do
|
for _, msg in ipairs(msgs) do
|
||||||
redis.call("SADD", KEYS[2], msg)
|
redis.call("SADD", KEYS[2], msg)
|
||||||
@ -1022,21 +1023,25 @@ if size >= maxSize then
|
|||||||
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
|
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
|
||||||
return 1
|
return 1
|
||||||
end
|
end
|
||||||
local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
|
local maxDelay = tonumber(ARGV[2])
|
||||||
local oldestEntryScore = tonumber(oldestEntry[2])
|
local currentTime = tonumber(ARGV[6])
|
||||||
local maxDelayTime = tonumber(ARGV[2])
|
if maxDelay ~= 0 then
|
||||||
if oldestEntryScore <= maxDelayTime then
|
local oldestEntry = redis.call("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")
|
||||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
local oldestEntryScore = tonumber(oldestEntry[2])
|
||||||
for _, msg in ipairs(msgs) do
|
local maxDelayTime = currentTime - maxDelay
|
||||||
redis.call("SADD", KEYS[2], msg)
|
if oldestEntryScore <= maxDelayTime then
|
||||||
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
||||||
|
for _, msg in ipairs(msgs) do
|
||||||
|
redis.call("SADD", KEYS[2], msg)
|
||||||
|
end
|
||||||
|
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
||||||
|
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
|
||||||
|
return 1
|
||||||
end
|
end
|
||||||
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, maxSize-1)
|
|
||||||
redis.call("ZADD", KEYS[3], ARGV[5], ARGV[4])
|
|
||||||
return 1
|
|
||||||
end
|
end
|
||||||
local latestEntry = redis.call("ZREVRANGE", KEYS[1], 0, 0, "WITHSCORES")
|
local latestEntry = redis.call("ZREVRANGE", KEYS[1], 0, 0, "WITHSCORES")
|
||||||
local latestEntryScore = tonumber(latestEntry[2])
|
local latestEntryScore = tonumber(latestEntry[2])
|
||||||
local gracePeriodStartTime = tonumber(ARGV[3])
|
local gracePeriodStartTime = currentTime - tonumber(ARGV[3])
|
||||||
if latestEntryScore <= gracePeriodStartTime then
|
if latestEntryScore <= gracePeriodStartTime then
|
||||||
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, maxSize-1)
|
||||||
for _, msg in ipairs(msgs) do
|
for _, msg in ipairs(msgs) do
|
||||||
@ -1055,11 +1060,12 @@ const aggregationTimeout = 2 * time.Minute
|
|||||||
|
|
||||||
// AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the
|
// AggregationCheck checks the group identified by the given queue and group name to see if the tasks in the
|
||||||
// group are ready to be aggregated. If so, it moves the tasks to be aggregated to a aggregation set and returns
|
// group are ready to be aggregated. If so, it moves the tasks to be aggregated to a aggregation set and returns
|
||||||
// set ID. If not, it returns an empty string for the set ID.
|
// the set ID. If not, it returns an empty string for the set ID.
|
||||||
|
// The time for gracePeriod and maxDelay is computed relative to the time t.
|
||||||
//
|
//
|
||||||
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
|
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
|
||||||
// the function only checks the most recently added task aganist the given gracePeriod.
|
// the function only checks the most recently added task aganist the given gracePeriod.
|
||||||
func (r *RDB) AggregationCheck(qname, gname string, gracePeriodStartTime, maxDelayTime time.Time, maxSize int) (string, error) {
|
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
|
||||||
var op errors.Op = "RDB.AggregationCheck"
|
var op errors.Op = "RDB.AggregationCheck"
|
||||||
aggregationSetID := uuid.NewString()
|
aggregationSetID := uuid.NewString()
|
||||||
expireTime := r.clock.Now().Add(aggregationTimeout)
|
expireTime := r.clock.Now().Add(aggregationTimeout)
|
||||||
@ -1070,10 +1076,11 @@ func (r *RDB) AggregationCheck(qname, gname string, gracePeriodStartTime, maxDel
|
|||||||
}
|
}
|
||||||
argv := []interface{}{
|
argv := []interface{}{
|
||||||
maxSize,
|
maxSize,
|
||||||
maxDelayTime.Unix(),
|
int64(maxDelay.Seconds()),
|
||||||
gracePeriodStartTime.Unix(),
|
int64(gracePeriod.Seconds()),
|
||||||
aggregationSetID,
|
aggregationSetID,
|
||||||
expireTime.Unix(),
|
expireTime.Unix(),
|
||||||
|
t.Unix(),
|
||||||
}
|
}
|
||||||
n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
|
n, err := r.runScriptWithErrorCode(context.Background(), op, aggregationCheckCmd, keys, argv...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -3254,15 +3254,65 @@ func TestAggregationCheck(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "with unlimited size",
|
||||||
|
groups: map[string]map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
"mygroup": {
|
||||||
|
{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
|
||||||
|
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
|
||||||
|
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
|
||||||
|
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
|
||||||
|
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
gname: "mygroup",
|
||||||
|
gracePeriod: 1 * time.Minute,
|
||||||
|
maxDelay: 30 * time.Minute,
|
||||||
|
maxSize: 0, // maxSize=0 indicates no size limit
|
||||||
|
shouldCreateSet: false,
|
||||||
|
wantAggregationSet: nil,
|
||||||
|
wantGroups: map[string]map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
"mygroup": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
desc: "with unlimited delay",
|
||||||
|
groups: map[string]map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
"mygroup": {
|
||||||
|
{Message: msg1, Score: now.Add(-15 * time.Minute).Unix()},
|
||||||
|
{Message: msg2, Score: now.Add(-3 * time.Minute).Unix()},
|
||||||
|
{Message: msg3, Score: now.Add(-2 * time.Minute).Unix()},
|
||||||
|
{Message: msg4, Score: now.Add(-1 * time.Minute).Unix()},
|
||||||
|
{Message: msg5, Score: now.Add(-10 * time.Second).Unix()},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
gname: "mygroup",
|
||||||
|
gracePeriod: 1 * time.Minute,
|
||||||
|
maxDelay: 0, // maxDelay=0 indicates no limit
|
||||||
|
maxSize: 10,
|
||||||
|
shouldCreateSet: false,
|
||||||
|
wantAggregationSet: nil,
|
||||||
|
wantGroups: map[string]map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
"mygroup": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
h.SeedAllGroups(t, r.client, tc.groups)
|
h.SeedAllGroups(t, r.client, tc.groups)
|
||||||
|
|
||||||
gracePeriodStartTime := now.Add(-tc.gracePeriod)
|
aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, now, tc.gracePeriod, tc.maxDelay, tc.maxSize)
|
||||||
maxDelayTime := now.Add(-tc.maxDelay)
|
|
||||||
aggregationSetID, err := r.AggregationCheck(tc.qname, tc.gname, gracePeriodStartTime, maxDelayTime, tc.maxSize)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s: AggregationCheck returned error: %v", tc.desc, err)
|
t.Errorf("%s: AggregationCheck returned error: %v", tc.desc, err)
|
||||||
continue
|
continue
|
||||||
|
22
server.go
22
server.go
@ -197,7 +197,7 @@ type Config struct {
|
|||||||
|
|
||||||
// GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating
|
// GroupGracePeriod specifies the amount of time the server will wait for an incoming task before aggregating
|
||||||
// the tasks in a group. If an incoming task is received within this period, the server will wait for another
|
// the tasks in a group. If an incoming task is received within this period, the server will wait for another
|
||||||
// period of the same length, up to GroupMaxDelay.
|
// period of the same length, up to GroupMaxDelay if specified.
|
||||||
//
|
//
|
||||||
// If unset or zero, the grace period is set to 1 minute.
|
// If unset or zero, the grace period is set to 1 minute.
|
||||||
// Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to
|
// Minimum duration for GroupGracePeriod is 1 second. If value specified is less than a second, the call to
|
||||||
@ -207,13 +207,13 @@ type Config struct {
|
|||||||
// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating
|
// GroupMaxDelay specifies the maximum amount of time the server will wait for incoming tasks before aggregating
|
||||||
// the tasks in a group.
|
// the tasks in a group.
|
||||||
//
|
//
|
||||||
// If unset or zero, the max delay is set to 10 minutes.
|
// If unset or zero, no delay limit is used.
|
||||||
GroupMaxDelay time.Duration
|
GroupMaxDelay time.Duration
|
||||||
|
|
||||||
// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group.
|
// GroupMaxSize specifies the maximum number of tasks that can be aggregated into a single task within a group.
|
||||||
// If GroupMaxSize is reached, the server will aggregate the tasks into one immediately.
|
// If GroupMaxSize is reached, the server will aggregate the tasks into one immediately.
|
||||||
//
|
//
|
||||||
// If unset or zero, the max size is set to 100.
|
// If unset or zero, no size limit is used.
|
||||||
GroupMaxSize int
|
GroupMaxSize int
|
||||||
|
|
||||||
// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task.
|
// GroupAggregateFunc specifies the aggregation function used to aggregate multiple tasks in a group into one task.
|
||||||
@ -367,10 +367,6 @@ const (
|
|||||||
defaultDelayedTaskCheckInterval = 5 * time.Second
|
defaultDelayedTaskCheckInterval = 5 * time.Second
|
||||||
|
|
||||||
defaultGroupGracePeriod = 1 * time.Minute
|
defaultGroupGracePeriod = 1 * time.Minute
|
||||||
|
|
||||||
defaultGroupMaxDelay = 10 * time.Minute
|
|
||||||
|
|
||||||
defaultGroupMaxSize = 100
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewServer returns a new Server given a redis connection option
|
// NewServer returns a new Server given a redis connection option
|
||||||
@ -428,14 +424,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
if groupGracePeriod < time.Second {
|
if groupGracePeriod < time.Second {
|
||||||
panic("GroupGracePeriod cannot be less than a second")
|
panic("GroupGracePeriod cannot be less than a second")
|
||||||
}
|
}
|
||||||
groupMaxDelay := cfg.GroupMaxDelay
|
|
||||||
if groupMaxDelay == 0 {
|
|
||||||
groupMaxDelay = defaultGroupMaxDelay
|
|
||||||
}
|
|
||||||
groupMaxSize := cfg.GroupMaxSize
|
|
||||||
if groupMaxSize == 0 {
|
|
||||||
groupMaxSize = defaultGroupMaxSize
|
|
||||||
}
|
|
||||||
logger := log.NewLogger(cfg.Logger)
|
logger := log.NewLogger(cfg.Logger)
|
||||||
loglevel := cfg.LogLevel
|
loglevel := cfg.LogLevel
|
||||||
if loglevel == level_unspecified {
|
if loglevel == level_unspecified {
|
||||||
@ -522,8 +510,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
|
|||||||
broker: rdb,
|
broker: rdb,
|
||||||
queues: qnames,
|
queues: qnames,
|
||||||
gracePeriod: groupGracePeriod,
|
gracePeriod: groupGracePeriod,
|
||||||
maxDelay: groupMaxDelay,
|
maxDelay: cfg.GroupMaxDelay,
|
||||||
maxSize: groupMaxSize,
|
maxSize: cfg.GroupMaxSize,
|
||||||
aggregateFunc: cfg.GroupAggregateFunc,
|
aggregateFunc: cfg.GroupAggregateFunc,
|
||||||
})
|
})
|
||||||
return &Server{
|
return &Server{
|
||||||
|
Loading…
Reference in New Issue
Block a user