mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-19 21:07:05 +08:00
Compare commits
3 Commits
sohail/pm-
...
sohail/uni
Author | SHA1 | Date | |
---|---|---|---|
|
71c746d00a | ||
|
02907551b4 | ||
|
127fac2e90 |
27
scheduler.go
27
scheduler.go
@@ -44,9 +44,6 @@ type Scheduler struct {
|
|||||||
// to avoid using cron.EntryID as the public API of
|
// to avoid using cron.EntryID as the public API of
|
||||||
// the Scheduler.
|
// the Scheduler.
|
||||||
idmap map[string]cron.EntryID
|
idmap map[string]cron.EntryID
|
||||||
// When a Scheduler has been created with an existing Redis connection, we do
|
|
||||||
// not want to close it.
|
|
||||||
sharedConnection bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const defaultHeartbeatInterval = 10 * time.Second
|
const defaultHeartbeatInterval = 10 * time.Second
|
||||||
@@ -54,12 +51,18 @@ const defaultHeartbeatInterval = 10 * time.Second
|
|||||||
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
// NewScheduler returns a new Scheduler instance given the redis connection option.
|
||||||
// The parameter opts is optional, defaults will be used if opts is set to nil
|
// The parameter opts is optional, defaults will be used if opts is set to nil
|
||||||
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
redisClient, ok := r.MakeRedisClient().(redis.UniversalClient)
|
||||||
if !ok {
|
if !ok {
|
||||||
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r))
|
||||||
}
|
}
|
||||||
scheduler := NewSchedulerFromRedisClient(redisClient, opts)
|
|
||||||
scheduler.sharedConnection = false
|
rdb := rdb.NewRDB(redisClient)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb
|
||||||
|
scheduler.client = &Client{broker: rdb, sharedConnection: false}
|
||||||
|
|
||||||
return scheduler
|
return scheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,6 +70,15 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
|
|||||||
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
// The parameter opts is optional, defaults will be used if opts is set to nil.
|
||||||
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
// Warning: The underlying redis connection pool will not be closed by Asynq, you are responsible for closing it.
|
||||||
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *Scheduler {
|
||||||
|
scheduler := newScheduler(opts)
|
||||||
|
|
||||||
|
scheduler.rdb = rdb.NewRDB(c)
|
||||||
|
scheduler.client = NewClientFromRedisClient(c)
|
||||||
|
|
||||||
|
return scheduler
|
||||||
|
}
|
||||||
|
|
||||||
|
func newScheduler(opts *SchedulerOpts) *Scheduler {
|
||||||
if opts == nil {
|
if opts == nil {
|
||||||
opts = &SchedulerOpts{}
|
opts = &SchedulerOpts{}
|
||||||
}
|
}
|
||||||
@@ -93,8 +105,6 @@ func NewSchedulerFromRedisClient(c redis.UniversalClient, opts *SchedulerOpts) *
|
|||||||
state: &serverState{value: srvStateNew},
|
state: &serverState{value: srvStateNew},
|
||||||
heartbeatInterval: heartbeatInterval,
|
heartbeatInterval: heartbeatInterval,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
client: NewClientFromRedisClient(c),
|
|
||||||
rdb: rdb.NewRDB(c),
|
|
||||||
cron: cron.New(cron.WithLocation(loc)),
|
cron: cron.New(cron.WithLocation(loc)),
|
||||||
location: loc,
|
location: loc,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
@@ -294,9 +304,6 @@ func (s *Scheduler) Shutdown() {
|
|||||||
if err := s.client.Close(); err != nil {
|
if err := s.client.Close(); err != nil {
|
||||||
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
s.logger.Errorf("Failed to close redis client connection: %v", err)
|
||||||
}
|
}
|
||||||
if !s.sharedConnection {
|
|
||||||
s.rdb.Close()
|
|
||||||
}
|
|
||||||
s.logger.Info("Scheduler stopped")
|
s.logger.Info("Scheduler stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -24,8 +24,10 @@ func (srv *Server) waitForSignals() {
|
|||||||
if sig == unix.SIGTSTP {
|
if sig == unix.SIGTSTP {
|
||||||
srv.Stop()
|
srv.Stop()
|
||||||
continue
|
continue
|
||||||
|
} else {
|
||||||
|
srv.Stop()
|
||||||
|
break
|
||||||
}
|
}
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -40,6 +40,7 @@ var (
|
|||||||
useRedisCluster bool
|
useRedisCluster bool
|
||||||
clusterAddrs string
|
clusterAddrs string
|
||||||
tlsServerName string
|
tlsServerName string
|
||||||
|
insecure bool
|
||||||
)
|
)
|
||||||
|
|
||||||
// rootCmd represents the base command when called without any subcommands
|
// rootCmd represents the base command when called without any subcommands
|
||||||
@@ -314,6 +315,8 @@ func init() {
|
|||||||
"List of comma-separated redis server addresses")
|
"List of comma-separated redis server addresses")
|
||||||
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
rootCmd.PersistentFlags().StringVar(&tlsServerName, "tls_server",
|
||||||
"", "Server name for TLS validation")
|
"", "Server name for TLS validation")
|
||||||
|
rootCmd.PersistentFlags().BoolVar(&insecure, "insecure",
|
||||||
|
false, "Allow insecure TLS connection by skipping cert validation")
|
||||||
// Bind flags with config.
|
// Bind flags with config.
|
||||||
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
viper.BindPFlag("uri", rootCmd.PersistentFlags().Lookup("uri"))
|
||||||
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
viper.BindPFlag("db", rootCmd.PersistentFlags().Lookup("db"))
|
||||||
@@ -321,6 +324,7 @@ func init() {
|
|||||||
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
viper.BindPFlag("cluster", rootCmd.PersistentFlags().Lookup("cluster"))
|
||||||
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
viper.BindPFlag("cluster_addrs", rootCmd.PersistentFlags().Lookup("cluster_addrs"))
|
||||||
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
viper.BindPFlag("tls_server", rootCmd.PersistentFlags().Lookup("tls_server"))
|
||||||
|
viper.BindPFlag("insecure", rootCmd.PersistentFlags().Lookup("insecure"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// initConfig reads in config file and ENV variables if set.
|
// initConfig reads in config file and ENV variables if set.
|
||||||
@@ -402,7 +406,7 @@ func getTLSConfig() *tls.Config {
|
|||||||
if tlsServer == "" {
|
if tlsServer == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return &tls.Config{ServerName: tlsServer}
|
return &tls.Config{ServerName: tlsServer, InsecureSkipVerify: viper.GetBool("insecure")}
|
||||||
}
|
}
|
||||||
|
|
||||||
// printTable is a helper function to print data in table format.
|
// printTable is a helper function to print data in table format.
|
||||||
|
Reference in New Issue
Block a user