mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Minor cleanup
This commit is contained in:
parent
d35d345e2d
commit
60132f3208
@ -54,7 +54,7 @@ func (p *poller) start() {
|
|||||||
func (p *poller) exec() {
|
func (p *poller) exec() {
|
||||||
for _, zset := range p.zsets {
|
for _, zset := range p.zsets {
|
||||||
if err := p.rdb.forward(zset); err != nil {
|
if err := p.rdb.forward(zset); err != nil {
|
||||||
log.Printf("[ERROR] could not forward scheduled tasks from %q: %v", zset, err)
|
log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,10 +72,10 @@ func (p *processor) exec() {
|
|||||||
// timed out, this is a normal behavior.
|
// timed out, this is a normal behavior.
|
||||||
return
|
return
|
||||||
case errDeserializeTask:
|
case errDeserializeTask:
|
||||||
log.Println("[Servere Error] could not parse json encoded message")
|
log.Println("[Error] could not parse json encoded message")
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
log.Printf("[Servere Error] unexpected error while pulling message out of queues: %v\n", err)
|
log.Printf("[Error] unexpected error while pulling message out of queues: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -85,7 +85,7 @@ func (p *processor) exec() {
|
|||||||
go func(task *Task) {
|
go func(task *Task) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := p.rdb.lrem(inProgress, msg); err != nil {
|
if err := p.rdb.lrem(inProgress, msg); err != nil {
|
||||||
log.Printf("[SERVER ERROR] LREM failed: %v\n", err)
|
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err)
|
||||||
}
|
}
|
||||||
<-p.sema // release token
|
<-p.sema // release token
|
||||||
}()
|
}()
|
||||||
@ -101,6 +101,6 @@ func (p *processor) exec() {
|
|||||||
func (p *processor) restore() {
|
func (p *processor) restore() {
|
||||||
err := p.rdb.moveAll(inProgress, defaultQueue)
|
err := p.rdb.moveAll(inProgress, defaultQueue)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("[SERVER ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue)
|
log.Printf("[ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
4
retry.go
4
retry.go
@ -12,7 +12,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) {
|
|||||||
if msg.Retried >= msg.Retry {
|
if msg.Retried >= msg.Retry {
|
||||||
fmt.Println("[DEBUG] Retry exhausted!!!")
|
fmt.Println("[DEBUG] Retry exhausted!!!")
|
||||||
if err := rdb.kill(msg); err != nil {
|
if err := rdb.kill(msg); err != nil {
|
||||||
log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err)
|
log.Printf("[ERROR] could not add task %+v to 'dead' set\n", err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -22,7 +22,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) {
|
|||||||
msg.ErrorMsg = err.Error()
|
msg.ErrorMsg = err.Error()
|
||||||
if err := rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil {
|
if err := rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil {
|
||||||
// TODO(hibiken): Not sure how to handle this error
|
// TODO(hibiken): Not sure how to handle this error
|
||||||
log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err)
|
log.Printf("[ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user