Compare commits

..

7 Commits

Author SHA1 Message Date
Mohammed Sohail
7c3d57d30e fix: "only once" tasks should be recovered and retried on worker crash 2025-05-15 10:26:05 +03:00
Khash Sajadi
c327bc40a2 docs: Update server.go (#1010)
Typo in the docs
2025-04-01 09:06:12 +03:00
Broderick Westrope
ea0c6e93f0 chore: fix godoc comment (#1009) 2025-04-01 09:05:18 +03:00
Mohammed Sohail
489e21920b release: v0.25.1 2024-12-11 09:19:37 +03:00
Mohamed Sohail
043dcfbf56 fix: call Stop on all other signals to correctly set the server state for the shutdown procedure to complete successfully (#982)
* fixes: #979
2024-12-11 09:05:00 +03:00
Robin Joseph
02907551b4 feat(dash): Add --insecure option (#980) 2024-12-09 09:09:12 +03:00
Mohamed Sohail
127fac2e90 fix: NewScheduler incorrectly creates underlying Client, closing broker properly (#977)
* fix: NewScheduler wrongly creates a client whose sharedConnection value is always true

* This is affecting the PeriodicManager as well as the Scheduler

* fix: closing the Client also closes the broker

* The error was also previously unhandled. For shared connections an error will be returned by the broker itself because the sharedConnection bool is also set on the client. This also means we can get rid of the sharedConnection flag on the Scheduler itself and let it work internally.
2024-12-06 08:40:04 +03:00
6 changed files with 31 additions and 7 deletions

View File

@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.25.1] - 2024-12-11
### Upgrades
* Some packages
### Added
* Add `HeartbeatInterval` option to the scheduler (PR: https://github.com/hibiken/asynq/pull/956)
* Add `RedisUniversalClient` support to periodic task manager (PR: https://github.com/hibiken/asynq/pull/958)
* Add `--insecure` flag to CLI dash command (PR: https://github.com/hibiken/asynq/pull/980)
* Add logging for registration errors (PR: https://github.com/hibiken/asynq/pull/657)
### Fixes
- Perf: Use string concat inplace of fmt.Sprintf in hotpath (PR: https://github.com/hibiken/asynq/pull/962)
- Perf: Init map with size (PR: https://github.com/hibiken/asynq/pull/673)
- Fix: `Scheduler` and `PeriodicTaskManager` graceful shutdown (PR: https://github.com/hibiken/asynq/pull/977)
- Fix: `Server` graceful shutdown on UNIX systems (PR: https://github.com/hibiken/asynq/pull/982)
## [0.25.0] - 2024-10-29 ## [0.25.0] - 2024-10-29
### Upgrades ### Upgrades

View File

@@ -23,7 +23,7 @@ import (
) )
// Version of asynq library and CLI. // Version of asynq library and CLI.
const Version = "0.25.0" const Version = "0.25.1"
// DefaultQueueName is the queue name used if none are specified by user. // DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default" const DefaultQueueName = "default"

View File

@@ -95,7 +95,7 @@ func (r *recoverer) recoverLeaseExpiredTasks() {
return return
} }
for _, msg := range msgs { for _, msg := range msgs {
if msg.Retried >= msg.Retry { if msg.Retried >= msg.Retry && msg.Retry > 0 {
r.archive(msg, ErrLeaseExpired) r.archive(msg, ErrLeaseExpired)
} else { } else {
r.retry(msg, ErrLeaseExpired) r.retry(msg, ErrLeaseExpired)

View File

@@ -174,16 +174,15 @@ type Config struct {
// }) // })
// //
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
//
// we can also handle panic error like: // we can also handle panic error like:
// func reportError(ctx context, task *asynq.Task, err error) { // func reportError(ctx context, task *asynq.Task, err error) {
// if asynq.IsPanic(err) { // if asynq.IsPanicError(err) {
// errorReportingService.Notify(err) // errorReportingService.Notify(err)
// } // }
// }) // })
// //
// ErrorHandler: asynq.ErrorHandlerFunc(reportError) // ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler ErrorHandler ErrorHandler
// Logger specifies the logger used by the server instance. // Logger specifies the logger used by the server instance.

View File

@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
if sig == unix.SIGTSTP { if sig == unix.SIGTSTP {
srv.Stop() srv.Stop()
continue continue
} else {
srv.Stop()
break
} }
break
} }
} }

View File

@@ -40,6 +40,7 @@ var (
useRedisCluster bool useRedisCluster bool
clusterAddrs string clusterAddrs string
tlsServerName string tlsServerName string
insecure bool
) )
// rootCmd represents the base command when called without any subcommands // rootCmd represents the base command when called without any subcommands
@@ -314,6 +315,8 @@ func init() {
"List of comma-separated redis server addresses") "List of comma-separated redis server addresses")
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server", rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
"", "Server name for TLS validation") "", "Server name for TLS validation")
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
false, "Allow insecure TLS connection by skipping cert validation")
// Bind flags with config. // Bind flags with config.
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri")) viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db")) viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
@@ -321,6 +324,7 @@ func init() {
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster")) viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs")) viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server")) viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
} }
// initConfig reads in config file and ENV variables if set. // initConfig reads in config file and ENV variables if set.
@@ -402,7 +406,7 @@ func getTLSConfig() *tls.Config {
if tlsServer == "" { if tlsServer == "" {
return nil return nil
} }
return &tls.Config{ServerName: tlsServer} return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
} }
// printTable is a helper function to print data in table format. // printTable is a helper function to print data in table format.