diff --git a/background.go b/background.go
index 550a887..273be7d 100644
--- a/background.go
+++ b/background.go
@@ -33,6 +33,7 @@ type Background struct {
 	mu      sync.Mutex
 	running bool
 
+	pinfo     *base.ProcessInfo
 	rdb       *rdb.RDB
 	scheduler *scheduler
 	processor *processor
@@ -108,7 +109,6 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
 	if queues == nil || len(queues) == 0 {
 		queues = defaultQueueConfig
 	}
-	qcfg := normalizeQueueCfg(queues)
 
 	host, err := os.Hostname()
 	if err != nil {
@@ -116,13 +116,15 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
 	}
 	pid := os.Getpid()
 
+	pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority)
 	rdb := rdb.NewRDB(createRedisClient(r))
 	syncRequestCh := make(chan *syncRequest)
 	syncer := newSyncer(syncRequestCh, 5*time.Second)
-	heartbeater := newHeartbeater(rdb, 5*time.Second, host, pid, queues, n)
-	scheduler := newScheduler(rdb, 5*time.Second, qcfg)
-	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh)
+	heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second)
+	scheduler := newScheduler(rdb, 5*time.Second, queues)
+	processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh)
 	return &Background{
+		pinfo:       pinfo,
 		rdb:         rdb,
 		scheduler:   scheduler,
 		processor:   processor,
@@ -174,7 +176,7 @@ func (bg *Background) Run(handler Handler) {
 		sig := <-sigs
 		if sig == syscall.SIGTSTP {
 			bg.processor.stop()
-			bg.heartbeater.setState("stopped")
+			bg.pinfo.SetState("stopped")
 			continue
 		}
 		break
@@ -215,41 +217,10 @@ func (bg *Background) stop() {
 	bg.syncer.terminate()
 	bg.heartbeater.terminate()
 
+	bg.rdb.ClearProcessInfo(bg.pinfo)
 	bg.rdb.Close()
 	bg.processor.handler = nil
 	bg.running = false
 
 	logger.info("Bye!")
 }
-
-// normalizeQueueCfg divides priority numbers by their
-// greatest common divisor.
-func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
-	var xs []uint
-	for _, x := range queueCfg {
-		xs = append(xs, x)
-	}
-	d := gcd(xs...)
-	res := make(map[string]uint)
-	for q, x := range queueCfg {
-		res[q] = x / d
-	}
-	return res
-}
-
-func gcd(xs ...uint) uint {
-	fn := func(x, y uint) uint {
-		for y > 0 {
-			x, y = y, x%y
-		}
-		return x
-	}
-	res := xs[0]
-	for i := 0; i < len(xs); i++ {
-		res = fn(xs[i], res)
-		if res == 1 {
-			return 1
-		}
-	}
-	return res
-}
diff --git a/heartbeat.go b/heartbeat.go
index d9ead5d..d85bb4d 100644
--- a/heartbeat.go
+++ b/heartbeat.go
@@ -5,20 +5,18 @@
 package asynq
 
 import (
-	"sync"
 	"time"
 
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/rdb"
 )
 
-// heartbeater is responsible for writing process status to redis periodically to
+// heartbeater is responsible for writing process info to redis periodically to
 // indicate that the background worker process is up.
 type heartbeater struct {
 	rdb *rdb.RDB
 
-	mu sync.Mutex
-	ps *base.ProcessStatus
+	pinfo *base.ProcessInfo
 
 	// channel to communicate back to the long running "heartbeater" goroutine.
 	done chan struct{}
 
@@ -27,16 +25,10 @@ type heartbeater struct {
 	interval time.Duration
 }
 
-func newHeartbeater(rdb *rdb.RDB, interval time.Duration, host string, pid int, queues map[string]uint, n int) *heartbeater {
-	ps := &base.ProcessStatus{
-		Concurrency: n,
-		Queues:      queues,
-		Host:        host,
-		PID:         pid,
-	}
+func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater {
 	return &heartbeater{
 		rdb:      rdb,
-		ps:       ps,
+		pinfo:    pinfo,
 		done:     make(chan struct{}),
 		interval: interval,
 	}
@@ -48,31 +40,28 @@ func (h *heartbeater) terminate() {
 	h.done <- struct{}{}
 }
 
-func (h *heartbeater) setState(state string) {
-	h.mu.Lock()
-	defer h.mu.Unlock()
-	h.ps.State = state
-}
-
 func (h *heartbeater) start() {
-	h.ps.Started = time.Now()
-	h.ps.State = "running"
+	h.pinfo.SetStarted(time.Now())
+	h.pinfo.SetState("running")
 	go func() {
+		h.beat()
 		for {
 			select {
 			case <-h.done:
 				logger.info("Heartbeater done")
 				return
 			case <-time.After(h.interval):
-				// Note: Set TTL to be long enough value so that it won't expire before we write again
-				// and short enough to expire quickly once process is shut down.
-				h.mu.Lock()
-				err := h.rdb.WriteProcessStatus(h.ps, h.interval*2)
-				h.mu.Unlock()
-				if err != nil {
-					logger.error("could not write heartbeat data: %v", err)
-				}
+				h.beat()
 			}
 		}
 	}()
 }
+
+func (h *heartbeater) beat() {
+	// Note: Set TTL to be long enough so that it won't expire before we write again
+	// and short enough to expire quickly once the process is shut down or killed.
+	err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2)
+	if err != nil {
+		logger.error("could not write heartbeat data: %v", err)
+	}
+}
diff --git a/heartbeat_test.go b/heartbeat_test.go
index 9aee07f..d4fffb2 100644
--- a/heartbeat_test.go
+++ b/heartbeat_test.go
@@ -29,12 +29,15 @@ func TestHeartbeater(t *testing.T) {
 		{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10},
 	}
 
+	timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
+	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
 	for _, tc := range tests {
 		h.FlushDB(t, r)
 
-		hb := newHeartbeater(rdbClient, tc.interval, tc.host, tc.pid, tc.queues, tc.concurrency)
+		pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false)
+		hb := newHeartbeater(rdbClient, pi, tc.interval)
 
-		want := &base.ProcessStatus{
+		want := &base.ProcessInfo{
 			Host:   tc.host,
 			PID:    tc.pid,
 			Queues: tc.queues,
@@ -47,35 +50,34 @@ func TestHeartbeater(t *testing.T) {
 		// allow for heartbeater to write to redis
 		time.Sleep(tc.interval * 2)
 
-		got, err := rdbClient.ReadProcessStatus(tc.host, tc.pid)
+		got, err := rdbClient.ReadProcessInfo(tc.host, tc.pid)
 		if err != nil {
 			t.Errorf("could not read process status from redis: %v", err)
 			hb.terminate()
 			continue
 		}
 
-		var timeCmpOpt = cmpopts.EquateApproxTime(10 * time.Millisecond)
-		if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" {
+		if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
 			hb.terminate()
 			continue
 		}
 
 		// state change
-		hb.setState("stopped")
+		pi.SetState("stopped")
 
 		// allow for heartbeater to write to redis
 		time.Sleep(tc.interval * 2)
 
 		want.State = "stopped"
-		got, err = rdbClient.ReadProcessStatus(tc.host, tc.pid)
+		got, err = rdbClient.ReadProcessInfo(tc.host, tc.pid)
 		if err != nil {
 			t.Errorf("could not read process status from redis: %v", err)
 			hb.terminate()
 			continue
 		}
 
-		if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" {
+		if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff)
 			hb.terminate()
 			continue
diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go
index 5bbed3a..531605d 100644
--- a/internal/asynqtest/asynqtest.go
+++ b/internal/asynqtest/asynqtest.go
@@ -41,6 +41,18 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [
 	return out
 })
 
+// SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slices of process info.
+var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo {
+	out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it
+	sort.Slice(out, func(i, j int) bool {
+		if out[i].Host != out[j].Host {
+			return out[i].Host < out[j].Host
+		}
+		return out[i].PID < out[j].PID
+	})
+	return out
+})
+
 // IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing.
 var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")
diff --git a/internal/base/base.go b/internal/base/base.go
index 87c9dab..f6956da 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -8,6 +8,7 @@ package base
 import (
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/rs/xid"
@@ -19,6 +20,7 @@ const DefaultQueueName = "default"
 // Redis keys
 const (
 	psPrefix        = "asynq:ps:"        // HASH
+	AllProcesses    = "asynq:ps"         // ZSET
 	processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
 	failurePrefix   = "asynq:failure:"   // STRING - asynq:failure:<yyyy-mm-dd>
 	QueuePrefix     = "asynq:queues:"    // LIST - asynq:queues:<qname>
@@ -47,8 +49,8 @@ func FailureKey(t time.Time) string {
 	return failurePrefix + t.UTC().Format("2006-01-02")
 }
 
-// ProcessStatusKey returns a redis key string for process status.
-func ProcessStatusKey(hostname string, pid int) string {
+// ProcessInfoKey returns a redis key string for process info.
+func ProcessInfoKey(hostname string, pid int) string {
 	return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
 }
 
@@ -77,12 +79,47 @@ type TaskMessage struct {
 	ErrorMsg string
 }
 
-// ProcessStatus holds information about running background worker process.
-type ProcessStatus struct {
-	Concurrency int
-	Queues      map[string]uint
-	PID         int
-	Host        string
-	State       string
-	Started     time.Time
-}
+// ProcessInfo holds information about a running background worker process.
+type ProcessInfo struct {
+	mu                sync.Mutex
+	Concurrency       int
+	Queues            map[string]uint
+	StrictPriority    bool
+	PID               int
+	Host              string
+	State             string
+	Started           time.Time
+	ActiveWorkerCount int
+}
+
+// NewProcessInfo returns a new instance of ProcessInfo.
+func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo {
+	return &ProcessInfo{
+		Host:           host,
+		PID:            pid,
+		Concurrency:    concurrency,
+		Queues:         queues,
+		StrictPriority: strict,
+	}
+}
+
+// SetState sets the state field of the process info.
+func (p *ProcessInfo) SetState(state string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.State = state
+}
+
+// SetStarted sets the started field of the process info.
+func (p *ProcessInfo) SetStarted(t time.Time) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.Started = t
+}
+
+// IncrActiveWorkerCount increments active worker count by delta.
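+// A negative delta decrements the count; the processor passes -1 when a
+// worker finishes its task.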
+func (p *ProcessInfo) IncrActiveWorkerCount(delta int) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.ActiveWorkerCount += delta
+}
diff --git a/internal/base/base_test.go b/internal/base/base_test.go
index 79f88b4..00be893 100644
--- a/internal/base/base_test.go
+++ b/internal/base/base_test.go
@@ -5,6 +5,7 @@
 package base
 
 import (
+	"sync"
 	"testing"
 	"time"
 )
@@ -61,7 +62,7 @@ func TestFailureKey(t *testing.T) {
 	}
 }
 
-func TestProcessStatusKey(t *testing.T) {
+func TestProcessInfoKey(t *testing.T) {
 	tests := []struct {
 		hostname string
 		pid      int
@@ -72,9 +73,36 @@
 	}
 
 	for _, tc := range tests {
-		got := ProcessStatusKey(tc.hostname, tc.pid)
+		got := ProcessInfoKey(tc.hostname, tc.pid)
 		if got != tc.want {
-			t.Errorf("ProcessStatusKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want)
+			t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want)
 		}
 	}
 }
+
+// Note: Run this test with -race flag to check for data race.
+func TestProcessInfoSetter(t *testing.T) {
+	pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false)
+
+	var wg sync.WaitGroup
+
+	wg.Add(3)
+
+	go func() {
+		pi.SetState("running")
+		wg.Done()
+	}()
+
+	go func() {
+		pi.SetStarted(time.Now())
+		pi.IncrActiveWorkerCount(1)
+		wg.Done()
+	}()
+
+	go func() {
+		pi.SetState("stopped")
+		wg.Done()
+	}()
+
+	wg.Wait()
+}
diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go
index d47dafb..07ed9d3 100644
--- a/internal/rdb/inspect.go
+++ b/internal/rdb/inspect.go
@@ -755,3 +755,40 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
 	}
 	return nil
 }
+
+// ListProcesses returns the list of process info for all background worker processes.
+func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
+	// Note: Script also removes stale keys.
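+	// It returns only members whose score (the expiration time) is still in
+	// the future, and prunes members that have already expired from the ZSET.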
+	script := redis.NewScript(`
+	local res = {}
+	local now = tonumber(ARGV[1])
+	local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
+	for _, key in ipairs(keys) do
+		local ps = redis.call("GET", key)
+		if ps then
+			table.insert(res, ps)
+		end
+	end
+	redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
+	return res
+	`)
+	res, err := script.Run(r.client,
+		[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result()
+	if err != nil {
+		return nil, err
+	}
+	data, err := cast.ToStringSliceE(res)
+	if err != nil {
+		return nil, err
+	}
+	var processes []*base.ProcessInfo
+	for _, s := range data {
+		var ps base.ProcessInfo
+		err := json.Unmarshal([]byte(s), &ps)
+		if err != nil {
+			continue // skip bad data
+		}
+		processes = append(processes, &ps)
+	}
+	return processes, nil
+}
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index 33cd0db..91422c0 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -2050,3 +2050,56 @@ func TestRemoveQueueError(t *testing.T) {
 		}
 	}
 }
+
+func TestListProcesses(t *testing.T) {
+	r := setup(t)
+
+	ps1 := &base.ProcessInfo{
+		Concurrency:       10,
+		Queues:            map[string]uint{"default": 1},
+		Host:              "do.droplet1",
+		PID:               1234,
+		State:             "running",
+		Started:           time.Now().Add(-time.Hour),
+		ActiveWorkerCount: 5,
+	}
+
+	ps2 := &base.ProcessInfo{
+		Concurrency:       20,
+		Queues:            map[string]uint{"email": 1},
+		Host:              "do.droplet2",
+		PID:               9876,
+		State:             "stopped",
+		Started:           time.Now().Add(-2 * time.Hour),
+		ActiveWorkerCount: 20,
+	}
+
+	tests := []struct {
+		processes []*base.ProcessInfo
+	}{
+		{processes: []*base.ProcessInfo{}},
+		{processes: []*base.ProcessInfo{ps1}},
+		{processes: []*base.ProcessInfo{ps1, ps2}},
+	}
+
+	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
+
+	for _, tc := range tests {
+		h.FlushDB(t, r.client)
+
+		for _, ps := range tc.processes {
+			if err := r.WriteProcessInfo(ps, 5*time.Second); err != nil {
+				t.Fatal(err)
+			}
+		}
+
+		got, err := r.ListProcesses()
+		if err != nil {
+			t.Errorf("r.ListProcesses returned an error: %v", err)
+		}
+		if diff := cmp.Diff(tc.processes, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" {
+			t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
+				got, tc.processes, diff)
+		}
+	}
+}
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 6403da4..ae05a55 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -347,28 +347,52 @@ func (r *RDB) forwardSingle(src, dst string) error {
 		[]string{src, dst}, now).Err()
 }
 
-// WriteProcessStatus writes process information to redis with expiration
+// WriteProcessInfo writes process information to redis with expiration
 // set to the value ttl.
-func (r *RDB) WriteProcessStatus(ps *base.ProcessStatus, ttl time.Duration) error {
+func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error {
 	bytes, err := json.Marshal(ps)
 	if err != nil {
 		return err
 	}
-	key := base.ProcessStatusKey(ps.Host, ps.PID)
-	return r.client.Set(key, string(bytes), ttl).Err()
+	// Note: Add key to ZSET with expiration time as score.
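+	// Sorted set members have no per-member TTL in redis, so the expiration
+	// time is encoded as the score; readers treat members whose score is in
+	// the past as stale.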
+	// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
+	exp := time.Now().Add(ttl).UTC()
+	key := base.ProcessInfoKey(ps.Host, ps.PID)
+	// KEYS[1] -> asynq:ps
+	// KEYS[2] -> asynq:ps:<host>:<pid>
+	// ARGV[1] -> expiration time
+	// ARGV[2] -> TTL in seconds
+	// ARGV[3] -> process info
+	script := redis.NewScript(`
+	redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2])
+	redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3])
+	return redis.status_reply("OK")
+	`)
+	return script.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err()
 }
 
-// ReadProcessStatus reads process information stored in redis.
-func (r *RDB) ReadProcessStatus(host string, pid int) (*base.ProcessStatus, error) {
-	key := base.ProcessStatusKey(host, pid)
+// ReadProcessInfo reads process information stored in redis.
+func (r *RDB) ReadProcessInfo(host string, pid int) (*base.ProcessInfo, error) {
+	key := base.ProcessInfoKey(host, pid)
 	data, err := r.client.Get(key).Result()
 	if err != nil {
 		return nil, err
 	}
-	var ps base.ProcessStatus
-	err = json.Unmarshal([]byte(data), &ps)
+	var pinfo base.ProcessInfo
+	err = json.Unmarshal([]byte(data), &pinfo)
 	if err != nil {
 		return nil, err
 	}
-	return &ps, nil
+	return &pinfo, nil
+}
+
+// ClearProcessInfo deletes process information from redis.
+func (r *RDB) ClearProcessInfo(ps *base.ProcessInfo) error {
+	key := base.ProcessInfoKey(ps.Host, ps.PID)
+	script := redis.NewScript(`
+	redis.call("ZREM", KEYS[1], KEYS[2])
+	redis.call("DEL", KEYS[2])
+	return redis.status_reply("OK")
+	`)
+	return script.Run(r.client, []string{base.AllProcesses, key}).Err()
 }
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index 4cae1ef..d478608 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -6,11 +6,13 @@ package rdb
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
 	"github.com/go-redis/redis/v7"
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	h "github.com/hibiken/asynq/internal/asynqtest"
 	"github.com/hibiken/asynq/internal/base"
 )
@@ -739,48 +741,81 @@ func TestCheckAndEnqueue(t *testing.T) {
 	}
 }
 
-func TestReadWriteProcessStatus(t *testing.T) {
+func TestReadWriteClearProcessInfo(t *testing.T) {
 	r := setup(t)
-	ps1 := &base.ProcessStatus{
-		Concurrency: 10,
-		Queues:      map[string]uint{"default": 2, "email": 5, "low": 1},
-		PID:         98765,
-		Host:        "localhost",
-		State:       "running",
-		Started:     time.Now(),
+	pinfo := &base.ProcessInfo{
+		Concurrency:       10,
+		Queues:            map[string]uint{"default": 2, "email": 5, "low": 1},
+		PID:               98765,
+		Host:              "localhost",
+		State:             "running",
+		Started:           time.Now(),
+		ActiveWorkerCount: 1,
 	}
 
 	tests := []struct {
-		ps  *base.ProcessStatus
+		pi  *base.ProcessInfo
 		ttl time.Duration
 	}{
-		{ps1, 5 * time.Second},
+		{pinfo, 5 * time.Second},
 	}
 
 	for _, tc := range tests {
 		h.FlushDB(t, r.client)
 
-		err := r.WriteProcessStatus(tc.ps, tc.ttl)
+		err := r.WriteProcessInfo(tc.pi, tc.ttl)
 		if err != nil {
-			t.Errorf("r.WriteProcessStatus returned an error: %v", err)
+			t.Errorf("r.WriteProcessInfo returned an error: %v", err)
 			continue
 		}
 
-		got, err := r.ReadProcessStatus(tc.ps.Host, tc.ps.PID)
+		got, err := r.ReadProcessInfo(tc.pi.Host, tc.pi.PID)
 		if err != nil {
-			t.Errorf("r.ReadProcessStatus returned an error: %v", err)
+			t.Errorf("r.ReadProcessInfo returned an error: %v", err)
 			continue
 		}
 
-		if diff := cmp.Diff(tc.ps, got); diff != "" {
-			t.Errorf("r.ReadProcessStatus(%q, %d) = %+v, want %+v; (-want,+got)\n%s",
-				tc.ps.Host, tc.ps.PID, got, tc.ps, diff)
+		ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
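+		// cmp cannot introspect the unexported mutex inside ProcessInfo, so
+		// unexported fields are ignored in the comparison below.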
+		if diff := cmp.Diff(tc.pi, got, ignoreOpt); diff != "" {
+			t.Errorf("r.ReadProcessInfo(%q, %d) = %+v, want %+v; (-want,+got)\n%s",
+				tc.pi.Host, tc.pi.PID, got, tc.pi, diff)
 		}
 
-		key := base.ProcessStatusKey(tc.ps.Host, tc.ps.PID)
+		key := base.ProcessInfoKey(tc.pi.Host, tc.pi.PID)
 		gotTTL := r.client.TTL(key).Val()
 		if !cmp.Equal(tc.ttl, gotTTL, timeCmpOpt) {
 			t.Errorf("redis TTL %q returned %v, want %v", key, gotTTL, tc.ttl)
 		}
+
+		now := time.Now().UTC()
+		allKeys, err := r.client.ZRangeByScore(base.AllProcesses, &redis.ZRangeBy{
+			Min: strconv.Itoa(int(now.Unix())),
+			Max: "+inf",
+		}).Result()
+		if err != nil {
+			t.Errorf("redis ZRANGEBYSCORE %q %d +inf returned an error: %v",
+				base.AllProcesses, now.Unix(), err)
+			continue
+		}
+
+		wantAllKeys := []string{key}
+		if diff := cmp.Diff(wantAllKeys, allKeys); diff != "" {
+			t.Errorf("all keys = %v, want %v; (-want,+got)\n%s", allKeys, wantAllKeys, diff)
+		}
+
+		if err := r.ClearProcessInfo(tc.pi); err != nil {
+			t.Errorf("r.ClearProcessInfo returned an error: %v", err)
+			continue
+		}
+
+		// 1 means key exists
+		if r.client.Exists(key).Val() == 1 {
+			t.Errorf("expected %q to be deleted", key)
+		}
+
+		if r.client.ZCard(base.AllProcesses).Val() != 0 {
+			t.Errorf("expected %q to be empty", base.AllProcesses)
+		}
 	}
 }
diff --git a/processor.go b/processor.go
index de3496b..32b63a3 100644
--- a/processor.go
+++ b/processor.go
@@ -19,6 +19,8 @@ import (
 type processor struct {
 	rdb *rdb.RDB
 
+	pinfo *base.ProcessInfo
+
 	handler Handler
 
 	queueConfig map[string]uint
@@ -53,25 +55,21 @@ type processor struct {
 type retryDelayFunc func(n int, err error, task *Task) time.Duration
 
 // newProcessor constructs a new processor.
-//
-// r is an instance of RDB used by the processor.
-// n specifies the max number of concurrenct worker goroutines.
-// qfcg is a mapping of queue names to associated priority level.
-// strict specifies whether queue priority should be treated strictly.
-// fn is a function to compute retry delay.
-func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
+func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor {
+	qcfg := normalizeQueueCfg(pinfo.Queues)
 	orderedQueues := []string(nil)
-	if strict {
+	if pinfo.StrictPriority {
 		orderedQueues = sortByPriority(qcfg)
 	}
 	return &processor{
 		rdb:            r,
+		pinfo:          pinfo,
 		queueConfig:    qcfg,
 		orderedQueues:  orderedQueues,
 		retryDelayFunc: fn,
 		syncRequestCh:  syncRequestCh,
 		errLogLimiter:  rate.NewLimiter(rate.Every(3*time.Second), 1),
-		sema:           make(chan struct{}, n),
+		sema:           make(chan struct{}, pinfo.Concurrency),
 		done:           make(chan struct{}),
 		abort:          make(chan struct{}),
 		quit:           make(chan struct{}),
@@ -153,8 +151,12 @@ func (p *processor) exec() {
 		p.requeue(msg)
 		return
 	case p.sema <- struct{}{}: // acquire token
+		p.pinfo.IncrActiveWorkerCount(1)
 		go func() {
-			defer func() { <-p.sema /* release token */ }()
+			defer func() {
+				<-p.sema /* release token */
+				p.pinfo.IncrActiveWorkerCount(-1)
+			}()
 
 			resCh := make(chan error, 1)
 			task := NewTask(msg.Type, msg.Payload)
@@ -331,3 +333,35 @@ type byPriority []*queue
 func (x byPriority) Len() int           { return len(x) }
 func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority }
 func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] }
+
+// normalizeQueueCfg divides priority numbers by their
+// greatest common divisor.
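+// For example, a config of {"low": 3, "default": 6} normalizes to
+// {"low": 1, "default": 2}, so queue weights keep the same relative
+// proportions while staying as small as possible.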
+func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint {
+	var xs []uint
+	for _, x := range queueCfg {
+		xs = append(xs, x)
+	}
+	d := gcd(xs...)
+	res := make(map[string]uint)
+	for q, x := range queueCfg {
+		res[q] = x / d
+	}
+	return res
+}
+
+func gcd(xs ...uint) uint {
+	fn := func(x, y uint) uint {
+		for y > 0 {
+			x, y = y, x%y
+		}
+		return x
+	}
+	res := xs[0]
+	for i := 0; i < len(xs); i++ {
+		res = fn(xs[i], res)
+		if res == 1 {
+			return 1
+		}
+	}
+	return res
+}
diff --git a/processor_test.go b/processor_test.go
index 07b39e8..893a9f3 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -65,7 +65,8 @@ func TestProcessorSuccess(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil)
+	pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
+	p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
 	p.handler = HandlerFunc(handler)
 
 	p.start()
@@ -148,7 +149,8 @@ func TestProcessorRetry(t *testing.T) {
 	handler := func(task *Task) error {
 		return fmt.Errorf(errMsg)
 	}
-	p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil)
+	pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false)
+	p := newProcessor(rdbClient, pi, delayFunc, nil)
 	p.handler = HandlerFunc(handler)
 
 	p.start()
@@ -207,7 +209,8 @@ func TestProcessorQueues(t *testing.T) {
 	}
 
 	for _, tc := range tests {
-		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil)
+		pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false)
+		p := newProcessor(nil, pi, defaultDelayFunc, nil)
 		got := p.queues()
 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -273,7 +276,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 		"low":      1,
 	}
 	// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
-	p := newProcessor(rdbClient, 1 /* concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil)
+	pi := base.NewProcessInfo("localhost", 1234, 1 /* concurrency */, queueCfg, true /* strict */)
+	p := newProcessor(rdbClient, pi, defaultDelayFunc, nil)
 	p.handler = HandlerFunc(handler)
 
 	p.start()
diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go
index 929f875..3837927 100644
--- a/tools/asynqmon/cmd/ls.go
+++ b/tools/asynqmon/cmd/ls.go
@@ -10,7 +10,6 @@ import (
 	"os"
 	"strconv"
 	"strings"
-	"text/tabwriter"
 	"time"
 
 	"github.com/go-redis/redis/v7"
@@ -215,18 +214,3 @@ func listDead(r *rdb.RDB) {
 	printTable(cols, printRows)
 	fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
 }
-
-func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
-	format := strings.Repeat("%v\t", len(cols)) + "\n"
-	tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
-	var headers []interface{}
-	var seps []interface{}
-	for _, name := range cols {
-		headers = append(headers, name)
-		seps = append(seps, strings.Repeat("-", len(name)))
-	}
-	fmt.Fprintf(tw, format, headers...)
-	fmt.Fprintf(tw, format, seps...)
-	printRows(tw, format)
-	tw.Flush()
-}
diff --git a/tools/asynqmon/cmd/ps.go b/tools/asynqmon/cmd/ps.go
new file mode 100644
index 0000000..8157ff1
--- /dev/null
+++ b/tools/asynqmon/cmd/ps.go
@@ -0,0 +1,89 @@
+// Copyright 2020 Kentaro Hibino. All rights reserved.
+// Use of this source code is governed by a MIT license
+// that can be found in the LICENSE file.
+
+package cmd
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/go-redis/redis/v7"
+	"github.com/hibiken/asynq/internal/rdb"
+	"github.com/spf13/cobra"
+	"github.com/spf13/viper"
+)
+
+// psCmd represents the ps command
+var psCmd = &cobra.Command{
+	Use:   "ps",
+	Short: "Shows all background worker processes",
+	Long: `Ps (asynqmon ps) will show all background worker processes
+backed by the specified redis instance.
+
+The command shows the following for each process:
+* Host and PID of the process
+* Number of active workers out of the worker pool
+* Queue configuration
+* State of the process ("running" | "stopped")
+
+A "running" process is processing tasks in queues.
+A "stopped" process is no longer processing new tasks.`,
+	Args: cobra.NoArgs,
+	Run:  ps,
+}
+
+func init() {
+	rootCmd.AddCommand(psCmd)
+}
+
+func ps(cmd *cobra.Command, args []string) {
+	c := redis.NewClient(&redis.Options{
+		Addr:     viper.GetString("uri"),
+		DB:       viper.GetInt("db"),
+		Password: viper.GetString("password"),
+	})
+	r := rdb.NewRDB(c)
+
+	processes, err := r.ListProcesses()
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+	if len(processes) == 0 {
+		fmt.Println("No processes")
+		return
+	}
+	cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"}
+	printRows := func(w io.Writer, tmpl string) {
+		for _, ps := range processes {
+			fmt.Fprintf(w, tmpl,
+				ps.Host, ps.PID, ps.State,
+				fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency),
+				formatQueues(ps.Queues), timeAgo(ps.Started))
+		}
+	}
+	printTable(cols, printRows)
+}
+
+// timeAgo takes a time and returns a string of the format "<duration> ago".
+func timeAgo(since time.Time) string {
+	d := time.Since(since).Round(time.Second)
+	return fmt.Sprintf("%v ago", d)
+}
+
+func formatQueues(queues map[string]uint) string {
+	var b strings.Builder
+	l := len(queues)
+	for qname, p := range queues {
+		fmt.Fprintf(&b, "%s:%d", qname, p)
+		l--
+		if l > 0 {
+			b.WriteString(" ")
+		}
+	}
+	return b.String()
+}
diff --git a/tools/asynqmon/cmd/root.go b/tools/asynqmon/cmd/root.go
index 95af88d..e18aaf5 100644
--- a/tools/asynqmon/cmd/root.go
+++ b/tools/asynqmon/cmd/root.go
@@ -6,7 +6,10 @@ package cmd
 
 import (
 	"fmt"
+	"io"
 	"os"
+	"strings"
+	"text/tabwriter"
 
 	"github.com/spf13/cobra"
 
@@ -81,3 +84,36 @@ func initConfig() {
 		fmt.Println("Using config file:", viper.ConfigFileUsed())
 	}
 }
+
+// printTable is a helper function to print data in table format.
+//
+// cols is a list of headers and printRows specifies how to print rows.
+//
+// Example:
+//	type User struct {
+//		Name string
+//		Addr string
+//		Age  int
+//	}
+//	data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...}
+//	cols := []string{"Name", "Addr", "Age"}
+//	printRows := func(w io.Writer, tmpl string) {
+//		for _, u := range data {
+//			fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age)
+//		}
+//	}
+//	printTable(cols, printRows)
+func printTable(cols []string, printRows func(w io.Writer, tmpl string)) {
+	format := strings.Repeat("%v\t", len(cols)) + "\n"
+	tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
+	var headers []interface{}
+	var seps []interface{}
+	for _, name := range cols {
+		headers = append(headers, name)
+		seps = append(seps, strings.Repeat("-", len(name)))
+	}
+	fmt.Fprintf(tw, format, headers...)
+	fmt.Fprintf(tw, format, seps...)
+	printRows(tw, format)
+	tw.Flush()
+}