mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add "Queue" column to the output of asynqmon ls
This commit is contained in:
parent
c5b215e3b9
commit
cf78a12866
@ -41,6 +41,7 @@ type EnqueuedTask struct {
|
|||||||
ID xid.ID
|
ID xid.ID
|
||||||
Type string
|
Type string
|
||||||
Payload map[string]interface{}
|
Payload map[string]interface{}
|
||||||
|
Queue string
|
||||||
}
|
}
|
||||||
|
|
||||||
// InProgressTask is a task that's currently being processed.
|
// InProgressTask is a task that's currently being processed.
|
||||||
@ -57,6 +58,7 @@ type ScheduledTask struct {
|
|||||||
Payload map[string]interface{}
|
Payload map[string]interface{}
|
||||||
ProcessAt time.Time
|
ProcessAt time.Time
|
||||||
Score int64
|
Score int64
|
||||||
|
Queue string
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetryTask is a task that's in retry queue because worker failed to process the task.
|
// RetryTask is a task that's in retry queue because worker failed to process the task.
|
||||||
@ -70,6 +72,7 @@ type RetryTask struct {
|
|||||||
Retried int
|
Retried int
|
||||||
Retry int
|
Retry int
|
||||||
Score int64
|
Score int64
|
||||||
|
Queue string
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeadTask is a task in that has exhausted all retries.
|
// DeadTask is a task in that has exhausted all retries.
|
||||||
@ -80,6 +83,7 @@ type DeadTask struct {
|
|||||||
LastFailedAt time.Time
|
LastFailedAt time.Time
|
||||||
ErrorMsg string
|
ErrorMsg string
|
||||||
Score int64
|
Score int64
|
||||||
|
Queue string
|
||||||
}
|
}
|
||||||
|
|
||||||
// CurrentStats returns a current state of the queues.
|
// CurrentStats returns a current state of the queues.
|
||||||
@ -263,6 +267,7 @@ func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
|
|||||||
ID: msg.ID,
|
ID: msg.ID,
|
||||||
Type: msg.Type,
|
Type: msg.Type,
|
||||||
Payload: msg.Payload,
|
Payload: msg.Payload,
|
||||||
|
Queue: msg.Queue,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return tasks, nil
|
return tasks, nil
|
||||||
@ -313,6 +318,7 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
|
|||||||
ID: msg.ID,
|
ID: msg.ID,
|
||||||
Type: msg.Type,
|
Type: msg.Type,
|
||||||
Payload: msg.Payload,
|
Payload: msg.Payload,
|
||||||
|
Queue: msg.Queue,
|
||||||
ProcessAt: processAt,
|
ProcessAt: processAt,
|
||||||
Score: int64(z.Score),
|
Score: int64(z.Score),
|
||||||
})
|
})
|
||||||
@ -346,6 +352,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
|
|||||||
ErrorMsg: msg.ErrorMsg,
|
ErrorMsg: msg.ErrorMsg,
|
||||||
Retry: msg.Retry,
|
Retry: msg.Retry,
|
||||||
Retried: msg.Retried,
|
Retried: msg.Retried,
|
||||||
|
Queue: msg.Queue,
|
||||||
ProcessAt: processAt,
|
ProcessAt: processAt,
|
||||||
Score: int64(z.Score),
|
Score: int64(z.Score),
|
||||||
})
|
})
|
||||||
@ -376,6 +383,7 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
|
|||||||
Type: msg.Type,
|
Type: msg.Type,
|
||||||
Payload: msg.Payload,
|
Payload: msg.Payload,
|
||||||
ErrorMsg: msg.ErrorMsg,
|
ErrorMsg: msg.ErrorMsg,
|
||||||
|
Queue: msg.Queue,
|
||||||
LastFailedAt: lastFailedAt,
|
LastFailedAt: lastFailedAt,
|
||||||
Score: int64(z.Score),
|
Score: int64(z.Score),
|
||||||
})
|
})
|
||||||
|
@ -135,6 +135,7 @@ func TestCurrentStatsWithoutData(t *testing.T) {
|
|||||||
Processed: 0,
|
Processed: 0,
|
||||||
Failed: 0,
|
Failed: 0,
|
||||||
Timestamp: time.Now(),
|
Timestamp: time.Now(),
|
||||||
|
Queues: map[string]int{},
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := r.CurrentStats()
|
got, err := r.CurrentStats()
|
||||||
@ -227,10 +228,10 @@ func TestListEnqueued(t *testing.T) {
|
|||||||
m2 := h.NewTaskMessage("reindex", nil)
|
m2 := h.NewTaskMessage("reindex", nil)
|
||||||
m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
|
m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
|
||||||
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
|
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
|
||||||
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload}
|
t1 := &EnqueuedTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, Queue: m1.Queue}
|
||||||
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload}
|
t2 := &EnqueuedTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, Queue: m2.Queue}
|
||||||
t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload}
|
t3 := &EnqueuedTask{ID: m3.ID, Type: m3.Type, Payload: m3.Payload, Queue: m3.Queue}
|
||||||
t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload}
|
t4 := &EnqueuedTask{ID: m4.ID, Type: m4.Type, Payload: m4.Payload, Queue: m4.Queue}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued map[string][]*base.TaskMessage
|
enqueued map[string][]*base.TaskMessage
|
||||||
want []*EnqueuedTask
|
want []*EnqueuedTask
|
||||||
@ -332,8 +333,8 @@ func TestListScheduled(t *testing.T) {
|
|||||||
m2 := h.NewTaskMessage("reindex", nil)
|
m2 := h.NewTaskMessage("reindex", nil)
|
||||||
p1 := time.Now().Add(30 * time.Minute)
|
p1 := time.Now().Add(30 * time.Minute)
|
||||||
p2 := time.Now().Add(24 * time.Hour)
|
p2 := time.Now().Add(24 * time.Hour)
|
||||||
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix()}
|
t1 := &ScheduledTask{ID: m1.ID, Type: m1.Type, Payload: m1.Payload, ProcessAt: p1, Score: p1.Unix(), Queue: m1.Queue}
|
||||||
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix()}
|
t2 := &ScheduledTask{ID: m2.ID, Type: m2.Type, Payload: m2.Payload, ProcessAt: p2, Score: p2.Unix(), Queue: m2.Queue}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
scheduled []h.ZSetEntry
|
scheduled []h.ZSetEntry
|
||||||
@ -406,6 +407,7 @@ func TestListRetry(t *testing.T) {
|
|||||||
Retried: m1.Retried,
|
Retried: m1.Retried,
|
||||||
Retry: m1.Retry,
|
Retry: m1.Retry,
|
||||||
Score: p1.Unix(),
|
Score: p1.Unix(),
|
||||||
|
Queue: m1.Queue,
|
||||||
}
|
}
|
||||||
t2 := &RetryTask{
|
t2 := &RetryTask{
|
||||||
ID: m2.ID,
|
ID: m2.ID,
|
||||||
@ -416,6 +418,7 @@ func TestListRetry(t *testing.T) {
|
|||||||
Retried: m2.Retried,
|
Retried: m2.Retried,
|
||||||
Retry: m2.Retry,
|
Retry: m2.Retry,
|
||||||
Score: p2.Unix(),
|
Score: p2.Unix(),
|
||||||
|
Queue: m1.Queue,
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -484,6 +487,7 @@ func TestListDead(t *testing.T) {
|
|||||||
LastFailedAt: f1,
|
LastFailedAt: f1,
|
||||||
ErrorMsg: m1.ErrorMsg,
|
ErrorMsg: m1.ErrorMsg,
|
||||||
Score: f1.Unix(),
|
Score: f1.Unix(),
|
||||||
|
Queue: m1.Queue,
|
||||||
}
|
}
|
||||||
t2 := &DeadTask{
|
t2 := &DeadTask{
|
||||||
ID: m2.ID,
|
ID: m2.ID,
|
||||||
@ -492,6 +496,7 @@ func TestListDead(t *testing.T) {
|
|||||||
LastFailedAt: f2,
|
LastFailedAt: f2,
|
||||||
ErrorMsg: m2.ErrorMsg,
|
ErrorMsg: m2.ErrorMsg,
|
||||||
Score: f2.Unix(),
|
Score: f2.Unix(),
|
||||||
|
Queue: m2.Queue,
|
||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -115,10 +115,10 @@ func listEnqueued(r *rdb.RDB) {
|
|||||||
fmt.Println("No enqueued tasks")
|
fmt.Println("No enqueued tasks")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cols := []string{"ID", "Type", "Payload"}
|
cols := []string{"ID", "Type", "Payload", "Queue"}
|
||||||
printRows := func(w io.Writer, tmpl string) {
|
printRows := func(w io.Writer, tmpl string) {
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
|
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
@ -153,11 +153,11 @@ func listScheduled(r *rdb.RDB) {
|
|||||||
fmt.Println("No scheduled tasks")
|
fmt.Println("No scheduled tasks")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cols := []string{"ID", "Type", "Payload", "Process In"}
|
cols := []string{"ID", "Type", "Payload", "Process In", "Queue"}
|
||||||
printRows := func(w io.Writer, tmpl string) {
|
printRows := func(w io.Writer, tmpl string) {
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
|
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
|
||||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn)
|
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
@ -173,11 +173,11 @@ func listRetry(r *rdb.RDB) {
|
|||||||
fmt.Println("No retry tasks")
|
fmt.Println("No retry tasks")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry"}
|
cols := []string{"ID", "Type", "Payload", "Retry In", "Last Error", "Retried", "Max Retry", "Queue"}
|
||||||
printRows := func(w io.Writer, tmpl string) {
|
printRows := func(w io.Writer, tmpl string) {
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
|
retryIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
|
||||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry)
|
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, retryIn, t.ErrorMsg, t.Retried, t.Retry, t.Queue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
@ -193,10 +193,10 @@ func listDead(r *rdb.RDB) {
|
|||||||
fmt.Println("No dead tasks")
|
fmt.Println("No dead tasks")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error"}
|
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
|
||||||
printRows := func(w io.Writer, tmpl string) {
|
printRows := func(w io.Writer, tmpl string) {
|
||||||
for _, t := range tasks {
|
for _, t := range tasks {
|
||||||
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg)
|
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
printTable(cols, printRows)
|
printTable(cols, printRows)
|
||||||
|
Loading…
Reference in New Issue
Block a user