2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-26 11:16:12 +08:00

Add cancel command to asynqmon cli

This commit is contained in:
Ken Hibino
2020-02-12 17:33:41 -08:00
parent 2e07266ee1
commit c41e666643
3 changed files with 58 additions and 5 deletions

View File

@@ -132,9 +132,9 @@ func (p *ProcessInfo) IncrActiveWorkerCount(delta int) {
p.ActiveWorkerCount += delta p.ActiveWorkerCount += delta
} }
// Cancelations hold cancel functions for all in-progress tasks. // Cancelations is a collection that holds cancel functions for all in-progress tasks.
// //
// Its methods are safe to be used in multiple concurrent goroutines // Its methods are safe to be used in multiple goroutines.
type Cancelations struct { type Cancelations struct {
mu sync.Mutex mu sync.Mutex
cancelFuncs map[string]context.CancelFunc cancelFuncs map[string]context.CancelFunc
@@ -147,14 +147,14 @@ func NewCancelations() *Cancelations {
} }
} }
// Add adds a new cancel func to the set. // Add adds a new cancel func to the collection.
func (c *Cancelations) Add(id string, fn context.CancelFunc) { func (c *Cancelations) Add(id string, fn context.CancelFunc) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()
c.cancelFuncs[id] = fn c.cancelFuncs[id] = fn
} }
// Delete deletes a cancel func from the set given an id. // Delete deletes a cancel func from the collection given an id.
func (c *Cancelations) Delete(id string) { func (c *Cancelations) Delete(id string) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock() defer c.mu.Unlock()

View File

@@ -422,7 +422,7 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
} }
// PublishCancelation publish cancelation message to all subscribers. // PublishCancelation publish cancelation message to all subscribers.
// The message is a string representing the task to be canceled. // The message is the ID for the task to be canceled.
func (r *RDB) PublishCancelation(id string) error { func (r *RDB) PublishCancelation(id string) error {
return r.client.Publish(base.CancelChannel, id).Err() return r.client.Publish(base.CancelChannel, id).Err()
} }

View File

@@ -0,0 +1,53 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"fmt"
"os"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// cancelCmd represents the cancel command
var cancelCmd = &cobra.Command{
Use: "cancel [task id]",
Short: "Sends a cancelation signal to the goroutine processing the specified task",
Long: `Cancel (asynqmon cancel) will send a cancelation signal to the goroutine processing
the specified task.
The command takes one argument which specifies the task to cancel.
The task should be in in-progress state.
Identifier for a task should be obtained by running "asynqmon ls" command.
Handler implementation needs to be context aware for cancelation signal to
actually cancel the processing.
Example: asynqmon cancel bnogo8gt6toe23vhef0g`,
Args: cobra.ExactArgs(1),
Run: cancel,
}
func init() {
rootCmd.AddCommand(cancelCmd)
}
func cancel(cmd *cobra.Command, args []string) {
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
}))
err := r.PublishCancelation(args[0])
if err != nil {
fmt.Printf("could not send cancelation signal: %v\n", err)
os.Exit(1)
}
fmt.Printf("Successfully sent cancelation siganl for task %s\n", args[0])
}