mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Add ps command to asynqmon
This commit is contained in:
		| @@ -33,6 +33,7 @@ type Background struct { | ||||
| 	mu      sync.Mutex | ||||
| 	running bool | ||||
|  | ||||
| 	pinfo       *base.ProcessInfo | ||||
| 	rdb         *rdb.RDB | ||||
| 	scheduler   *scheduler | ||||
| 	processor   *processor | ||||
| @@ -108,7 +109,6 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | ||||
| 	if queues == nil || len(queues) == 0 { | ||||
| 		queues = defaultQueueConfig | ||||
| 	} | ||||
| 	qcfg := normalizeQueueCfg(queues) | ||||
|  | ||||
| 	host, err := os.Hostname() | ||||
| 	if err != nil { | ||||
| @@ -116,13 +116,15 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | ||||
| 	} | ||||
| 	pid := os.Getpid() | ||||
|  | ||||
| 	pinfo := base.NewProcessInfo(host, pid, n, queues, cfg.StrictPriority) | ||||
| 	rdb := rdb.NewRDB(createRedisClient(r)) | ||||
| 	syncRequestCh := make(chan *syncRequest) | ||||
| 	syncer := newSyncer(syncRequestCh, 5*time.Second) | ||||
| 	heartbeater := newHeartbeater(rdb, 5*time.Second, host, pid, queues, n) | ||||
| 	scheduler := newScheduler(rdb, 5*time.Second, qcfg) | ||||
| 	processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc, syncRequestCh) | ||||
| 	heartbeater := newHeartbeater(rdb, pinfo, 5*time.Second) | ||||
| 	scheduler := newScheduler(rdb, 5*time.Second, queues) | ||||
| 	processor := newProcessor(rdb, pinfo, delayFunc, syncRequestCh) | ||||
| 	return &Background{ | ||||
| 		pinfo:       pinfo, | ||||
| 		rdb:         rdb, | ||||
| 		scheduler:   scheduler, | ||||
| 		processor:   processor, | ||||
| @@ -174,7 +176,7 @@ func (bg *Background) Run(handler Handler) { | ||||
| 		sig := <-sigs | ||||
| 		if sig == syscall.SIGTSTP { | ||||
| 			bg.processor.stop() | ||||
| 			bg.heartbeater.setState("stopped") | ||||
| 			bg.pinfo.SetState("stopped") | ||||
| 			continue | ||||
| 		} | ||||
| 		break | ||||
| @@ -215,41 +217,10 @@ func (bg *Background) stop() { | ||||
| 	bg.syncer.terminate() | ||||
| 	bg.heartbeater.terminate() | ||||
|  | ||||
| 	bg.rdb.ClearProcessInfo(bg.pinfo) | ||||
| 	bg.rdb.Close() | ||||
| 	bg.processor.handler = nil | ||||
| 	bg.running = false | ||||
|  | ||||
| 	logger.info("Bye!") | ||||
| } | ||||
|  | ||||
| // normalizeQueueCfg divides priority numbers by their | ||||
| // greatest common divisor. | ||||
| func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint { | ||||
| 	var xs []uint | ||||
| 	for _, x := range queueCfg { | ||||
| 		xs = append(xs, x) | ||||
| 	} | ||||
| 	d := gcd(xs...) | ||||
| 	res := make(map[string]uint) | ||||
| 	for q, x := range queueCfg { | ||||
| 		res[q] = x / d | ||||
| 	} | ||||
| 	return res | ||||
| } | ||||
|  | ||||
| func gcd(xs ...uint) uint { | ||||
| 	fn := func(x, y uint) uint { | ||||
| 		for y > 0 { | ||||
| 			x, y = y, x%y | ||||
| 		} | ||||
| 		return x | ||||
| 	} | ||||
| 	res := xs[0] | ||||
| 	for i := 0; i < len(xs); i++ { | ||||
| 		res = fn(xs[i], res) | ||||
| 		if res == 1 { | ||||
| 			return 1 | ||||
| 		} | ||||
| 	} | ||||
| 	return res | ||||
| } | ||||
|   | ||||
							
								
								
									
										45
									
								
								heartbeat.go
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								heartbeat.go
									
									
									
									
									
								
							| @@ -5,20 +5,18 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| ) | ||||
|  | ||||
| // heartbeater is responsible for writing process status to redis periodically to | ||||
| // heartbeater is responsible for writing process info to redis periodically to | ||||
| // indicate that the background worker process is up. | ||||
| type heartbeater struct { | ||||
| 	rdb *rdb.RDB | ||||
|  | ||||
| 	mu sync.Mutex | ||||
| 	ps *base.ProcessStatus | ||||
| 	pinfo *base.ProcessInfo | ||||
|  | ||||
| 	// channel to communicate back to the long running "heartbeater" goroutine. | ||||
| 	done chan struct{} | ||||
| @@ -27,16 +25,10 @@ type heartbeater struct { | ||||
| 	interval time.Duration | ||||
| } | ||||
|  | ||||
| func newHeartbeater(rdb *rdb.RDB, interval time.Duration, host string, pid int, queues map[string]uint, n int) *heartbeater { | ||||
| 	ps := &base.ProcessStatus{ | ||||
| 		Concurrency: n, | ||||
| 		Queues:      queues, | ||||
| 		Host:        host, | ||||
| 		PID:         pid, | ||||
| 	} | ||||
| func newHeartbeater(rdb *rdb.RDB, pinfo *base.ProcessInfo, interval time.Duration) *heartbeater { | ||||
| 	return &heartbeater{ | ||||
| 		rdb:      rdb, | ||||
| 		ps:       ps, | ||||
| 		pinfo:    pinfo, | ||||
| 		done:     make(chan struct{}), | ||||
| 		interval: interval, | ||||
| 	} | ||||
| @@ -48,31 +40,28 @@ func (h *heartbeater) terminate() { | ||||
| 	h.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (h *heartbeater) setState(state string) { | ||||
| 	h.mu.Lock() | ||||
| 	defer h.mu.Unlock() | ||||
| 	h.ps.State = state | ||||
| } | ||||
|  | ||||
| func (h *heartbeater) start() { | ||||
| 	h.ps.Started = time.Now() | ||||
| 	h.ps.State = "running" | ||||
| 	h.pinfo.SetStarted(time.Now()) | ||||
| 	h.pinfo.SetState("running") | ||||
| 	go func() { | ||||
| 		h.beat() | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-h.done: | ||||
| 				logger.info("Heartbeater done") | ||||
| 				return | ||||
| 			case <-time.After(h.interval): | ||||
| 				// Note: Set TTL to be long enough value so that it won't expire before we write again | ||||
| 				// and short enough to expire quickly once process is shut down. | ||||
| 				h.mu.Lock() | ||||
| 				err := h.rdb.WriteProcessStatus(h.ps, h.interval*2) | ||||
| 				h.mu.Unlock() | ||||
| 				if err != nil { | ||||
| 					logger.error("could not write heartbeat data: %v", err) | ||||
| 				} | ||||
| 				h.beat() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (h *heartbeater) beat() { | ||||
| 	// Note: Set TTL to be long enough so that it won't expire before we write again | ||||
| 	// and short enough to expire quickly once the process is shut down or killed. | ||||
| 	err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2) | ||||
| 	if err != nil { | ||||
| 		logger.error("could not write heartbeat data: %v", err) | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -29,12 +29,15 @@ func TestHeartbeater(t *testing.T) { | ||||
| 		{time.Second, "some.address.ec2.aws.com", 45678, map[string]uint{"default": 1}, 10}, | ||||
| 	} | ||||
|  | ||||
| 	timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond) | ||||
| 	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{}) | ||||
| 	for _, tc := range tests { | ||||
| 		h.FlushDB(t, r) | ||||
|  | ||||
| 		hb := newHeartbeater(rdbClient, tc.interval, tc.host, tc.pid, tc.queues, tc.concurrency) | ||||
| 		pi := base.NewProcessInfo(tc.host, tc.pid, tc.concurrency, tc.queues, false) | ||||
| 		hb := newHeartbeater(rdbClient, pi, tc.interval) | ||||
|  | ||||
| 		want := &base.ProcessStatus{ | ||||
| 		want := &base.ProcessInfo{ | ||||
| 			Host:        tc.host, | ||||
| 			PID:         tc.pid, | ||||
| 			Queues:      tc.queues, | ||||
| @@ -47,35 +50,34 @@ func TestHeartbeater(t *testing.T) { | ||||
| 		// allow for heartbeater to write to redis | ||||
| 		time.Sleep(tc.interval * 2) | ||||
|  | ||||
| 		got, err := rdbClient.ReadProcessStatus(tc.host, tc.pid) | ||||
| 		got, err := rdbClient.ReadProcessInfo(tc.host, tc.pid) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("could not read process status from redis: %v", err) | ||||
| 			hb.terminate() | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		var timeCmpOpt = cmpopts.EquateApproxTime(10 * time.Millisecond) | ||||
| 		if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" { | ||||
| 		if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" { | ||||
| 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff) | ||||
| 			hb.terminate() | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// state change | ||||
| 		hb.setState("stopped") | ||||
| 		pi.SetState("stopped") | ||||
|  | ||||
| 		// allow for heartbeater to write to redis | ||||
| 		time.Sleep(tc.interval * 2) | ||||
|  | ||||
| 		want.State = "stopped" | ||||
| 		got, err = rdbClient.ReadProcessStatus(tc.host, tc.pid) | ||||
| 		got, err = rdbClient.ReadProcessInfo(tc.host, tc.pid) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("could not read process status from redis: %v", err) | ||||
| 			hb.terminate() | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if diff := cmp.Diff(want, got, timeCmpOpt); diff != "" { | ||||
| 		if diff := cmp.Diff(want, got, timeCmpOpt, ignoreOpt); diff != "" { | ||||
| 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", got, want, diff) | ||||
| 			hb.terminate() | ||||
| 			continue | ||||
|   | ||||
| @@ -41,6 +41,18 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [ | ||||
| 	return out | ||||
| }) | ||||
|  | ||||
| // SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slice of process info. | ||||
| var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo { | ||||
| 	out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it | ||||
| 	sort.Slice(out, func(i, j int) bool { | ||||
| 		if out[i].Host != out[j].Host { | ||||
| 			return out[i].Host < out[j].Host | ||||
| 		} | ||||
| 		return out[i].PID < out[j].PID | ||||
| 	}) | ||||
| 	return out | ||||
| }) | ||||
|  | ||||
| // IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing. | ||||
| var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") | ||||
|  | ||||
|   | ||||
| @@ -8,6 +8,7 @@ package base | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/rs/xid" | ||||
| @@ -19,6 +20,7 @@ const DefaultQueueName = "default" | ||||
| // Redis keys | ||||
| const ( | ||||
| 	psPrefix        = "asynq:ps:"                    // HASH | ||||
| 	AllProcesses    = "asynq:ps"                     // ZSET | ||||
| 	processedPrefix = "asynq:processed:"             // STRING - asynq:processed:<yyyy-mm-dd> | ||||
| 	failurePrefix   = "asynq:failure:"               // STRING - asynq:failure:<yyyy-mm-dd> | ||||
| 	QueuePrefix     = "asynq:queues:"                // LIST   - asynq:queues:<qname> | ||||
| @@ -47,8 +49,8 @@ func FailureKey(t time.Time) string { | ||||
| 	return failurePrefix + t.UTC().Format("2006-01-02") | ||||
| } | ||||
|  | ||||
| // ProcessStatusKey returns a redis key string for process status. | ||||
| func ProcessStatusKey(hostname string, pid int) string { | ||||
| // ProcessInfoKey returns a redis key string for process info. | ||||
| func ProcessInfoKey(hostname string, pid int) string { | ||||
| 	return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid) | ||||
| } | ||||
|  | ||||
| @@ -77,12 +79,47 @@ type TaskMessage struct { | ||||
| 	ErrorMsg string | ||||
| } | ||||
|  | ||||
| // ProcessStatus holds information about running background worker process. | ||||
| type ProcessStatus struct { | ||||
| 	Concurrency int | ||||
| 	Queues      map[string]uint | ||||
| 	PID         int | ||||
| 	Host        string | ||||
| 	State       string | ||||
| 	Started     time.Time | ||||
| // ProcessInfo holds information about running background worker process. | ||||
| type ProcessInfo struct { | ||||
| 	mu                sync.Mutex | ||||
| 	Concurrency       int | ||||
| 	Queues            map[string]uint | ||||
| 	StrictPriority    bool | ||||
| 	PID               int | ||||
| 	Host              string | ||||
| 	State             string | ||||
| 	Started           time.Time | ||||
| 	ActiveWorkerCount int | ||||
| } | ||||
|  | ||||
| // NewProcessInfo returns a new instance of ProcessInfo. | ||||
| func NewProcessInfo(host string, pid, concurrency int, queues map[string]uint, strict bool) *ProcessInfo { | ||||
| 	return &ProcessInfo{ | ||||
| 		Host:           host, | ||||
| 		PID:            pid, | ||||
| 		Concurrency:    concurrency, | ||||
| 		Queues:         queues, | ||||
| 		StrictPriority: strict, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // SetState set the state field of the process info. | ||||
| func (p *ProcessInfo) SetState(state string) { | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
| 	p.State = state | ||||
| } | ||||
|  | ||||
| // SetStarted set the started field of the process info. | ||||
| func (p *ProcessInfo) SetStarted(t time.Time) { | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
| 	p.Started = t | ||||
| } | ||||
|  | ||||
| // IncrActiveWorkerCount increments active worker count by delta. | ||||
| func (p *ProcessInfo) IncrActiveWorkerCount(delta int) { | ||||
| 	p.mu.Lock() | ||||
| 	defer p.mu.Unlock() | ||||
| 	p.ActiveWorkerCount += delta | ||||
| } | ||||
|   | ||||
| @@ -5,6 +5,7 @@ | ||||
| package base | ||||
|  | ||||
| import ( | ||||
| 	"sync" | ||||
| 	"testing" | ||||
| 	"time" | ||||
| ) | ||||
| @@ -61,7 +62,7 @@ func TestFailureKey(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestProcessStatusKey(t *testing.T) { | ||||
| func TestProcessInfoKey(t *testing.T) { | ||||
| 	tests := []struct { | ||||
| 		hostname string | ||||
| 		pid      int | ||||
| @@ -72,9 +73,36 @@ func TestProcessStatusKey(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		got := ProcessStatusKey(tc.hostname, tc.pid) | ||||
| 		got := ProcessInfoKey(tc.hostname, tc.pid) | ||||
| 		if got != tc.want { | ||||
| 			t.Errorf("ProcessStatusKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want) | ||||
| 			t.Errorf("ProcessInfoKey(%s, %d) = %s, want %s", tc.hostname, tc.pid, got, tc.want) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Note: Run this test with -race flag to check for data race. | ||||
| func TestProcessInfoSetter(t *testing.T) { | ||||
| 	pi := NewProcessInfo("localhost", 1234, 8, map[string]uint{"default": 1}, false) | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
|  | ||||
| 	wg.Add(3) | ||||
|  | ||||
| 	go func() { | ||||
| 		pi.SetState("runnning") | ||||
| 		wg.Done() | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		pi.SetStarted(time.Now()) | ||||
| 		pi.IncrActiveWorkerCount(1) | ||||
| 		wg.Done() | ||||
| 	}() | ||||
|  | ||||
| 	go func() { | ||||
| 		pi.SetState("stopped") | ||||
| 		wg.Done() | ||||
| 	}() | ||||
|  | ||||
| 	wg.Wait() | ||||
| } | ||||
|   | ||||
| @@ -755,3 +755,40 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // ListProcesses returns the list of process statuses. | ||||
| func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) { | ||||
| 	// Note: Script also removes stale keys. | ||||
| 	script := redis.NewScript(` | ||||
| 	local res = {} | ||||
| 	local now = tonumber(ARGV[1]) | ||||
| 	local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") | ||||
| 	for _, key in ipairs(keys) do | ||||
| 		local ps = redis.call("GET", key) | ||||
| 		if ps then | ||||
| 			table.insert(res, ps) | ||||
| 		end   | ||||
| 	end | ||||
| 	redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) | ||||
| 	return res | ||||
| 	`) | ||||
| 	res, err := script.Run(r.client, | ||||
| 		[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	data, err := cast.ToStringSliceE(res) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	var processes []*base.ProcessInfo | ||||
| 	for _, s := range data { | ||||
| 		var ps base.ProcessInfo | ||||
| 		err := json.Unmarshal([]byte(s), &ps) | ||||
| 		if err != nil { | ||||
| 			continue // skip bad data | ||||
| 		} | ||||
| 		processes = append(processes, &ps) | ||||
| 	} | ||||
| 	return processes, nil | ||||
| } | ||||
|   | ||||
| @@ -2050,3 +2050,56 @@ func TestRemoveQueueError(t *testing.T) { | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestListProcesses(t *testing.T) { | ||||
| 	r := setup(t) | ||||
|  | ||||
| 	ps1 := &base.ProcessInfo{ | ||||
| 		Concurrency:       10, | ||||
| 		Queues:            map[string]uint{"default": 1}, | ||||
| 		Host:              "do.droplet1", | ||||
| 		PID:               1234, | ||||
| 		State:             "running", | ||||
| 		Started:           time.Now().Add(-time.Hour), | ||||
| 		ActiveWorkerCount: 5, | ||||
| 	} | ||||
|  | ||||
| 	ps2 := &base.ProcessInfo{ | ||||
| 		Concurrency:       20, | ||||
| 		Queues:            map[string]uint{"email": 1}, | ||||
| 		Host:              "do.droplet2", | ||||
| 		PID:               9876, | ||||
| 		State:             "stopped", | ||||
| 		Started:           time.Now().Add(-2 * time.Hour), | ||||
| 		ActiveWorkerCount: 20, | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		processes []*base.ProcessInfo | ||||
| 	}{ | ||||
| 		{processes: []*base.ProcessInfo{}}, | ||||
| 		{processes: []*base.ProcessInfo{ps1}}, | ||||
| 		{processes: []*base.ProcessInfo{ps1, ps2}}, | ||||
| 	} | ||||
|  | ||||
| 	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{}) | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		h.FlushDB(t, r.client) | ||||
|  | ||||
| 		for _, ps := range tc.processes { | ||||
| 			if err := r.WriteProcessInfo(ps, 5*time.Second); err != nil { | ||||
| 				t.Fatal(err) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		got, err := r.ListProcesses() | ||||
| 		if err != nil { | ||||
| 			t.Errorf("r.ListProcesses returned an error: %v", err) | ||||
| 		} | ||||
| 		if diff := cmp.Diff(tc.processes, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" { | ||||
| 			t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s", | ||||
| 				got, tc.processes, diff) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
| @@ -347,28 +347,52 @@ func (r *RDB) forwardSingle(src, dst string) error { | ||||
| 		[]string{src, dst}, now).Err() | ||||
| } | ||||
|  | ||||
| // WriteProcessStatus writes process information to redis with expiration | ||||
| // WriteProcessInfo writes process information to redis with expiration | ||||
| // set to the value ttl. | ||||
| func (r *RDB) WriteProcessStatus(ps *base.ProcessStatus, ttl time.Duration) error { | ||||
| func (r *RDB) WriteProcessInfo(ps *base.ProcessInfo, ttl time.Duration) error { | ||||
| 	bytes, err := json.Marshal(ps) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	key := base.ProcessStatusKey(ps.Host, ps.PID) | ||||
| 	return r.client.Set(key, string(bytes), ttl).Err() | ||||
| 	// Note: Add key to ZSET with expiration time as score. | ||||
| 	// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996 | ||||
| 	exp := time.Now().Add(ttl).UTC() | ||||
| 	key := base.ProcessInfoKey(ps.Host, ps.PID) | ||||
| 	// KEYS[1] -> asynq:ps | ||||
| 	// KEYS[2] -> asynq:ps:<host:pid> | ||||
| 	// ARGV[1] -> expiration time | ||||
| 	// ARGV[2] -> TTL in seconds | ||||
| 	// ARGV[3] -> process info | ||||
| 	script := redis.NewScript(` | ||||
| 	redis.call("ZADD", KEYS[1], ARGV[1], KEYS[2]) | ||||
| 	redis.call("SETEX", KEYS[2], ARGV[2], ARGV[3]) | ||||
| 	return redis.status_reply("OK") | ||||
| 	`) | ||||
| 	return script.Run(r.client, []string{base.AllProcesses, key}, float64(exp.Unix()), ttl.Seconds(), string(bytes)).Err() | ||||
| } | ||||
|  | ||||
| // ReadProcessStatus reads process information stored in redis. | ||||
| func (r *RDB) ReadProcessStatus(host string, pid int) (*base.ProcessStatus, error) { | ||||
| 	key := base.ProcessStatusKey(host, pid) | ||||
| // ReadProcessInfo reads process information stored in redis. | ||||
| func (r *RDB) ReadProcessInfo(host string, pid int) (*base.ProcessInfo, error) { | ||||
| 	key := base.ProcessInfoKey(host, pid) | ||||
| 	data, err := r.client.Get(key).Result() | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	var ps base.ProcessStatus | ||||
| 	err = json.Unmarshal([]byte(data), &ps) | ||||
| 	var pinfo base.ProcessInfo | ||||
| 	err = json.Unmarshal([]byte(data), &pinfo) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
| 	return &ps, nil | ||||
| 	return &pinfo, nil | ||||
| } | ||||
|  | ||||
| // ClearProcessInfo deletes process information from redis. | ||||
| func (r *RDB) ClearProcessInfo(ps *base.ProcessInfo) error { | ||||
| 	key := base.ProcessInfoKey(ps.Host, ps.PID) | ||||
| 	script := redis.NewScript(` | ||||
| 	redis.call("ZREM", KEYS[1], KEYS[2]) | ||||
| 	redis.call("DEL", KEYS[2]) | ||||
| 	return redis.status_reply("OK") | ||||
| 	`) | ||||
| 	return script.Run(r.client, []string{base.AllProcesses, key}).Err() | ||||
| } | ||||
|   | ||||
| @@ -6,11 +6,13 @@ package rdb | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"testing" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| 	"github.com/google/go-cmp/cmp" | ||||
| 	"github.com/google/go-cmp/cmp/cmpopts" | ||||
| 	h "github.com/hibiken/asynq/internal/asynqtest" | ||||
| 	"github.com/hibiken/asynq/internal/base" | ||||
| ) | ||||
| @@ -739,48 +741,81 @@ func TestCheckAndEnqueue(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestReadWriteProcessStatus(t *testing.T) { | ||||
| func TestReadWriteClearProcessInfo(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	ps1 := &base.ProcessStatus{ | ||||
| 		Concurrency: 10, | ||||
| 		Queues:      map[string]uint{"default": 2, "email": 5, "low": 1}, | ||||
| 		PID:         98765, | ||||
| 		Host:        "localhost", | ||||
| 		State:       "running", | ||||
| 		Started:     time.Now(), | ||||
| 	pinfo := &base.ProcessInfo{ | ||||
| 		Concurrency:       10, | ||||
| 		Queues:            map[string]uint{"default": 2, "email": 5, "low": 1}, | ||||
| 		PID:               98765, | ||||
| 		Host:              "localhost", | ||||
| 		State:             "running", | ||||
| 		Started:           time.Now(), | ||||
| 		ActiveWorkerCount: 1, | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		ps  *base.ProcessStatus | ||||
| 		pi  *base.ProcessInfo | ||||
| 		ttl time.Duration | ||||
| 	}{ | ||||
| 		{ps1, 5 * time.Second}, | ||||
| 		{pinfo, 5 * time.Second}, | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		h.FlushDB(t, r.client) | ||||
|  | ||||
| 		err := r.WriteProcessStatus(tc.ps, tc.ttl) | ||||
| 		err := r.WriteProcessInfo(tc.pi, tc.ttl) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("r.WriteProcessStatus returned an error: %v", err) | ||||
| 			t.Errorf("r.WriteProcessInfo returned an error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		got, err := r.ReadProcessStatus(tc.ps.Host, tc.ps.PID) | ||||
| 		got, err := r.ReadProcessInfo(tc.pi.Host, tc.pi.PID) | ||||
| 		if err != nil { | ||||
| 			t.Errorf("r.ReadProcessStatus returned an error: %v", err) | ||||
| 			t.Errorf("r.ReadProcessInfo returned an error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		if diff := cmp.Diff(tc.ps, got); diff != "" { | ||||
| 			t.Errorf("r.ReadProcessStatus(%q, %d) = %+v, want %+v; (-want,+got)\n%s", | ||||
| 				tc.ps.Host, tc.ps.PID, got, tc.ps, diff) | ||||
| 		ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{}) | ||||
| 		if diff := cmp.Diff(tc.pi, got, ignoreOpt); diff != "" { | ||||
| 			t.Errorf("r.ReadProcessInfo(%q, %d) = %+v, want %+v; (-want,+got)\n%s", | ||||
| 				tc.pi.Host, tc.pi.PID, got, tc.pi, diff) | ||||
| 		} | ||||
|  | ||||
| 		key := base.ProcessStatusKey(tc.ps.Host, tc.ps.PID) | ||||
| 		key := base.ProcessInfoKey(tc.pi.Host, tc.pi.PID) | ||||
| 		gotTTL := r.client.TTL(key).Val() | ||||
| 		if !cmp.Equal(tc.ttl, gotTTL, timeCmpOpt) { | ||||
| 			t.Errorf("redis TTL %q returned %v, want %v", key, gotTTL, tc.ttl) | ||||
| 		} | ||||
|  | ||||
| 		now := time.Now().UTC() | ||||
| 		allKeys, err := r.client.ZRangeByScore(base.AllProcesses, &redis.ZRangeBy{ | ||||
| 			Min: strconv.Itoa(int(now.Unix())), | ||||
| 			Max: "+inf", | ||||
| 		}).Result() | ||||
| 		if err != nil { | ||||
| 			t.Errorf("redis ZRANGEBYSCORE %q %d +inf returned an error: %v", | ||||
| 				base.AllProcesses, now.Unix(), err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		wantAllKeys := []string{key} | ||||
| 		if diff := cmp.Diff(wantAllKeys, allKeys); diff != "" { | ||||
| 			t.Errorf("all keys = %v, want %v; (-want,+got)\n%s", allKeys, wantAllKeys, diff) | ||||
| 		} | ||||
|  | ||||
| 		if err := r.ClearProcessInfo(tc.pi); err != nil { | ||||
| 			t.Errorf("r.ClearProcessInfo returned an error: %v", err) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// 1 means key exists | ||||
| 		if r.client.Exists(key).Val() == 1 { | ||||
| 			t.Errorf("expected %q to be deleted", key) | ||||
| 		} | ||||
|  | ||||
| 		if r.client.ZCard(base.AllProcesses).Val() != 0 { | ||||
| 			t.Errorf("expected %q to be empty", base.AllProcesses) | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| } | ||||
|   | ||||
							
								
								
									
										54
									
								
								processor.go
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								processor.go
									
									
									
									
									
								
							| @@ -19,6 +19,8 @@ import ( | ||||
| type processor struct { | ||||
| 	rdb *rdb.RDB | ||||
|  | ||||
| 	pinfo *base.ProcessInfo | ||||
|  | ||||
| 	handler Handler | ||||
|  | ||||
| 	queueConfig map[string]uint | ||||
| @@ -53,25 +55,21 @@ type processor struct { | ||||
| type retryDelayFunc func(n int, err error, task *Task) time.Duration | ||||
|  | ||||
| // newProcessor constructs a new processor. | ||||
| // | ||||
| // r is an instance of RDB used by the processor. | ||||
| // n specifies the max number of concurrenct worker goroutines. | ||||
| // qfcg is a mapping of queue names to associated priority level. | ||||
| // strict specifies whether queue priority should be treated strictly. | ||||
| // fn is a function to compute retry delay. | ||||
| func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, strict bool, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor { | ||||
| func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRequestCh chan<- *syncRequest) *processor { | ||||
| 	qcfg := normalizeQueueCfg(pinfo.Queues) | ||||
| 	orderedQueues := []string(nil) | ||||
| 	if strict { | ||||
| 	if pinfo.StrictPriority { | ||||
| 		orderedQueues = sortByPriority(qcfg) | ||||
| 	} | ||||
| 	return &processor{ | ||||
| 		rdb:            r, | ||||
| 		pinfo:          pinfo, | ||||
| 		queueConfig:    qcfg, | ||||
| 		orderedQueues:  orderedQueues, | ||||
| 		retryDelayFunc: fn, | ||||
| 		syncRequestCh:  syncRequestCh, | ||||
| 		errLogLimiter:  rate.NewLimiter(rate.Every(3*time.Second), 1), | ||||
| 		sema:           make(chan struct{}, n), | ||||
| 		sema:           make(chan struct{}, pinfo.Concurrency), | ||||
| 		done:           make(chan struct{}), | ||||
| 		abort:          make(chan struct{}), | ||||
| 		quit:           make(chan struct{}), | ||||
| @@ -153,8 +151,12 @@ func (p *processor) exec() { | ||||
| 		p.requeue(msg) | ||||
| 		return | ||||
| 	case p.sema <- struct{}{}: // acquire token | ||||
| 		p.pinfo.IncrActiveWorkerCount(1) | ||||
| 		go func() { | ||||
| 			defer func() { <-p.sema /* release token */ }() | ||||
| 			defer func() { | ||||
| 				<-p.sema /* release token */ | ||||
| 				p.pinfo.IncrActiveWorkerCount(-1) | ||||
| 			}() | ||||
|  | ||||
| 			resCh := make(chan error, 1) | ||||
| 			task := NewTask(msg.Type, msg.Payload) | ||||
| @@ -331,3 +333,35 @@ type byPriority []*queue | ||||
| func (x byPriority) Len() int           { return len(x) } | ||||
| func (x byPriority) Less(i, j int) bool { return x[i].priority < x[j].priority } | ||||
| func (x byPriority) Swap(i, j int)      { x[i], x[j] = x[j], x[i] } | ||||
|  | ||||
| // normalizeQueueCfg divides priority numbers by their | ||||
| // greatest common divisor. | ||||
| func normalizeQueueCfg(queueCfg map[string]uint) map[string]uint { | ||||
| 	var xs []uint | ||||
| 	for _, x := range queueCfg { | ||||
| 		xs = append(xs, x) | ||||
| 	} | ||||
| 	d := gcd(xs...) | ||||
| 	res := make(map[string]uint) | ||||
| 	for q, x := range queueCfg { | ||||
| 		res[q] = x / d | ||||
| 	} | ||||
| 	return res | ||||
| } | ||||
|  | ||||
| func gcd(xs ...uint) uint { | ||||
| 	fn := func(x, y uint) uint { | ||||
| 		for y > 0 { | ||||
| 			x, y = y, x%y | ||||
| 		} | ||||
| 		return x | ||||
| 	} | ||||
| 	res := xs[0] | ||||
| 	for i := 0; i < len(xs); i++ { | ||||
| 		res = fn(xs[i], res) | ||||
| 		if res == 1 { | ||||
| 			return 1 | ||||
| 		} | ||||
| 	} | ||||
| 	return res | ||||
| } | ||||
|   | ||||
| @@ -65,7 +65,8 @@ func TestProcessorSuccess(t *testing.T) { | ||||
| 			processed = append(processed, task) | ||||
| 			return nil | ||||
| 		} | ||||
| 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, defaultDelayFunc, nil) | ||||
| 		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) | ||||
| 		p := newProcessor(rdbClient, pi, defaultDelayFunc, nil) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
| @@ -148,7 +149,8 @@ func TestProcessorRetry(t *testing.T) { | ||||
| 		handler := func(task *Task) error { | ||||
| 			return fmt.Errorf(errMsg) | ||||
| 		} | ||||
| 		p := newProcessor(rdbClient, 10, defaultQueueConfig, false, delayFunc, nil) | ||||
| 		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) | ||||
| 		p := newProcessor(rdbClient, pi, delayFunc, nil) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
| @@ -207,7 +209,8 @@ func TestProcessorQueues(t *testing.T) { | ||||
| 	} | ||||
|  | ||||
| 	for _, tc := range tests { | ||||
| 		p := newProcessor(nil, 10, tc.queueCfg, false, defaultDelayFunc, nil) | ||||
| 		pi := base.NewProcessInfo("localhost", 1234, 10, tc.queueCfg, false) | ||||
| 		p := newProcessor(nil, pi, defaultDelayFunc, nil) | ||||
| 		got := p.queues() | ||||
| 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" { | ||||
| 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s", | ||||
| @@ -273,7 +276,8 @@ func TestProcessorWithStrictPriority(t *testing.T) { | ||||
| 			"low":                 1, | ||||
| 		} | ||||
| 		// Note: Set concurrency to 1 to make sure tasks are processed one at a time. | ||||
| 		p := newProcessor(rdbClient, 1 /*concurrency */, queueCfg, true /* strict */, defaultDelayFunc, nil) | ||||
| 		pi := base.NewProcessInfo("localhost", 1234, 1 /*concurrency */, queueCfg, true /* strict */) | ||||
| 		p := newProcessor(rdbClient, pi, defaultDelayFunc, nil) | ||||
| 		p.handler = HandlerFunc(handler) | ||||
|  | ||||
| 		p.start() | ||||
|   | ||||
| @@ -10,7 +10,6 @@ import ( | ||||
| 	"os" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"text/tabwriter" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| @@ -215,18 +214,3 @@ func listDead(r *rdb.RDB) { | ||||
| 	printTable(cols, printRows) | ||||
| 	fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) | ||||
| } | ||||
|  | ||||
| func printTable(cols []string, printRows func(w io.Writer, tmpl string)) { | ||||
| 	format := strings.Repeat("%v\t", len(cols)) + "\n" | ||||
| 	tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) | ||||
| 	var headers []interface{} | ||||
| 	var seps []interface{} | ||||
| 	for _, name := range cols { | ||||
| 		headers = append(headers, name) | ||||
| 		seps = append(seps, strings.Repeat("-", len(name))) | ||||
| 	} | ||||
| 	fmt.Fprintf(tw, format, headers...) | ||||
| 	fmt.Fprintf(tw, format, seps...) | ||||
| 	printRows(tw, format) | ||||
| 	tw.Flush() | ||||
| } | ||||
|   | ||||
							
								
								
									
										89
									
								
								tools/asynqmon/cmd/ps.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								tools/asynqmon/cmd/ps.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,89 @@ | ||||
| // Copyright 2020 Kentaro Hibino. All rights reserved. | ||||
| // Use of this source code is governed by a MIT license | ||||
| // that can be found in the LICENSE file. | ||||
|  | ||||
| package cmd | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| 	"github.com/hibiken/asynq/internal/rdb" | ||||
| 	"github.com/spf13/cobra" | ||||
| 	"github.com/spf13/viper" | ||||
| ) | ||||
|  | ||||
| // psCmd represents the ps command | ||||
| var psCmd = &cobra.Command{ | ||||
| 	Use:   "ps", | ||||
| 	Short: "Shows all background worker processes", | ||||
| 	Long: `Ps (asynqmon ps) will show all background worker processes | ||||
| backed by the specified redis instance. | ||||
|  | ||||
| The command shows the following for each process: | ||||
| * Host and PID of the process | ||||
| * Number of active workers out of worker pool | ||||
| * Queues configuration | ||||
| * State of the process ("running" | "stopped") | ||||
|  | ||||
| A "running" process is processing tasks in queues. | ||||
| A "stopped" process are no longer processing new tasks.`, | ||||
| 	Args: cobra.NoArgs, | ||||
| 	Run:  ps, | ||||
| } | ||||
|  | ||||
| func init() { | ||||
| 	rootCmd.AddCommand(psCmd) | ||||
| } | ||||
|  | ||||
| func ps(cmd *cobra.Command, args []string) { | ||||
| 	c := redis.NewClient(&redis.Options{ | ||||
| 		Addr:     viper.GetString("uri"), | ||||
| 		DB:       viper.GetInt("db"), | ||||
| 		Password: viper.GetString("password"), | ||||
| 	}) | ||||
| 	r := rdb.NewRDB(c) | ||||
|  | ||||
| 	processes, err := r.ListProcesses() | ||||
| 	if err != nil { | ||||
| 		fmt.Println(err) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	if len(processes) == 0 { | ||||
| 		fmt.Println("No processes") | ||||
| 		return | ||||
| 	} | ||||
| 	cols := []string{"Host", "PID", "State", "Active Workers", "Queues", "Started"} | ||||
| 	printRows := func(w io.Writer, tmpl string) { | ||||
| 		for _, ps := range processes { | ||||
| 			fmt.Fprintf(w, tmpl, | ||||
| 				ps.Host, ps.PID, ps.State, | ||||
| 				fmt.Sprintf("%d/%d", ps.ActiveWorkerCount, ps.Concurrency), | ||||
| 				formatQueues(ps.Queues), timeAgo(ps.Started)) | ||||
| 		} | ||||
| 	} | ||||
| 	printTable(cols, printRows) | ||||
| } | ||||
|  | ||||
| // timeAgo takes a time and returns a string of the format "<duration> ago". | ||||
| func timeAgo(since time.Time) string { | ||||
| 	d := time.Since(since).Round(time.Second) | ||||
| 	return fmt.Sprintf("%v ago", d) | ||||
| } | ||||
|  | ||||
| func formatQueues(queues map[string]uint) string { | ||||
| 	var b strings.Builder | ||||
| 	l := len(queues) | ||||
| 	for qname, p := range queues { | ||||
| 		fmt.Fprintf(&b, "%s:%d", qname, p) | ||||
| 		l-- | ||||
| 		if l > 0 { | ||||
| 			b.WriteString(" ") | ||||
| 		} | ||||
| 	} | ||||
| 	return b.String() | ||||
| } | ||||
| @@ -6,7 +6,10 @@ package cmd | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"text/tabwriter" | ||||
|  | ||||
| 	"github.com/spf13/cobra" | ||||
|  | ||||
| @@ -81,3 +84,36 @@ func initConfig() { | ||||
| 		fmt.Println("Using config file:", viper.ConfigFileUsed()) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // printTable is a helper function to print data in table format. | ||||
| // | ||||
| // cols is a list of headers and printRow specifies how to print rows. | ||||
| // | ||||
| // Example: | ||||
| // type User struct { | ||||
| //     Name string | ||||
| //     Addr string | ||||
| //     Age  int | ||||
| // } | ||||
| // data := []*User{{"user1", "addr1", 24}, {"user2", "addr2", 42}, ...} | ||||
| // cols := []string{"Name", "Addr", "Age"} | ||||
| // printRows := func(w io.Writer, tmpl string) { | ||||
| //     for _, u := range data { | ||||
| //         fmt.Fprintf(w, tmpl, u.Name, u.Addr, u.Age) | ||||
| //     } | ||||
| // } | ||||
| // printTable(cols, printRows) | ||||
| func printTable(cols []string, printRows func(w io.Writer, tmpl string)) { | ||||
| 	format := strings.Repeat("%v\t", len(cols)) + "\n" | ||||
| 	tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) | ||||
| 	var headers []interface{} | ||||
| 	var seps []interface{} | ||||
| 	for _, name := range cols { | ||||
| 		headers = append(headers, name) | ||||
| 		seps = append(seps, strings.Repeat("-", len(name))) | ||||
| 	} | ||||
| 	fmt.Fprintf(tw, format, headers...) | ||||
| 	fmt.Fprintf(tw, format, seps...) | ||||
| 	printRows(tw, format) | ||||
| 	tw.Flush() | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user