mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-08 18:53:34 +08:00
Update Client to add task to group if Group option is specified
This commit is contained in:
40
client.go
40
client.go
@@ -227,6 +227,7 @@ type option struct {
|
||||
uniqueTTL time.Duration
|
||||
processAt time.Time
|
||||
retention time.Duration
|
||||
groupKey string
|
||||
}
|
||||
|
||||
// composeOptions merges user provided options into the default options
|
||||
@@ -254,8 +255,8 @@ func composeOptions(opts ...Option) (option, error) {
|
||||
res.queue = qname
|
||||
case taskIDOption:
|
||||
id := string(opt)
|
||||
if err := validateTaskID(id); err != nil {
|
||||
return option{}, err
|
||||
if isBlank(id) {
|
||||
return option{}, errors.New("task ID cannot be empty")
|
||||
}
|
||||
res.taskID = id
|
||||
case timeoutOption:
|
||||
@@ -274,6 +275,12 @@ func composeOptions(opts ...Option) (option, error) {
|
||||
res.processAt = time.Now().Add(time.Duration(opt))
|
||||
case retentionOption:
|
||||
res.retention = time.Duration(opt)
|
||||
case groupOption:
|
||||
key := string(opt)
|
||||
if isBlank(key) {
|
||||
return option{}, errors.New("group key cannot be empty")
|
||||
}
|
||||
res.groupKey = key
|
||||
default:
|
||||
// ignore unexpected option
|
||||
}
|
||||
@@ -281,12 +288,9 @@ func composeOptions(opts ...Option) (option, error) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// validates user provided task ID string.
|
||||
func validateTaskID(id string) error {
|
||||
if strings.TrimSpace(id) == "" {
|
||||
return errors.New("task ID cannot be empty")
|
||||
}
|
||||
return nil
|
||||
// isBlank returns true if the given s is empty or consist of all whitespaces.
|
||||
func isBlank(s string) bool {
|
||||
return strings.TrimSpace(s) == ""
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -375,13 +379,18 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option)
|
||||
}
|
||||
now := time.Now()
|
||||
var state base.TaskState
|
||||
if opt.processAt.Before(now) || opt.processAt.Equal(now) {
|
||||
if opt.processAt.After(now) {
|
||||
err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
|
||||
state = base.TaskStateScheduled
|
||||
} else if opt.groupKey != "" {
|
||||
// Use zero value for processAt since we don't know when the task will be aggregated and processed.
|
||||
opt.processAt = time.Time{}
|
||||
err = c.addToGroup(ctx, msg, opt.groupKey, opt.uniqueTTL)
|
||||
state = base.TaskStateAggregating
|
||||
} else {
|
||||
opt.processAt = now
|
||||
err = c.enqueue(ctx, msg, opt.uniqueTTL)
|
||||
state = base.TaskStatePending
|
||||
} else {
|
||||
err = c.schedule(ctx, msg, opt.processAt, opt.uniqueTTL)
|
||||
state = base.TaskStateScheduled
|
||||
}
|
||||
switch {
|
||||
case errors.Is(err, errors.ErrDuplicateTask):
|
||||
@@ -408,3 +417,10 @@ func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Tim
|
||||
}
|
||||
return c.rdb.Schedule(ctx, msg, t)
|
||||
}
|
||||
|
||||
func (c *Client) addToGroup(ctx context.Context, msg *base.TaskMessage, groupKey string, uniqueTTL time.Duration) error {
|
||||
if uniqueTTL > 0 {
|
||||
return c.rdb.AddToGroupUnique(ctx, msg, groupKey, uniqueTTL)
|
||||
}
|
||||
return c.rdb.AddToGroup(ctx, msg, groupKey)
|
||||
}
|
||||
|
Reference in New Issue
Block a user