mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-19 21:07:05 +08:00
Compare commits
7 Commits
sohail/pm-
...
sohail/rec
Author | SHA1 | Date | |
---|---|---|---|
|
7c3d57d30e | ||
|
c327bc40a2 | ||
|
ea0c6e93f0 | ||
|
489e21920b | ||
|
043dcfbf56 | ||
|
02907551b4 | ||
|
127fac2e90 |
19
CHANGELOG.md
19
CHANGELOG.md
@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
## [0.25.1] - 2024-12-11
|
||||
|
||||
### Upgrades
|
||||
|
||||
* Some packages
|
||||
|
||||
### Added
|
||||
|
||||
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
|
||||
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
|
||||
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
|
||||
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
|
||||
|
||||
### Fixes
|
||||
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
|
||||
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
|
||||
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
|
||||
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
|
||||
|
||||
## [0.25.0] - 2024-10-29
|
||||
|
||||
### Upgrades
|
||||
|
@@ -23,7 +23,7 @@ import (
|
||||
)
|
||||
|
||||
// Version of asynq library and CLI.
|
||||
const Version = "0.25.0"
|
||||
const Version = "0.25.1"
|
||||
|
||||
// DefaultQueueName is the queue name used if none are specified by user.
|
||||
const DefaultQueueName = "default"
|
||||
|
@@ -95,7 +95,7 @@ func (r *recoverer) recoverLeaseExpiredTasks() {
|
||||
return
|
||||
}
|
||||
for _, msg := range msgs {
|
||||
if msg.Retried >= msg.Retry {
|
||||
if msg.Retried >= msg.Retry && msg.Retry > 0 {
|
||||
r.archive(msg, ErrLeaseExpired)
|
||||
} else {
|
||||
r.retry(msg, ErrLeaseExpired)
|
||||
|
27
scheduler.go
27
scheduler.go
@@ -44,9 +44,6 @@ type Scheduler struct {
|
||||
// to avoid using cron.EntryID as the public API of
|
||||
// the Scheduler.
|
||||
idmap map[string]cron.EntryID
|
||||
// When a Scheduler has been created with an existing Redis connection, we do
|
||||
// not want to close it.
|
||||
sharedConnection bool
|
||||
}
|
||||
|
||||
const defaultHeartbeatInterval = 10 * time.Second
|
||||
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
|
||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
scheduler := newScheduler(opts)
|
||||
|
||||
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||
}
|
||||
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
||||
scheduler.sharedConnection = false
|
||||
|
||||
rdb := rdb.NewRDB(redisClient)
|
||||
|
||||
scheduler.rdb = rdb
|
||||
scheduler.client = &Client{broker: rdb, sharedConnection: false}
|
||||
|
||||
return scheduler
|
||||
}
|
||||
|
||||
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||
scheduler := newScheduler(opts)
|
||||
|
||||
scheduler.rdb = rdb.NewRDB(c)
|
||||
scheduler.client = NewClientFromRedisClient(c)
|
||||
|
||||
return scheduler
|
||||
}
|
||||
|
||||
func newScheduler(opts *SchedulerOpts) *Scheduler {
|
||||
if opts == nil {
|
||||
opts = &SchedulerOpts{}
|
||||
}
|
||||
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
||||
state: &serverState{value: srvStateNew},
|
||||
heartbeatInterval: heartbeatInterval,
|
||||
logger: logger,
|
||||
client: NewClientFromRedisClient(c),
|
||||
rdb: rdb.NewRDB(c),
|
||||
cron: cron.New(cron.WithLocation(loc)),
|
||||
location: loc,
|
||||
done: make(chan struct{}),
|
||||
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
|
||||
if err := s.client.Close(); err != nil {
|
||||
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||
}
|
||||
if !s.sharedConnection {
|
||||
s.rdb.Close()
|
||||
}
|
||||
s.logger.Info("Scheduler stopped")
|
||||
}
|
||||
|
||||
|
@@ -174,16 +174,15 @@ type Config struct {
|
||||
// })
|
||||
//
|
||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||
|
||||
//
|
||||
// we can also handle panic error like:
|
||||
// func reportError(ctx context, task *asynq.Task, err error) {
|
||||
// if asynq.IsPanic(err) {
|
||||
// if asynq.IsPanicError(err) {
|
||||
// errorReportingService.Notify(err)
|
||||
// }
|
||||
// })
|
||||
//
|
||||
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
|
||||
|
||||
ErrorHandler ErrorHandler
|
||||
|
||||
// Logger specifies the logger used by the server instance.
|
||||
|
@@ -24,9 +24,11 @@ func (srv *Server) waitForSignals() {
|
||||
if sig == unix.SIGTSTP {
|
||||
srv.Stop()
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
srv.Stop()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) waitForSignals() {
|
||||
|
@@ -40,6 +40,7 @@ var (
|
||||
useRedisCluster bool
|
||||
clusterAddrs string
|
||||
tlsServerName string
|
||||
insecure bool
|
||||
)
|
||||
|
||||
// rootCmd represents the base command when called without any subcommands
|
||||
@@ -314,6 +315,8 @@ func init() {
|
||||
"List of comma-separated redis server addresses")
|
||||
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
||||
"", "Server name for TLS validation")
|
||||
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
|
||||
false, "Allow insecure TLS connection by skipping cert validation")
|
||||
// Bind flags with config.
|
||||
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
||||
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
||||
@@ -321,6 +324,7 @@ func init() {
|
||||
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
||||
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
||||
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
||||
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
|
||||
}
|
||||
|
||||
// initConfig reads in config file and ENV variables if set.
|
||||
@@ -402,7 +406,7 @@ func getTLSConfig() *tls.Config {
|
||||
if tlsServer == "" {
|
||||
return nil
|
||||
}
|
||||
return &tls.Config{ServerName: tlsServer}
|
||||
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
|
||||
}
|
||||
|
||||
// printTable is a helper function to print data in table format.
|
||||
|
Reference in New Issue
Block a user