mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 23:02:18 +08:00
Rename to CheckAndEnqueue
This commit is contained in:
parent
af1dcf5044
commit
28bfb6d83a
@ -18,7 +18,7 @@ func setup(t *testing.T) *redis.Client {
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
r := redis.NewClient(&redis.Options{
|
r := redis.NewClient(&redis.Options{
|
||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
DB: 2,
|
DB: 15,
|
||||||
})
|
})
|
||||||
// Start each test with a clean slate.
|
// Start each test with a clean slate.
|
||||||
if err := r.FlushDB().Err(); err != nil {
|
if err := r.FlushDB().Err(); err != nil {
|
||||||
|
@ -229,9 +229,9 @@ func (r *RDB) RestoreUnfinished() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckScheduled checks for all scheduled tasks and moves any tasks that
|
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
||||||
// have to be processed to the queue.
|
// have to be processed.
|
||||||
func (r *RDB) CheckScheduled() error {
|
func (r *RDB) CheckAndEnqueue() error {
|
||||||
delayed := []string{Scheduled, Retry}
|
delayed := []string{Scheduled, Retry}
|
||||||
for _, zset := range delayed {
|
for _, zset := range delayed {
|
||||||
if err := r.forward(zset); err != nil {
|
if err := r.forward(zset); err != nil {
|
||||||
|
@ -323,7 +323,7 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCheckScheduled(t *testing.T) {
|
func TestCheckAndEnqueue(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := randomTask("send_email", "default", nil)
|
t1 := randomTask("send_email", "default", nil)
|
||||||
t2 := randomTask("generate_csv", "default", nil)
|
t2 := randomTask("generate_csv", "default", nil)
|
||||||
@ -335,8 +335,8 @@ func TestCheckScheduled(t *testing.T) {
|
|||||||
initScheduled []*redis.Z // tasks to be processed later
|
initScheduled []*redis.Z // tasks to be processed later
|
||||||
initRetry []*redis.Z // tasks to be retired later
|
initRetry []*redis.Z // tasks to be retired later
|
||||||
wantQueued []*TaskMessage // queue after calling forward
|
wantQueued []*TaskMessage // queue after calling forward
|
||||||
wantScheduled []*TaskMessage // tasks in scheduled queue after calling CheckScheduled
|
wantScheduled []*TaskMessage // tasks in scheduled queue after calling the method
|
||||||
wantRetry []*TaskMessage // tasks in retry queue after calling CheckScheduled
|
wantRetry []*TaskMessage // tasks in retry queue after calling the method
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
initScheduled: []*redis.Z{
|
initScheduled: []*redis.Z{
|
||||||
@ -384,7 +384,7 @@ func TestCheckScheduled(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.CheckScheduled()
|
err := r.CheckAndEnqueue()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
|
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
|
||||||
continue
|
continue
|
||||||
|
@ -48,7 +48,7 @@ func (p *poller) start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *poller) exec() {
|
func (p *poller) exec() {
|
||||||
if err := p.rdb.CheckScheduled(); err != nil {
|
if err := p.rdb.CheckAndEnqueue(); err != nil {
|
||||||
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
|
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user