2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00
asynq/subscriber_test.go

65 lines
1.5 KiB
Go
Raw Normal View History

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
2020-02-16 15:14:30 +08:00
"sync"
"testing"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestSubscriber(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
tests := []struct {
registeredID string // ID for which cancel func is registered
publishID string // ID to be published
wantCalled bool // whether cancel func should be called
}{
{"abc123", "abc123", true},
{"abc456", "abc123", false},
}
for _, tc := range tests {
2020-02-17 06:42:21 +08:00
var mu sync.Mutex
called := false
fakeCancelFunc := func() {
2020-02-17 06:42:21 +08:00
mu.Lock()
defer mu.Unlock()
called = true
}
cancelations := base.NewCancelations()
cancelations.Add(tc.registeredID, fakeCancelFunc)
2020-03-09 22:11:16 +08:00
subscriber := newSubscriber(testLogger, rdbClient, cancelations)
2020-02-16 15:14:30 +08:00
var wg sync.WaitGroup
subscriber.start(&wg)
if err := rdbClient.PublishCancelation(tc.publishID); err != nil {
subscriber.terminate()
t.Fatalf("could not publish cancelation message: %v", err)
}
// allow for redis to publish message
time.Sleep(time.Second)
2020-02-17 06:42:21 +08:00
mu.Lock()
if called != tc.wantCalled {
if tc.wantCalled {
t.Errorf("fakeCancelFunc was not called, want the function to be called")
} else {
t.Errorf("fakeCancelFunc was called, want the function to not be called")
}
}
2020-02-17 06:42:21 +08:00
mu.Unlock()
subscriber.terminate()
}
}