mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-19 21:07:05 +08:00
Compare commits
7 Commits
dependabot
...
sohail/pm-
Author | SHA1 | Date | |
---|---|---|---|
|
f1e7dc4056 | ||
|
ee17997650 | ||
|
106c07adaa | ||
|
1c7195ff1a | ||
|
12cbba4926 | ||
|
80479b528d | ||
|
e14c312fe3 |
2
.github/workflows/build.yml
vendored
2
.github/workflows/build.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
run: go test -run=^$ -bench=. -loglevel=debug ./...
|
||||
|
||||
- name: Upload coverage to Codecov
|
||||
uses: codecov/codecov-action@v4
|
||||
uses: codecov/codecov-action@v5
|
||||
|
||||
build-tool:
|
||||
strategy:
|
||||
|
4
go.mod
4
go.mod
@@ -9,9 +9,9 @@ require (
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/cast v1.7.0
|
||||
go.uber.org/goleak v1.3.0
|
||||
golang.org/x/sys v0.26.0
|
||||
golang.org/x/sys v0.27.0
|
||||
golang.org/x/time v0.8.0
|
||||
google.golang.org/protobuf v1.35.1
|
||||
google.golang.org/protobuf v1.35.2
|
||||
)
|
||||
|
||||
require (
|
||||
|
8
go.sum
8
go.sum
@@ -32,11 +32,11 @@ github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PK
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
|
||||
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg=
|
||||
golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
|
||||
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io=
|
||||
google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
@@ -10,6 +10,8 @@ import (
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// PeriodicTaskManager manages scheduling of periodic tasks.
|
||||
@@ -27,9 +29,12 @@ type PeriodicTaskManagerOpts struct {
|
||||
// Required: must be non nil
|
||||
PeriodicTaskConfigProvider PeriodicTaskConfigProvider
|
||||
|
||||
// Required: must be non nil
|
||||
// Optional: if RedisUniversalClient is nil must be non nil
|
||||
RedisConnOpt RedisConnOpt
|
||||
|
||||
// Optional: if RedisUniversalClient is non nil, RedisConnOpt is ignored.
|
||||
RedisUniversalClient redis.UniversalClient
|
||||
|
||||
// Optional: scheduler options
|
||||
*SchedulerOpts
|
||||
|
||||
@@ -45,10 +50,16 @@ func NewPeriodicTaskManager(opts PeriodicTaskManagerOpts) (*PeriodicTaskManager,
|
||||
if opts.PeriodicTaskConfigProvider == nil {
|
||||
return nil, fmt.Errorf("PeriodicTaskConfigProvider cannot be nil")
|
||||
}
|
||||
if opts.RedisConnOpt == nil {
|
||||
return nil, fmt.Errorf("RedisConnOpt cannot be nil")
|
||||
if opts.RedisConnOpt == nil && opts.RedisUniversalClient == nil {
|
||||
return nil, fmt.Errorf("RedisConnOpt/RedisUniversalClient cannot be nil")
|
||||
}
|
||||
scheduler := NewScheduler(opts.RedisConnOpt, opts.SchedulerOpts)
|
||||
var scheduler *Scheduler
|
||||
if opts.RedisUniversalClient != nil {
|
||||
scheduler = NewSchedulerFromRedisClient(opts.RedisUniversalClient, opts.SchedulerOpts)
|
||||
} else {
|
||||
scheduler = NewScheduler(opts.RedisConnOpt, opts.SchedulerOpts)
|
||||
}
|
||||
|
||||
syncInterval := opts.SyncInterval
|
||||
if syncInterval == 0 {
|
||||
syncInterval = defaultSyncInterval
|
||||
@@ -172,8 +183,8 @@ func (mgr *PeriodicTaskManager) add(configs []*PeriodicTaskConfig) {
|
||||
for _, c := range configs {
|
||||
entryID, err := mgr.s.Register(c.Cronspec, c.Task, c.Opts...)
|
||||
if err != nil {
|
||||
mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q",
|
||||
c.Cronspec, c.Task.Type())
|
||||
mgr.s.logger.Errorf("Failed to register periodic task: cronspec=%q task=%q err=%v",
|
||||
c.Cronspec, c.Task.Type(), err)
|
||||
continue
|
||||
}
|
||||
mgr.m[c.hash()] = entryID
|
||||
|
28
scheduler.go
28
scheduler.go
@@ -44,9 +44,6 @@ type Scheduler struct {
|
||||
// to avoid using cron.EntryID as the public API of
|
||||
// the Scheduler.
|
||||
idmap map[string]cron.EntryID
|
||||
// When a Scheduler has been created with an existing Redis connection, we do
|
||||
// not want to close it.
|
||||
sharedConnection bool
|
||||
}
|
||||
|
||||
const defaultHeartbeatInterval = 10 * time.Second
|
||||
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
|
||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
scheduler := newScheduler(opts)
|
||||
|
||||
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||
}
|
||||
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
||||
scheduler.sharedConnection = false
|
||||
|
||||
rdb := rdb.NewRDB(redisClient)
|
||||
|
||||
scheduler.rdb = rdb
|
||||
scheduler.client = &Client{broker: rdb, sharedConnection: false}
|
||||
|
||||
return scheduler
|
||||
}
|
||||
|
||||
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||
scheduler := newScheduler(opts)
|
||||
|
||||
scheduler.rdb = rdb.NewRDB(c)
|
||||
scheduler.client = NewClientFromRedisClient(c)
|
||||
|
||||
return scheduler
|
||||
}
|
||||
|
||||
func newScheduler(opts *SchedulerOpts) *Scheduler {
|
||||
if opts == nil {
|
||||
opts = &SchedulerOpts{}
|
||||
}
|
||||
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
||||
state: &serverState{value: srvStateNew},
|
||||
heartbeatInterval: heartbeatInterval,
|
||||
logger: logger,
|
||||
client: NewClientFromRedisClient(c),
|
||||
rdb: rdb.NewRDB(c),
|
||||
cron: cron.New(cron.WithLocation(loc)),
|
||||
location: loc,
|
||||
done: make(chan struct{}),
|
||||
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
|
||||
if err := s.client.Close(); err != nil {
|
||||
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||
}
|
||||
if !s.sharedConnection {
|
||||
s.rdb.Close()
|
||||
}
|
||||
s.logger.Info("Scheduler stopped")
|
||||
}
|
||||
|
||||
@@ -334,7 +341,6 @@ func (s *Scheduler) beat() {
|
||||
}
|
||||
entries = append(entries, e)
|
||||
}
|
||||
s.logger.Debugf("Writing entries %v", entries)
|
||||
if err := s.rdb.WriteSchedulerEntries(s.id, entries, s.heartbeatInterval*2); err != nil {
|
||||
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user