mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-19 21:07:05 +08:00
Compare commits
7 Commits
sohail/pm-
...
sohail/rec
Author | SHA1 | Date | |
---|---|---|---|
|
7c3d57d30e | ||
|
c327bc40a2 | ||
|
ea0c6e93f0 | ||
|
489e21920b | ||
|
043dcfbf56 | ||
|
02907551b4 | ||
|
127fac2e90 |
19
CHANGELOG.md
19
CHANGELOG.md
@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.25.1] - 2024-12-11
|
||||||
|
|
||||||
|
### Upgrades
|
||||||
|
|
||||||
|
* Some packages
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
|
||||||
|
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
|
||||||
|
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
|
||||||
|
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
|
||||||
|
|
||||||
|
### Fixes
|
||||||
|
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
|
||||||
|
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
|
||||||
|
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
|
||||||
|
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
|
||||||
|
|
||||||
## [0.25.0] - 2024-10-29
|
## [0.25.0] - 2024-10-29
|
||||||
|
|
||||||
### Upgrades
|
### Upgrades
|
||||||
|
@@ -23,7 +23,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Version of asynq library and CLI.
|
// Version of asynq library and CLI.
|
||||||
const Version = "0.25.0"
|
const Version = "0.25.1"
|
||||||
|
|
||||||
// DefaultQueueName is the queue name used if none are specified by user.
|
// DefaultQueueName is the queue name used if none are specified by user.
|
||||||
const DefaultQueueName = "default"
|
const DefaultQueueName = "default"
|
||||||
|
@@ -95,7 +95,7 @@ func (r *recoverer) recoverLeaseExpiredTasks() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
if msg.Retried >= msg.Retry {
|
if msg.Retried >= msg.Retry && msg.Retry > 0 {
|
||||||
r.archive(msg, ErrLeaseExpired)
|
r.archive(msg, ErrLeaseExpired)
|
||||||
} else {
|
} else {
|
||||||
r.retry(msg, ErrLeaseExpired)
|
r.retry(msg, ErrLeaseExpired)
|
||||||
|
27
scheduler.go
27
scheduler.go
@@ -44,9 +44,6 @@ type Scheduler struct {
|
|||||||
// to avoid using cron.EntryID as the public API of
|
// to avoid using cron.EntryID as the public API of
|
||||||
// the Scheduler.
|
// the Scheduler.
|
||||||
idmap map[string]cron.EntryID
|
idmap map[string]cron.EntryID
|
||||||
// When a Scheduler has been created with an existing Redis connection, we do
|
|
||||||
// not want to close it.
|
|
||||||
sharedConnection bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultHeartbeatInterval = 10 * time.Second
|
const defaultHeartbeatInterval = 10 * time.Second
|
||||||
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
|
|||||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
|
||||||
scheduler.sharedConnection = false
|
rdb := rdb.NewRDB(redisClient)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb
|
||||||
|
scheduler.client = &Client{broker: rdb, sharedConnection: false}
|
||||||
|
|
||||||
return scheduler
|
return scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
|||||||
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||||
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb.NewRDB(c)
|
||||||
|
scheduler.client = NewClientFromRedisClient(c)
|
||||||
|
|
||||||
|
return scheduler
|
||||||
|
}
|
||||||
|
|
||||||
|
func newScheduler(opts *SchedulerOpts) *Scheduler {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = &SchedulerOpts{}
|
opts = &SchedulerOpts{}
|
||||||
}
|
}
|
||||||
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
|||||||
state: &serverState{value: srvStateNew},
|
state: &serverState{value: srvStateNew},
|
||||||
heartbeatInterval: heartbeatInterval,
|
heartbeatInterval: heartbeatInterval,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
client: NewClientFromRedisClient(c),
|
|
||||||
rdb: rdb.NewRDB(c),
|
|
||||||
cron: cron.New(cron.WithLocation(loc)),
|
cron: cron.New(cron.WithLocation(loc)),
|
||||||
location: loc,
|
location: loc,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
|
|||||||
if err := s.client.Close(); err != nil {
|
if err := s.client.Close(); err != nil {
|
||||||
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||||
}
|
}
|
||||||
if !s.sharedConnection {
|
|
||||||
s.rdb.Close()
|
|
||||||
}
|
|
||||||
s.logger.Info("Scheduler stopped")
|
s.logger.Info("Scheduler stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -174,16 +174,15 @@ type Config struct {
|
|||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||||
|
//
|
||||||
// we can also handle panic error like:
|
// we can also handle panic error like:
|
||||||
// func reportError(ctx context, task *asynq.Task, err error) {
|
// func reportError(ctx context, task *asynq.Task, err error) {
|
||||||
// if asynq.IsPanic(err) {
|
// if asynq.IsPanicError(err) {
|
||||||
// errorReportingService.Notify(err)
|
// errorReportingService.Notify(err)
|
||||||
// }
|
// }
|
||||||
// })
|
// })
|
||||||
//
|
//
|
||||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||||
|
|
||||||
ErrorHandler ErrorHandler
|
ErrorHandler ErrorHandler
|
||||||
|
|
||||||
// Logger specifies the logger used by the server instance.
|
// Logger specifies the logger used by the server instance.
|
||||||
|
@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
|
|||||||
if sig == unix.SIGTSTP {
|
if sig == unix.SIGTSTP {
|
||||||
srv.Stop()
|
srv.Stop()
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
srv.Stop()
|
||||||
|
break
|
||||||
}
|
}
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -40,6 +40,7 @@ var (
|
|||||||
useRedisCluster bool
|
useRedisCluster bool
|
||||||
clusterAddrs string
|
clusterAddrs string
|
||||||
tlsServerName string
|
tlsServerName string
|
||||||
|
insecure bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// rootCmd represents the base command when called without any subcommands
|
// rootCmd represents the base command when called without any subcommands
|
||||||
@@ -314,6 +315,8 @@ func init() {
|
|||||||
"List of comma-separated redis server addresses")
|
"List of comma-separated redis server addresses")
|
||||||
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
||||||
"", "Server name for TLS validation")
|
"", "Server name for TLS validation")
|
||||||
|
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
|
||||||
|
false, "Allow insecure TLS connection by skipping cert validation")
|
||||||
// Bind flags with config.
|
// Bind flags with config.
|
||||||
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
||||||
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
||||||
@@ -321,6 +324,7 @@ func init() {
|
|||||||
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
||||||
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
||||||
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
||||||
|
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// initConfig reads in config file and ENV variables if set.
|
// initConfig reads in config file and ENV variables if set.
|
||||||
@@ -402,7 +406,7 @@ func getTLSConfig() *tls.Config {
|
|||||||
if tlsServer == "" {
|
if tlsServer == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return &tls.Config{ServerName: tlsServer}
|
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
|
||||||
}
|
}
|
||||||
|
|
||||||
// printTable is a helper function to print data in table format.
|
// printTable is a helper function to print data in table format.
|
||||||
|
Reference in New Issue
Block a user