mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Minor improvements
This commit is contained in:
parent
998e761660
commit
afacc31990
@ -30,7 +30,6 @@ func (c *Client) Process(task *Task, processAt time.Time) error {
|
|||||||
return c.enqueue(msg, processAt)
|
return c.enqueue(msg, processAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueue pushes a given task to the specified queue.
|
|
||||||
func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error {
|
func (c *Client) enqueue(msg *rdb.TaskMessage, processAt time.Time) error {
|
||||||
if time.Now().After(processAt) {
|
if time.Now().After(processAt) {
|
||||||
return c.rdb.Enqueue(msg)
|
return c.rdb.Enqueue(msg)
|
||||||
|
@ -38,6 +38,6 @@ func printStats(s *rdb.Stats) {
|
|||||||
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
|
||||||
fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead")
|
fmt.Fprintf(tw, format, "Enqueued", "InProgress", "Scheduled", "Retry", "Dead")
|
||||||
fmt.Fprintf(tw, format, "--------", "----------", "---------", "-----", "----")
|
fmt.Fprintf(tw, format, "--------", "----------", "---------", "-----", "----")
|
||||||
fmt.Fprintf(tw, format, s.Queued, s.InProgress, s.Scheduled, s.Retry, s.Dead)
|
fmt.Fprintf(tw, format, s.Enqueued, s.InProgress, s.Scheduled, s.Retry, s.Dead)
|
||||||
tw.Flush()
|
tw.Flush()
|
||||||
}
|
}
|
||||||
|
@ -60,7 +60,7 @@ type TaskMessage struct {
|
|||||||
|
|
||||||
// Stats represents a state of queues at a certain time.
|
// Stats represents a state of queues at a certain time.
|
||||||
type Stats struct {
|
type Stats struct {
|
||||||
Queued int
|
Enqueued int
|
||||||
InProgress int
|
InProgress int
|
||||||
Scheduled int
|
Scheduled int
|
||||||
Retry int
|
Retry int
|
||||||
@ -272,7 +272,7 @@ func (r *RDB) CurrentStats() (*Stats, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Stats{
|
return &Stats{
|
||||||
Queued: int(qlen.Val()),
|
Enqueued: int(qlen.Val()),
|
||||||
InProgress: int(plen.Val()),
|
InProgress: int(plen.Val()),
|
||||||
Scheduled: int(slen.Val()),
|
Scheduled: int(slen.Val()),
|
||||||
Retry: int(rlen.Val()),
|
Retry: int(rlen.Val()),
|
||||||
|
@ -87,7 +87,7 @@ func (p *processor) exec() {
|
|||||||
// the message can be mutated by the time this function is called.
|
// the message can be mutated by the time this function is called.
|
||||||
defer func(msg rdb.TaskMessage) {
|
defer func(msg rdb.TaskMessage) {
|
||||||
if err := p.rdb.Done(&msg); err != nil {
|
if err := p.rdb.Done(&msg); err != nil {
|
||||||
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, rdb.InProgress, err)
|
log.Printf("[ERROR] could not mark task %+v as done: %v\n", msg, err)
|
||||||
}
|
}
|
||||||
<-p.sema // release token
|
<-p.sema // release token
|
||||||
}(*msg)
|
}(*msg)
|
||||||
@ -103,7 +103,7 @@ func (p *processor) exec() {
|
|||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
err := p.rdb.RestoreUnfinished()
|
err := p.rdb.RestoreUnfinished()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue)
|
log.Printf("[ERROR] could not restore unfinished tasks: %v\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user