2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00
Files
asynq/client.go
2020-01-02 18:13:16 -08:00

98 lines
2.2 KiB
Go

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"time"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
)
// A Client is responsible for scheduling tasks.
//
// A Client is used to register tasks that should be processed
// immediately or some time in the future.
//
// Clients are safe for concurrent use by multiple goroutines.
type Client struct {
// rdb is the handle the Client uses to enqueue or schedule task messages.
rdb *rdb.RDB
}
// NewClient returns a new Client backed by the given redis client r.
//
// r is wrapped with rdb.NewRDB and stored in the returned Client. No
// validation of r is performed here.
func NewClient(r *redis.Client) *Client {
	return &Client{rdb: rdb.NewRDB(r)}
}
// Option specifies the processing behavior for the associated task.
//
// Option values are obtained from helpers such as MaxRetry and passed to
// Client.Process. Values of a type this package does not recognize are
// ignored when the options are composed.
type Option interface{}
// retryOption is the Option holding the max number of times a task will be
// retried. It is the concrete type returned by MaxRetry.
type retryOption int
// MaxRetry returns an Option that sets the maximum number of times a task
// will be retried.
//
// A negative n is treated as zero, i.e. no retry.
func MaxRetry(n int) Option {
	if n >= 0 {
		return retryOption(n)
	}
	// Negative counts are clamped to zero.
	return retryOption(0)
}
// option is the resolved set of processing settings that composeOptions
// builds from a list of Option values.
type option struct {
// retry is the max number of times the task will be retried.
retry int
}
// composeOptions folds opts into a single option value.
//
// The result starts with retry set to defaultMaxRetry. Each retryOption in
// opts then overwrites it in order, so the last one wins. Values of any
// other type are ignored.
func composeOptions(opts ...Option) option {
	merged := option{retry: defaultMaxRetry}
	for _, o := range opts {
		// retryOption is the only recognized option; skip everything else.
		if r, ok := o.(retryOption); ok {
			merged.retry = int(r)
		}
	}
	return merged
}
// defaultMaxRetry is the max retry count used when no retry option is given.
const defaultMaxRetry = 25
// Process registers task to be processed at processAt.
//
// It returns nil when the task is registered successfully and a non-nil
// error otherwise.
//
// opts tune how the task is processed. When several options conflict, the
// one given last overrides the earlier ones.
func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error {
	settings := composeOptions(opts...)
	// Every message gets a fresh ID and is placed on the "default" queue.
	return c.enqueue(&base.TaskMessage{
		ID:      xid.New(),
		Type:    task.Type,
		Payload: task.Payload,
		Queue:   "default",
		Retry:   settings.retry,
	}, processAt)
}
// enqueue hands msg to the rdb layer.
//
// If processAt is already in the past, msg is enqueued right away.
// Otherwise it is scheduled for processAt.
func (c *Client) enqueue(msg *base.TaskMessage, processAt time.Time) error {
	due := time.Now().After(processAt)
	if !due {
		return c.rdb.Schedule(msg, processAt)
	}
	return c.rdb.Enqueue(msg)
}