diff --git a/client.go b/client.go index 435769e..1ebc6cc 100644 --- a/client.go +++ b/client.go @@ -30,7 +30,6 @@ func (c *Client) Process(task *Task, processAt time.Time) error { return c.enqueue(msg, processAt) } -// enqueue pushes a given task to the specified queue. func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error { if time.Now().After(processAt) { return c.rdb.Enqueue(msg) diff --git a/cmd/asynqmon/main.go b/cmd/asynqmon/main.go index 2ed2fce..6b50c24 100644 --- a/cmd/asynqmon/main.go +++ b/cmd/asynqmon/main.go @@ -38,6 +38,6 @@ func printStats(s *rdb.Stats) { tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead") fmt.Fprintf(tw, format, "--------", "----------", "---------", "-----", "----") - fmt.Fprintf(tw, format, s.Queued, s.InProgress, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(tw, format, s.Enqueued, s.InProgress, s.Scheduled, s.Retry, s.Dead) tw.Flush() } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 1c8c848..91397b0 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -60,7 +60,7 @@ type TaskMessage struct { // Stats represents a state of queues at a certain time. type Stats struct { - Queued int + Enqueued int InProgress int Scheduled int Retry int @@ -272,7 +272,7 @@ func (r *RDB) CurrentStats() (*Stats, error) { return nil, err } return &Stats{ - Queued: int(qlen.Val()), + Enqueued: int(qlen.Val()), InProgress: int(plen.Val()), Scheduled: int(slen.Val()), Retry: int(rlen.Val()), diff --git a/processor.go b/processor.go index 327792c..2530163 100644 --- a/processor.go +++ b/processor.go @@ -87,7 +87,7 @@ func (p *processor) exec() { // the message can be mutated by the time this function is called. defer func(msg rdb.TaskMessage) { if err := p.rdb.Done(&msg); err != nil { - log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, rdb.InProgress, err) + log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err) } <-p.sema // release token }(*msg) @@ -103,7 +103,7 @@ func (p *processor) exec() { func (p *processor) restore() { err := p.rdb.RestoreUnfinished() if err != nil { - log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue) + log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err) } }