2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add test for cancelation pubsub

This commit is contained in:
Ken Hibino 2020-02-22 14:30:24 -08:00
parent 5775a5818d
commit 37c6c73d9b
2 changed files with 49 additions and 0 deletions

View File

@ -53,6 +53,13 @@ var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.Proc
return out
})
// SortStringSliceOpt is a cmp.Option to sort string slice.
var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string {
out := append([]string(nil), in...)
sort.Strings(out)
return out
})
// IgnoreIDOpt is an cmp.Option to ignore ID field in task messages when comparing.
var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID")

View File

@ -7,6 +7,7 @@ package rdb
import (
"encoding/json"
"fmt"
"sync"
"testing"
"time"
@ -975,3 +976,44 @@ func TestClearProcessState(t *testing.T) {
t.Errorf("%q contained %v, want %v", base.AllWorkers, gotWorkerKeys, wantWorkerKeys)
}
}
func TestCancelationPubSub(t *testing.T) {
r := setup(t)
pubsub, err := r.CancelationPubSub()
if err != nil {
t.Fatalf("(*RDB).CancelationPubSub() returned an error: %v", err)
}
cancelCh := pubsub.Channel()
var (
mu sync.Mutex
received []string
)
go func() {
for msg := range cancelCh {
mu.Lock()
received = append(received, msg.Payload)
mu.Unlock()
}
}()
publish := []string{"one", "two", "three"}
for _, msg := range publish {
r.PublishCancelation(msg)
}
// allow for message to reach subscribers.
time.Sleep(time.Second)
pubsub.Close()
mu.Lock()
if diff := cmp.Diff(publish, received, h.SortStringSliceOpt); diff != "" {
t.Errorf("subscriber received %v, want %v; (-want,+got)\n%s", received, publish, diff)
}
mu.Unlock()
}