diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 5a3a496..29cb80b 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -41,6 +41,7 @@ type EnqueuedTask struct { ID xid.ID Type string Payload map[string]interface{} + Queue string } // InProgressTask is a task that's currently being processed. @@ -57,6 +58,7 @@ type ScheduledTask struct { Payload map[string]interface{} ProcessAt time.Time Score int64 + Queue string } // RetryTask is a task that's in retry queue because worker failed to process the task. @@ -70,6 +72,7 @@ type RetryTask struct { Retried int Retry int Score int64 + Queue string } // DeadTask is a task in that has exhausted all retries. @@ -80,6 +83,7 @@ type DeadTask struct { LastFailedAt time.Time ErrorMsg string Score int64 + Queue string } // CurrentStats returns a current state of the queues. @@ -263,6 +267,7 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { ID: msg.ID, Type: msg.Type, Payload: msg.Payload, + Queue: msg.Queue, }) } return tasks, nil @@ -313,6 +318,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { ID: msg.ID, Type: msg.Type, Payload: msg.Payload, + Queue: msg.Queue, ProcessAt: processAt, Score: int64(z.Score), }) @@ -346,6 +352,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { ErrorMsg: msg.ErrorMsg, Retry: msg.Retry, Retried: msg.Retried, + Queue: msg.Queue, ProcessAt: processAt, Score: int64(z.Score), }) @@ -376,6 +383,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { Type: msg.Type, Payload: msg.Payload, ErrorMsg: msg.ErrorMsg, + Queue: msg.Queue, LastFailedAt: lastFailedAt, Score: int64(z.Score), }) diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 6ee2448..0a6c104 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -135,6 +135,7 @@ func TestCurrentStatsWithoutData(t *testing.T) { Processed: 0, Failed: 0, Timestamp: time.Now(), + Queues: map[string]int{}, } got, err := r.CurrentStats() @@ -227,10 +228,10 @@ func TestListEnqueued(t *testing.T) { m2 := h.NewTaskMessage("reindex", nil) m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") - t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload} - t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload} - t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload} - t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload} + t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, Queue: m1.Queue} + t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, Queue: m2.Queue} + t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, Queue: m3.Queue} + t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue} tests := []struct { enqueued map[string][]*base.TaskMessage want []*EnqueuedTask @@ -332,8 +333,8 @@ func TestListScheduled(t *testing.T) { m2 := h.NewTaskMessage("reindex", nil) p1 := time.Now().Add(30 * time.Minute) p2 := time.Now().Add(24 * time.Hour) - t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()} - t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()} + t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix(), Queue: m1.Queue} + t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix(), Queue: m2.Queue} tests := []struct { scheduled []h.ZSetEntry @@ -406,6 +407,7 @@ func TestListRetry(t *testing.T) { Retried: m1.Retried, Retry: m1.Retry, Score: p1.Unix(), + Queue: m1.Queue, } t2 := &RetryTask{ ID: m2.ID, @@ -416,6 +418,7 @@ func TestListRetry(t *testing.T) { Retried: m2.Retried, Retry: m2.Retry, Score: p2.Unix(), + Queue: m1.Queue, } tests := []struct { @@ -484,6 +487,7 @@ func TestListDead(t *testing.T) { LastFailedAt: f1, ErrorMsg: m1.ErrorMsg, Score: f1.Unix(), + Queue: m1.Queue, } t2 := &DeadTask{ ID: m2.ID, @@ -492,6 +496,7 @@ func TestListDead(t *testing.T) { LastFailedAt: f2, ErrorMsg: m2.ErrorMsg, Score: f2.Unix(), + Queue: m2.Queue, } tests := []struct { diff --git a/tools/asynqmon/cmd/ls.go b/tools/asynqmon/cmd/ls.go index 009bbff..2014003 100644 --- a/tools/asynqmon/cmd/ls.go +++ b/tools/asynqmon/cmd/ls.go @@ -115,10 +115,10 @@ func listEnqueued(r *rdb.RDB) { fmt.Println("No enqueued tasks") return } - cols := []string{"ID", "Type", "Payload"} + cols := []string{"ID", "Type", "Payload", "Queue"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) + fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue) } } printTable(cols, printRows) @@ -153,11 +153,11 @@ func listScheduled(r *rdb.RDB) { fmt.Println("No scheduled tasks") return } - cols := []string{"ID", "Type", "Payload", "Process In"} + cols := []string{"ID", "Type", "Payload", "Process In", "Queue"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue) } } printTable(cols, printRows) @@ -173,11 +173,11 @@ func listRetry(r *rdb.RDB) { fmt.Println("No retry tasks") return } - cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry"} + cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry", "Queue"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) - fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry, t.Queue) } } printTable(cols, printRows) @@ -193,10 +193,10 @@ func listDead(r *rdb.RDB) { fmt.Println("No dead tasks") return } - cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"} + cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"} printRows := func(w io.Writer, tmpl string) { for _, t := range tasks { - fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg) + fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue) } } printTable(cols, printRows)