mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 07:12:17 +08:00
Add workers command to asynqmon
This commit is contained in:
parent
e21fe3bd79
commit
26e399bc9c
@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
### Added
|
||||||
|
|
||||||
|
- `asynqmon workers` was added to list all running workers information
|
||||||
|
|
||||||
## [0.4.0] - 2020-02-13
|
## [0.4.0] - 2020-02-13
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
|
@ -372,6 +372,7 @@ func (r *RDB) forwardSingle(src, dst string) error {
|
|||||||
var writeProcessInfoCmd = redis.NewScript(`
|
var writeProcessInfoCmd = redis.NewScript(`
|
||||||
redis.call("SETEX", KEYS[1], ARGV[2], ARGV[3])
|
redis.call("SETEX", KEYS[1], ARGV[2], ARGV[3])
|
||||||
redis.call("ZADD", KEYS[2], ARGV[1], KEYS[1])
|
redis.call("ZADD", KEYS[2], ARGV[1], KEYS[1])
|
||||||
|
redis.call("DEL", KEYS[3])
|
||||||
for i = 4, table.getn(ARGV)-1, 2 do
|
for i = 4, table.getn(ARGV)-1, 2 do
|
||||||
redis.call("HSET", KEYS[3], ARGV[i], ARGV[i+1])
|
redis.call("HSET", KEYS[3], ARGV[i], ARGV[i+1])
|
||||||
end
|
end
|
||||||
|
75
tools/asynqmon/cmd/workers.go
Normal file
75
tools/asynqmon/cmd/workers.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package cmd
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/go-redis/redis/v7"
|
||||||
|
"github.com/hibiken/asynq/internal/rdb"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
"github.com/spf13/viper"
|
||||||
|
)
|
||||||
|
|
||||||
|
// workersCmd represents the workers command
|
||||||
|
var workersCmd = &cobra.Command{
|
||||||
|
Use: "workers",
|
||||||
|
Short: "Shows all running workers information",
|
||||||
|
Long: `Workers (asynqmon workers) will show all running workers information.
|
||||||
|
|
||||||
|
The command shows the follwoing for each worker:
|
||||||
|
* Process in which the worker is running
|
||||||
|
* ID of the task worker is processing
|
||||||
|
* Type of the task worker is processing
|
||||||
|
* Payload of the task worker is processing
|
||||||
|
* Queue that the task was pulled from.
|
||||||
|
* Time the worker started processing the task`,
|
||||||
|
Args: cobra.NoArgs,
|
||||||
|
Run: workers,
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
rootCmd.AddCommand(workersCmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
func workers(cmd *cobra.Command, args []string) {
|
||||||
|
r := rdb.NewRDB(redis.NewClient(&redis.Options{
|
||||||
|
Addr: viper.GetString("uri"),
|
||||||
|
DB: viper.GetInt("db"),
|
||||||
|
Password: viper.GetString("password"),
|
||||||
|
}))
|
||||||
|
|
||||||
|
workers, err := r.ListWorkers()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(workers) == 0 {
|
||||||
|
fmt.Println("No workers")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// sort by started timestamp or ID.
|
||||||
|
sort.Slice(workers, func(i, j int) bool {
|
||||||
|
x, y := workers[i], workers[j]
|
||||||
|
if x.Started != y.Started {
|
||||||
|
return x.Started.Before(y.Started)
|
||||||
|
}
|
||||||
|
return x.ID.String() < y.ID.String()
|
||||||
|
})
|
||||||
|
|
||||||
|
cols := []string{"Process", "ID", "Type", "Payload", "Queue", "Started"}
|
||||||
|
printRows := func(w io.Writer, tmpl string) {
|
||||||
|
for _, wk := range workers {
|
||||||
|
fmt.Fprintf(w, tmpl,
|
||||||
|
fmt.Sprintf("%s:%d", wk.Host, wk.PID), wk.ID, wk.Type, wk.Payload, wk.Queue, timeAgo(wk.Started))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
printTable(cols, printRows)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user