diff --git a/internal/base/base.go b/internal/base/base.go index a2959d7..b7896fa 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -132,9 +132,9 @@ func (p *ProcessInfo) IncrActiveWorkerCount(delta int) { p.ActiveWorkerCount += delta } -// Cancelations hold cancel functions for all in-progress tasks. +// Cancelations is a collection that holds cancel functions for all in-progress tasks. // -// Its methods are safe to be used in multiple concurrent goroutines +// Its methods are safe to be used in multiple goroutines. type Cancelations struct { mu sync.Mutex cancelFuncs map[string]context.CancelFunc @@ -147,14 +147,14 @@ func NewCancelations() *Cancelations { } } -// Add adds a new cancel func to the set. +// Add adds a new cancel func to the collection. func (c *Cancelations) Add(id string, fn context.CancelFunc) { c.mu.Lock() defer c.mu.Unlock() c.cancelFuncs[id] = fn } -// Delete deletes a cancel func from the set given an id. +// Delete deletes a cancel func from the collection given an id. func (c *Cancelations) Delete(id string) { c.mu.Lock() defer c.mu.Unlock() diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fc719bc..3c166da 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -422,7 +422,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { } // PublishCancelation publish cancelation message to all subscribers. -// The message is a string representing the task to be canceled. +// The message is the ID for the task to be canceled. func (r *RDB) PublishCancelation(id string) error { return r.client.Publish(base.CancelChannel, id).Err() } diff --git a/tools/asynqmon/cmd/cancel.go b/tools/asynqmon/cmd/cancel.go new file mode 100644 index 0000000..772e8d1 --- /dev/null +++ b/tools/asynqmon/cmd/cancel.go @@ -0,0 +1,53 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package cmd + +import ( + "fmt" + "os" + + "github.com/go-redis/redis/v7" + "github.com/hibiken/asynq/internal/rdb" + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +// cancelCmd represents the cancel command +var cancelCmd = &cobra.Command{ + Use: "cancel [task id]", + Short: "Sends a cancelation signal to the goroutine processing the specified task", + Long: `Cancel (asynqmon cancel) will send a cancelation signal to the goroutine processing +the specified task. + +The command takes one argument which specifies the task to cancel. +The task should be in in-progress state. +Identifier for a task should be obtained by running "asynqmon ls" command. + +Handler implementation needs to be context aware for cancelation signal to +actually cancel the processing. + +Example: asynqmon cancel bnogo8gt6toe23vhef0g`, + Args: cobra.ExactArgs(1), + Run: cancel, +} + +func init() { + rootCmd.AddCommand(cancelCmd) +} + +func cancel(cmd *cobra.Command, args []string) { + r := rdb.NewRDB(redis.NewClient(&redis.Options{ + Addr: viper.GetString("uri"), + DB: viper.GetInt("db"), + Password: viper.GetString("password"), + })) + + err := r.PublishCancelation(args[0]) + if err != nil { + fmt.Printf("could not send cancelation signal: %v\n", err) + os.Exit(1) + } + fmt.Printf("Successfully sent cancelation siganl for task %s\n", args[0]) +}