mirror of
https://github.com/hibiken/asynq.git
synced 2025-02-23 12:20:19 +08:00
Rename internal ProcessState to ServerState
This commit is contained in:
parent
4f11e52558
commit
aafd8a5b74
@ -31,17 +31,18 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
|
timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
|
||||||
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
|
ignoreOpt := cmpopts.IgnoreUnexported(base.ServerInfo{})
|
||||||
|
ignoreFieldOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r)
|
h.FlushDB(t, r)
|
||||||
|
|
||||||
state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
state := base.NewServerState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
|
||||||
hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)
|
hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
hb.start(&wg)
|
hb.start(&wg)
|
||||||
|
|
||||||
want := &base.ProcessInfo{
|
want := &base.ServerInfo{
|
||||||
Host: tc.host,
|
Host: tc.host,
|
||||||
PID: tc.pid,
|
PID: tc.pid,
|
||||||
Queues: tc.queues,
|
Queues: tc.queues,
|
||||||
@ -66,7 +67,7 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
|
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
|
||||||
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
|
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
|
||||||
hb.terminate()
|
hb.terminate()
|
||||||
continue
|
continue
|
||||||
@ -92,7 +93,7 @@ func TestHeartbeater(t *testing.T) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
|
if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
|
||||||
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
|
t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
|
||||||
hb.terminate()
|
hb.terminate()
|
||||||
continue
|
continue
|
||||||
|
@ -41,9 +41,9 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [
|
|||||||
return out
|
return out
|
||||||
})
|
})
|
||||||
|
|
||||||
// SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slice of process info.
|
// SortServerInfoOpt is a cmp.Option to sort base.ServerInfo for comparing slice of process info.
|
||||||
var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo {
|
var SortServerInfoOpt = cmp.Transformer("SortServerInfo", func(in []*base.ServerInfo) []*base.ServerInfo {
|
||||||
out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it
|
out := append([]*base.ServerInfo(nil), in...) // Copy input to avoid mutating it
|
||||||
sort.Slice(out, func(i, j int) bool {
|
sort.Slice(out, func(i, j int) bool {
|
||||||
if out[i].Host != out[j].Host {
|
if out[i].Host != out[j].Host {
|
||||||
return out[i].Host < out[j].Host
|
return out[i].Host < out[j].Host
|
||||||
|
@ -20,10 +20,10 @@ const DefaultQueueName = "default"
|
|||||||
|
|
||||||
// Redis keys
|
// Redis keys
|
||||||
const (
|
const (
|
||||||
AllProcesses = "asynq:ps" // ZSET
|
AllServers = "asynq:servers" // ZSET
|
||||||
psPrefix = "asynq:ps:" // STRING - asynq:ps:<host>:<pid>
|
serversPrefix = "asynq:servers:" // STRING - asynq:ps:<host>:<pid>:<serverid>
|
||||||
AllWorkers = "asynq:workers" // ZSET
|
AllWorkers = "asynq:workers" // ZSET
|
||||||
workersPrefix = "asynq:workers:" // HASH - asynq:workers:<host:<pid>
|
workersPrefix = "asynq:workers:" // HASH - asynq:workers:<host:<pid>:<serverid>
|
||||||
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
|
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
|
||||||
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
|
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
|
||||||
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
|
||||||
@ -51,14 +51,14 @@ func FailureKey(t time.Time) string {
|
|||||||
return failurePrefix + t.UTC().Format("2006-01-02")
|
return failurePrefix + t.UTC().Format("2006-01-02")
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessInfoKey returns a redis key for process info.
|
// ServerInfoKey returns a redis key for process info.
|
||||||
func ProcessInfoKey(hostname string, pid int) string {
|
func ServerInfoKey(hostname string, pid int, sid string) string {
|
||||||
return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
|
return fmt.Sprintf("%s%s:%d:%s", serversPrefix, hostname, pid, sid)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WorkersKey returns a redis key for the workers given hostname and pid.
|
// WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
|
||||||
func WorkersKey(hostname string, pid int) string {
|
func WorkersKey(hostname string, pid int, sid string) string {
|
||||||
return fmt.Sprintf("%s%s:%d", workersPrefix, hostname, pid)
|
return fmt.Sprintf("%s%s:%d:%s", workersPrefix, hostname, pid, sid)
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||||
@ -109,6 +109,7 @@ type TaskMessage struct {
|
|||||||
// ServerStates are safe for concurrent use by multiple goroutines.
|
// ServerStates are safe for concurrent use by multiple goroutines.
|
||||||
type ServerState struct {
|
type ServerState struct {
|
||||||
mu sync.Mutex // guards all data fields
|
mu sync.Mutex // guards all data fields
|
||||||
|
id xid.ID
|
||||||
concurrency int
|
concurrency int
|
||||||
queues map[string]int
|
queues map[string]int
|
||||||
strictPriority bool
|
strictPriority bool
|
||||||
@ -160,6 +161,7 @@ func NewServerState(host string, pid, concurrency int, queues map[string]int, st
|
|||||||
return &ServerState{
|
return &ServerState{
|
||||||
host: host,
|
host: host,
|
||||||
pid: pid,
|
pid: pid,
|
||||||
|
id: xid.New(),
|
||||||
concurrency: concurrency,
|
concurrency: concurrency,
|
||||||
queues: cloneQueueConfig(queues),
|
queues: cloneQueueConfig(queues),
|
||||||
strictPriority: strict,
|
strictPriority: strict,
|
||||||
@ -175,7 +177,7 @@ func (ss *ServerState) SetStatus(status ServerStatus) {
|
|||||||
ss.status = status
|
ss.status = status
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStatus returns the status of server.
|
// Status returns the status of server.
|
||||||
func (ss *ServerState) Status() ServerStatus {
|
func (ss *ServerState) Status() ServerStatus {
|
||||||
ss.mu.Lock()
|
ss.mu.Lock()
|
||||||
defer ss.mu.Unlock()
|
defer ss.mu.Unlock()
|
||||||
@ -203,13 +205,14 @@ func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) {
|
|||||||
delete(ss.workers, msg.ID.String())
|
delete(ss.workers, msg.ID.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get returns current state of process as a ProcessInfo.
|
// GetInfo returns current state of server as a ServerInfo.
|
||||||
func (ss *ServerState) Get() *ProcessInfo {
|
func (ss *ServerState) GetInfo() *ServerInfo {
|
||||||
ss.mu.Lock()
|
ss.mu.Lock()
|
||||||
defer ss.mu.Unlock()
|
defer ss.mu.Unlock()
|
||||||
return &ProcessInfo{
|
return &ServerInfo{
|
||||||
Host: ss.host,
|
Host: ss.host,
|
||||||
PID: ss.pid,
|
PID: ss.pid,
|
||||||
|
ServerID: ss.id.String(),
|
||||||
Concurrency: ss.concurrency,
|
Concurrency: ss.concurrency,
|
||||||
Queues: cloneQueueConfig(ss.queues),
|
Queues: cloneQueueConfig(ss.queues),
|
||||||
StrictPriority: ss.strictPriority,
|
StrictPriority: ss.strictPriority,
|
||||||
@ -254,10 +257,11 @@ func clonePayload(payload map[string]interface{}) map[string]interface{} {
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessInfo holds information about a running background worker process.
|
// ServerInfo holds information about a running server.
|
||||||
type ProcessInfo struct {
|
type ServerInfo struct {
|
||||||
Host string
|
Host string
|
||||||
PID int
|
PID int
|
||||||
|
ServerID string
|
||||||
Concurrency int
|
Concurrency int
|
||||||
Queues map[string]int
|
Queues map[string]int
|
||||||
StrictPriority bool
|
StrictPriority bool
|
||||||
|
@ -12,6 +12,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
|
"github.com/google/go-cmp/cmp/cmpopts"
|
||||||
"github.com/rs/xid"
|
"github.com/rs/xid"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -67,20 +68,21 @@ func TestFailureKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestProcessInfoKey(t *testing.T) {
|
func TestServerInfoKey(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
hostname string
|
hostname string
|
||||||
pid int
|
pid int
|
||||||
|
sid string
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"localhost", 9876, "asynq:ps:localhost:9876"},
|
{"localhost", 9876, "server123", "asynq:servers:localhost:9876:server123"},
|
||||||
{"127.0.0.1", 1234, "asynq:ps:127.0.0.1:1234"},
|
{"127.0.0.1", 1234, "server987", "asynq:servers:127.0.0.1:1234:server987"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
got := ProcessInfoKey(tc.hostname, tc.pid)
|
got := ServerInfoKey(tc.hostname, tc.pid, tc.sid)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("ProcessInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want)
|
t.Errorf("ServerInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -89,14 +91,15 @@ func TestWorkersKey(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
hostname string
|
hostname string
|
||||||
pid int
|
pid int
|
||||||
|
sid string
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"localhost", 9876, "asynq:workers:localhost:9876"},
|
{"localhost", 9876, "server1", "asynq:workers:localhost:9876:server1"},
|
||||||
{"127.0.0.1", 1234, "asynq:workers:127.0.0.1:1234"},
|
{"127.0.0.1", 1234, "server2", "asynq:workers:127.0.0.1:1234:server2"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
got := WorkersKey(tc.hostname, tc.pid)
|
got := WorkersKey(tc.hostname, tc.pid, tc.sid)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want)
|
t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want)
|
||||||
}
|
}
|
||||||
@ -106,7 +109,7 @@ func TestWorkersKey(t *testing.T) {
|
|||||||
// Test for process state being accessed by multiple goroutines.
|
// Test for process state being accessed by multiple goroutines.
|
||||||
// Run with -race flag to check for data race.
|
// Run with -race flag to check for data race.
|
||||||
func TestProcessStateConcurrentAccess(t *testing.T) {
|
func TestProcessStateConcurrentAccess(t *testing.T) {
|
||||||
ps := NewProcessState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
|
ss := NewServerState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
msgs := []*TaskMessage{
|
msgs := []*TaskMessage{
|
||||||
@ -119,18 +122,18 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
|
|||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
ps.SetStarted(started)
|
ss.SetStarted(started)
|
||||||
ps.SetStatus(StatusRunning)
|
ss.SetStatus(StatusRunning)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Simulate processor starting worker goroutines.
|
// Simulate processor starting worker goroutines.
|
||||||
for _, msg := range msgs {
|
for _, msg := range msgs {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
ps.AddWorkerStats(msg, time.Now())
|
ss.AddWorkerStats(msg, time.Now())
|
||||||
go func(msg *TaskMessage) {
|
go func(msg *TaskMessage) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
|
||||||
ps.DeleteWorkerStats(msg)
|
ss.DeleteWorkerStats(msg)
|
||||||
}(msg)
|
}(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,15 +142,15 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
|
|||||||
go func() {
|
go func() {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
for i := 0; i < 5; i++ {
|
for i := 0; i < 5; i++ {
|
||||||
ps.Get()
|
ss.GetInfo()
|
||||||
ps.GetWorkers()
|
ss.GetWorkers()
|
||||||
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
want := &ProcessInfo{
|
want := &ServerInfo{
|
||||||
Host: "127.0.0.1",
|
Host: "127.0.0.1",
|
||||||
PID: 1234,
|
PID: 1234,
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
@ -158,9 +161,9 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
|
|||||||
ActiveWorkerCount: 0,
|
ActiveWorkerCount: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
got := ps.Get()
|
got := ss.GetInfo()
|
||||||
if diff := cmp.Diff(want, got); diff != "" {
|
if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(ServerInfo{}, "ServerID")); diff != "" {
|
||||||
t.Errorf("(*ProcessState).Get() = %+v, want %+v; (-want,+got)\n%s",
|
t.Errorf("(*ServerState).GetInfo() = %+v, want %+v; (-want,+got)\n%s",
|
||||||
got, want, diff)
|
got, want, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -758,6 +758,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Rename this to listServerInfo.
|
||||||
// Note: Script also removes stale keys.
|
// Note: Script also removes stale keys.
|
||||||
var listProcessesCmd = redis.NewScript(`
|
var listProcessesCmd = redis.NewScript(`
|
||||||
local res = {}
|
local res = {}
|
||||||
@ -773,9 +774,9 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
|
|||||||
return res`)
|
return res`)
|
||||||
|
|
||||||
// ListProcesses returns the list of process statuses.
|
// ListProcesses returns the list of process statuses.
|
||||||
func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
|
func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) {
|
||||||
res, err := listProcessesCmd.Run(r.client,
|
res, err := listProcessesCmd.Run(r.client,
|
||||||
[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result()
|
[]string{base.AllServers}, time.Now().UTC().Unix()).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -783,9 +784,9 @@ func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var processes []*base.ProcessInfo
|
var processes []*base.ServerInfo
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
var ps base.ProcessInfo
|
var ps base.ServerInfo
|
||||||
err := json.Unmarshal([]byte(s), &ps)
|
err := json.Unmarshal([]byte(s), &ps)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // skip bad data
|
continue // skip bad data
|
||||||
|
@ -2055,10 +2055,10 @@ func TestListProcesses(t *testing.T) {
|
|||||||
r := setup(t)
|
r := setup(t)
|
||||||
|
|
||||||
started1 := time.Now().Add(-time.Hour)
|
started1 := time.Now().Add(-time.Hour)
|
||||||
ps1 := base.NewProcessState("do.droplet1", 1234, 10, map[string]int{"default": 1}, false)
|
ss1 := base.NewServerState("do.droplet1", 1234, 10, map[string]int{"default": 1}, false)
|
||||||
ps1.SetStarted(started1)
|
ss1.SetStarted(started1)
|
||||||
ps1.SetStatus(base.StatusRunning)
|
ss1.SetStatus(base.StatusRunning)
|
||||||
info1 := &base.ProcessInfo{
|
info1 := &base.ServerInfo{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
Queues: map[string]int{"default": 1},
|
Queues: map[string]int{"default": 1},
|
||||||
Host: "do.droplet1",
|
Host: "do.droplet1",
|
||||||
@ -2069,11 +2069,11 @@ func TestListProcesses(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
started2 := time.Now().Add(-2 * time.Hour)
|
started2 := time.Now().Add(-2 * time.Hour)
|
||||||
ps2 := base.NewProcessState("do.droplet2", 9876, 20, map[string]int{"email": 1}, false)
|
ss2 := base.NewServerState("do.droplet2", 9876, 20, map[string]int{"email": 1}, false)
|
||||||
ps2.SetStarted(started2)
|
ss2.SetStarted(started2)
|
||||||
ps2.SetStatus(base.StatusStopped)
|
ss2.SetStatus(base.StatusStopped)
|
||||||
ps2.AddWorkerStats(h.NewTaskMessage("send_email", nil), time.Now())
|
ss2.AddWorkerStats(h.NewTaskMessage("send_email", nil), time.Now())
|
||||||
info2 := &base.ProcessInfo{
|
info2 := &base.ServerInfo{
|
||||||
Concurrency: 20,
|
Concurrency: 20,
|
||||||
Queues: map[string]int{"email": 1},
|
Queues: map[string]int{"email": 1},
|
||||||
Host: "do.droplet2",
|
Host: "do.droplet2",
|
||||||
@ -2084,30 +2084,31 @@ func TestListProcesses(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
processes []*base.ProcessState
|
serverStates []*base.ServerState
|
||||||
want []*base.ProcessInfo
|
want []*base.ServerInfo
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
processes: []*base.ProcessState{},
|
serverStates: []*base.ServerState{},
|
||||||
want: []*base.ProcessInfo{},
|
want: []*base.ServerInfo{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
processes: []*base.ProcessState{ps1},
|
serverStates: []*base.ServerState{ss1},
|
||||||
want: []*base.ProcessInfo{info1},
|
want: []*base.ServerInfo{info1},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
processes: []*base.ProcessState{ps1, ps2},
|
serverStates: []*base.ServerState{ss1, ss2},
|
||||||
want: []*base.ProcessInfo{info1, info2},
|
want: []*base.ServerInfo{info1, info2},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
|
ignoreOpt := cmpopts.IgnoreUnexported(base.ServerInfo{})
|
||||||
|
ignoreFieldOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
for _, ps := range tc.processes {
|
for _, ss := range tc.serverStates {
|
||||||
if err := r.WriteProcessState(ps, 5*time.Second); err != nil {
|
if err := r.WriteServerState(ss, 5*time.Second); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2116,9 +2117,9 @@ func TestListProcesses(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("r.ListProcesses returned an error: %v", err)
|
t.Errorf("r.ListProcesses returned an error: %v", err)
|
||||||
}
|
}
|
||||||
if diff := cmp.Diff(tc.want, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" {
|
if diff := cmp.Diff(tc.want, got, h.SortServerInfoOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
|
||||||
t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
|
t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
|
||||||
got, tc.processes, diff)
|
got, tc.serverStates, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2164,13 +2165,13 @@ func TestListWorkers(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false)
|
ss := base.NewServerState(host, pid, 10, map[string]int{"default": 1}, false)
|
||||||
|
|
||||||
for _, w := range tc.workers {
|
for _, w := range tc.workers {
|
||||||
ps.AddWorkerStats(w.msg, w.started)
|
ss.AddWorkerStats(w.msg, w.started)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.WriteProcessState(ps, time.Minute)
|
err := r.WriteServerState(ss, time.Minute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("could not write process state to redis: %v", err)
|
t.Errorf("could not write process state to redis: %v", err)
|
||||||
continue
|
continue
|
||||||
|
@ -463,9 +463,9 @@ func (r *RDB) forwardSingle(src, dst string) error {
|
|||||||
[]string{src, dst}, now).Err()
|
[]string{src, dst}, now).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:ps:<host:pid>
|
// KEYS[1] -> asynq:servers:<host:pid:sid>
|
||||||
// KEYS[2] -> asynq:ps
|
// KEYS[2] -> asynq:servers
|
||||||
// KEYS[3] -> asynq:workers<host:pid>
|
// KEYS[3] -> asynq:workers<host:pid:sid>
|
||||||
// keys[4] -> asynq:workers
|
// keys[4] -> asynq:workers
|
||||||
// ARGV[1] -> expiration time
|
// ARGV[1] -> expiration time
|
||||||
// ARGV[2] -> TTL in seconds
|
// ARGV[2] -> TTL in seconds
|
||||||
@ -486,7 +486,7 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
||||||
func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
|
func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
|
||||||
info := ss.Get()
|
info := ss.GetInfo()
|
||||||
bytes, err := json.Marshal(info)
|
bytes, err := json.Marshal(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -502,17 +502,17 @@ func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
|
|||||||
}
|
}
|
||||||
args = append(args, w.ID.String(), bytes)
|
args = append(args, w.ID.String(), bytes)
|
||||||
}
|
}
|
||||||
pkey := base.ProcessInfoKey(info.Host, info.PID)
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||||
wkey := base.WorkersKey(info.Host, info.PID)
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||||
return writeProcessInfoCmd.Run(r.client,
|
return writeProcessInfoCmd.Run(r.client,
|
||||||
[]string{pkey, base.AllProcesses, wkey, base.AllWorkers},
|
[]string{skey, base.AllServers, wkey, base.AllWorkers},
|
||||||
args...).Err()
|
args...).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:ps
|
// KEYS[1] -> asynq:servers
|
||||||
// KEYS[2] -> asynq:ps:<host:pid>
|
// KEYS[2] -> asynq:servers:<host:pid:sid>
|
||||||
// KEYS[3] -> asynq:workers
|
// KEYS[3] -> asynq:workers
|
||||||
// KEYS[4] -> asynq:workers<host:pid>
|
// KEYS[4] -> asynq:workers<host:pid:sid>
|
||||||
var clearProcessInfoCmd = redis.NewScript(`
|
var clearProcessInfoCmd = redis.NewScript(`
|
||||||
redis.call("ZREM", KEYS[1], KEYS[2])
|
redis.call("ZREM", KEYS[1], KEYS[2])
|
||||||
redis.call("DEL", KEYS[2])
|
redis.call("DEL", KEYS[2])
|
||||||
@ -522,12 +522,12 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// ClearServerState deletes server state data from redis.
|
// ClearServerState deletes server state data from redis.
|
||||||
func (r *RDB) ClearServerState(ss *base.ServerState) error {
|
func (r *RDB) ClearServerState(ss *base.ServerState) error {
|
||||||
info := ss.Get()
|
info := ss.GetInfo()
|
||||||
host, pid := info.Host, info.PID
|
host, pid, id := info.Host, info.PID, info.ServerID
|
||||||
pkey := base.ProcessInfoKey(host, pid)
|
skey := base.ServerInfoKey(host, pid, id)
|
||||||
wkey := base.WorkersKey(host, pid)
|
wkey := base.WorkersKey(host, pid, id)
|
||||||
return clearProcessInfoCmd.Run(r.client,
|
return clearProcessInfoCmd.Run(r.client,
|
||||||
[]string{base.AllProcesses, pkey, base.AllWorkers, wkey}).Err()
|
[]string{base.AllServers, skey, base.AllWorkers, wkey}).Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CancelationPubSub returns a pubsub for cancelation messages.
|
// CancelationPubSub returns a pubsub for cancelation messages.
|
||||||
|
@ -862,60 +862,61 @@ func TestCheckAndEnqueue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteProcessState(t *testing.T) {
|
func TestWriteServerState(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
host, pid := "localhost", 98765
|
|
||||||
queues := map[string]int{"default": 2, "email": 5, "low": 1}
|
queues := map[string]int{"default": 2, "email": 5, "low": 1}
|
||||||
|
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
ps := base.NewProcessState(host, pid, 10, queues, false)
|
ss := base.NewServerState("localhost", 4242, 10, queues, false)
|
||||||
ps.SetStarted(started)
|
ss.SetStarted(started)
|
||||||
ps.SetStatus(base.StatusRunning)
|
ss.SetStatus(base.StatusRunning)
|
||||||
ttl := 5 * time.Second
|
ttl := 5 * time.Second
|
||||||
|
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
err := r.WriteProcessState(ps, ttl)
|
err := r.WriteServerState(ss, ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("r.WriteProcessState returned an error: %v", err)
|
t.Errorf("r.WriteServerState returned an error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check ProcessInfo was written correctly
|
// Check ServerInfo was written correctly
|
||||||
pkey := base.ProcessInfoKey(host, pid)
|
info := ss.GetInfo()
|
||||||
data := r.client.Get(pkey).Val()
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||||
var got base.ProcessInfo
|
data := r.client.Get(skey).Val()
|
||||||
|
var got base.ServerInfo
|
||||||
err = json.Unmarshal([]byte(data), &got)
|
err = json.Unmarshal([]byte(data), &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not decode json: %v", err)
|
t.Fatalf("could not decode json: %v", err)
|
||||||
}
|
}
|
||||||
want := base.ProcessInfo{
|
want := base.ServerInfo{
|
||||||
Host: "localhost",
|
Host: info.Host,
|
||||||
PID: 98765,
|
PID: info.PID,
|
||||||
Concurrency: 10,
|
Concurrency: info.Concurrency,
|
||||||
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
Status: "running",
|
Status: "running",
|
||||||
Started: started,
|
Started: started,
|
||||||
ActiveWorkerCount: 0,
|
ActiveWorkerCount: 0,
|
||||||
}
|
}
|
||||||
if diff := cmp.Diff(want, got); diff != "" {
|
ignoreOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")
|
||||||
t.Errorf("persisted ProcessInfo was %v, want %v; (-want,+got)\n%s",
|
if diff := cmp.Diff(want, got, ignoreOpt); diff != "" {
|
||||||
|
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
|
||||||
got, want, diff)
|
got, want, diff)
|
||||||
}
|
}
|
||||||
// Check ProcessInfo TTL was set correctly
|
// Check ServerInfo TTL was set correctly
|
||||||
gotTTL := r.client.TTL(pkey).Val()
|
gotTTL := r.client.TTL(skey).Val()
|
||||||
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
|
t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
|
||||||
}
|
}
|
||||||
// Check ProcessInfo key was added to the set correctly
|
// Check ServerInfo key was added to the set correctly
|
||||||
gotProcesses := r.client.ZRange(base.AllProcesses, 0, -1).Val()
|
gotProcesses := r.client.ZRange(base.AllServers, 0, -1).Val()
|
||||||
wantProcesses := []string{pkey}
|
wantProcesses := []string{skey}
|
||||||
if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
|
if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
|
||||||
t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcesses, wantProcesses)
|
t.Errorf("%q contained %v, want %v", base.AllServers, gotProcesses, wantProcesses)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check WorkersInfo was written correctly
|
// Check WorkersInfo was written correctly
|
||||||
wkey := base.WorkersKey(host, pid)
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||||
workerExist := r.client.Exists(wkey).Val()
|
workerExist := r.client.Exists(wkey).Val()
|
||||||
if workerExist != 0 {
|
if workerExist != 0 {
|
||||||
t.Errorf("%q key exists", wkey)
|
t.Errorf("%q key exists", wkey)
|
||||||
@ -928,9 +929,8 @@ func TestWriteProcessState(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWriteProcessStateWithWorkers(t *testing.T) {
|
func TestWriteServerStateWithWorkers(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
host, pid := "localhost", 98765
|
|
||||||
queues := map[string]int{"default": 2, "email": 5, "low": 1}
|
queues := map[string]int{"default": 2, "email": 5, "low": 1}
|
||||||
concurrency := 10
|
concurrency := 10
|
||||||
|
|
||||||
@ -939,31 +939,33 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
|||||||
w2Started := time.Now().Add(-time.Second)
|
w2Started := time.Now().Add(-time.Second)
|
||||||
msg1 := h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"})
|
msg1 := h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"})
|
||||||
msg2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"})
|
msg2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"})
|
||||||
ps := base.NewProcessState(host, pid, concurrency, queues, false)
|
ss := base.NewServerState("127.0.01", 4242, concurrency, queues, false)
|
||||||
ps.SetStarted(started)
|
ss.SetStarted(started)
|
||||||
ps.SetStatus(base.StatusRunning)
|
ss.SetStatus(base.StatusRunning)
|
||||||
ps.AddWorkerStats(msg1, w1Started)
|
ss.AddWorkerStats(msg1, w1Started)
|
||||||
ps.AddWorkerStats(msg2, w2Started)
|
ss.AddWorkerStats(msg2, w2Started)
|
||||||
ttl := 5 * time.Second
|
ttl := 5 * time.Second
|
||||||
|
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
err := r.WriteProcessState(ps, ttl)
|
err := r.WriteServerState(ss, ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("r.WriteProcessState returned an error: %v", err)
|
t.Errorf("r.WriteServerState returned an error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check ProcessInfo was written correctly
|
// Check ServerInfo was written correctly
|
||||||
pkey := base.ProcessInfoKey(host, pid)
|
info := ss.GetInfo()
|
||||||
data := r.client.Get(pkey).Val()
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||||
var got base.ProcessInfo
|
data := r.client.Get(skey).Val()
|
||||||
|
var got base.ServerInfo
|
||||||
err = json.Unmarshal([]byte(data), &got)
|
err = json.Unmarshal([]byte(data), &got)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("could not decode json: %v", err)
|
t.Fatalf("could not decode json: %v", err)
|
||||||
}
|
}
|
||||||
want := base.ProcessInfo{
|
want := base.ServerInfo{
|
||||||
Host: host,
|
Host: info.Host,
|
||||||
PID: pid,
|
PID: info.PID,
|
||||||
|
ServerID: info.ServerID,
|
||||||
Concurrency: concurrency,
|
Concurrency: concurrency,
|
||||||
Queues: queues,
|
Queues: queues,
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
@ -972,23 +974,23 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
|||||||
ActiveWorkerCount: 2,
|
ActiveWorkerCount: 2,
|
||||||
}
|
}
|
||||||
if diff := cmp.Diff(want, got); diff != "" {
|
if diff := cmp.Diff(want, got); diff != "" {
|
||||||
t.Errorf("persisted ProcessInfo was %v, want %v; (-want,+got)\n%s",
|
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
|
||||||
got, want, diff)
|
got, want, diff)
|
||||||
}
|
}
|
||||||
// Check ProcessInfo TTL was set correctly
|
// Check ServerInfo TTL was set correctly
|
||||||
gotTTL := r.client.TTL(pkey).Val()
|
gotTTL := r.client.TTL(skey).Val()
|
||||||
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
|
t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
|
||||||
}
|
}
|
||||||
// Check ProcessInfo key was added to the set correctly
|
// Check ServerInfo key was added to the set correctly
|
||||||
gotProcesses := r.client.ZRange(base.AllProcesses, 0, -1).Val()
|
gotProcesses := r.client.ZRange(base.AllServers, 0, -1).Val()
|
||||||
wantProcesses := []string{pkey}
|
wantProcesses := []string{skey}
|
||||||
if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
|
if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
|
||||||
t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcesses, wantProcesses)
|
t.Errorf("%q contained %v, want %v", base.AllServers, gotProcesses, wantProcesses)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check WorkersInfo was written correctly
|
// Check WorkersInfo was written correctly
|
||||||
wkey := base.WorkersKey(host, pid)
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||||
wdata := r.client.HGetAll(wkey).Val()
|
wdata := r.client.HGetAll(wkey).Val()
|
||||||
if len(wdata) != 2 {
|
if len(wdata) != 2 {
|
||||||
t.Fatalf("HGETALL %q returned a hash of size %d, want 2", wkey, len(wdata))
|
t.Fatalf("HGETALL %q returned a hash of size %d, want 2", wkey, len(wdata))
|
||||||
@ -1003,8 +1005,8 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
wantWorkers := map[string]*base.WorkerInfo{
|
wantWorkers := map[string]*base.WorkerInfo{
|
||||||
msg1.ID.String(): {
|
msg1.ID.String(): {
|
||||||
Host: host,
|
Host: info.Host,
|
||||||
PID: pid,
|
PID: info.PID,
|
||||||
ID: msg1.ID,
|
ID: msg1.ID,
|
||||||
Type: msg1.Type,
|
Type: msg1.Type,
|
||||||
Queue: msg1.Queue,
|
Queue: msg1.Queue,
|
||||||
@ -1012,8 +1014,8 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
|||||||
Started: w1Started,
|
Started: w1Started,
|
||||||
},
|
},
|
||||||
msg2.ID.String(): {
|
msg2.ID.String(): {
|
||||||
Host: host,
|
Host: info.Host,
|
||||||
PID: pid,
|
PID: info.PID,
|
||||||
ID: msg2.ID,
|
ID: msg2.ID,
|
||||||
Type: msg2.Type,
|
Type: msg2.Type,
|
||||||
Queue: msg2.Queue,
|
Queue: msg2.Queue,
|
||||||
@ -1039,27 +1041,28 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestClearProcessState(t *testing.T) {
|
func TestClearServerState(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
host, pid := "127.0.0.1", 1234
|
ss := base.NewServerState("127.0.01", 4242, 10, map[string]int{"default": 1}, false)
|
||||||
|
info := ss.GetInfo()
|
||||||
|
|
||||||
h.FlushDB(t, r.client)
|
h.FlushDB(t, r.client)
|
||||||
|
|
||||||
pkey := base.ProcessInfoKey(host, pid)
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||||
wkey := base.WorkersKey(host, pid)
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||||
otherPKey := base.ProcessInfoKey("otherhost", 12345)
|
otherSKey := base.ServerInfoKey("otherhost", 12345, "server98")
|
||||||
otherWKey := base.WorkersKey("otherhost", 12345)
|
otherWKey := base.WorkersKey("otherhost", 12345, "server98")
|
||||||
// Populate the keys.
|
// Populate the keys.
|
||||||
if err := r.client.Set(pkey, "process-info", 0).Err(); err != nil {
|
if err := r.client.Set(skey, "process-info", 0).Err(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := r.client.HSet(wkey, "worker-key", "worker-info").Err(); err != nil {
|
if err := r.client.HSet(wkey, "worker-key", "worker-info").Err(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := r.client.ZAdd(base.AllProcesses, &redis.Z{Member: pkey}).Err(); err != nil {
|
if err := r.client.ZAdd(base.AllServers, &redis.Z{Member: skey}).Err(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := r.client.ZAdd(base.AllProcesses, &redis.Z{Member: otherPKey}).Err(); err != nil {
|
if err := r.client.ZAdd(base.AllServers, &redis.Z{Member: otherSKey}).Err(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Member: wkey}).Err(); err != nil {
|
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Member: wkey}).Err(); err != nil {
|
||||||
@ -1069,24 +1072,22 @@ func TestClearProcessState(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false)
|
err := r.ClearServerState(ss)
|
||||||
|
|
||||||
err := r.ClearProcessState(ps)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("(*RDB).ClearProcessState failed: %v", err)
|
t.Fatalf("(*RDB).ClearServerState failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check all keys are cleared
|
// Check all keys are cleared
|
||||||
if r.client.Exists(pkey).Val() != 0 {
|
if r.client.Exists(skey).Val() != 0 {
|
||||||
t.Errorf("Redis key %q exists", pkey)
|
t.Errorf("Redis key %q exists", skey)
|
||||||
}
|
}
|
||||||
if r.client.Exists(wkey).Val() != 0 {
|
if r.client.Exists(wkey).Val() != 0 {
|
||||||
t.Errorf("Redis key %q exists", wkey)
|
t.Errorf("Redis key %q exists", wkey)
|
||||||
}
|
}
|
||||||
gotProcessKeys := r.client.ZRange(base.AllProcesses, 0, -1).Val()
|
gotProcessKeys := r.client.ZRange(base.AllServers, 0, -1).Val()
|
||||||
wantProcessKeys := []string{otherPKey}
|
wantProcessKeys := []string{otherSKey}
|
||||||
if diff := cmp.Diff(wantProcessKeys, gotProcessKeys); diff != "" {
|
if diff := cmp.Diff(wantProcessKeys, gotProcessKeys); diff != "" {
|
||||||
t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcessKeys, wantProcessKeys)
|
t.Errorf("%q contained %v, want %v", base.AllServers, gotProcessKeys, wantProcessKeys)
|
||||||
}
|
}
|
||||||
gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
|
gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
|
||||||
wantWorkerKeys := []string{otherWKey}
|
wantWorkerKeys := []string{otherWKey}
|
||||||
|
@ -64,7 +64,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration
|
|||||||
// newProcessor constructs a new processor.
|
// newProcessor constructs a new processor.
|
||||||
func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc,
|
func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc,
|
||||||
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
|
||||||
info := ss.Get()
|
info := ss.GetInfo()
|
||||||
qcfg := normalizeQueueCfg(info.Queues)
|
qcfg := normalizeQueueCfg(info.Queues)
|
||||||
orderedQueues := []string(nil)
|
orderedQueues := []string(nil)
|
||||||
if info.StrictPriority {
|
if info.StrictPriority {
|
||||||
|
@ -67,9 +67,9 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
processed = append(processed, task)
|
processed = append(processed, task)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil)
|
||||||
p.handler = HandlerFunc(handler)
|
p.handler = HandlerFunc(handler)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -165,9 +165,9 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
defer mu.Unlock()
|
defer mu.Unlock()
|
||||||
n++
|
n++
|
||||||
}
|
}
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
|
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
p := newProcessor(testLogger, rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
|
p := newProcessor(testLogger, rdbClient, ss, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
|
||||||
p.handler = tc.handler
|
p.handler = tc.handler
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -232,8 +232,8 @@ func TestProcessorQueues(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
|
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
|
||||||
p := newProcessor(testLogger, nil, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, nil, ss, defaultDelayFunc, nil, cancelations, nil)
|
||||||
got := p.queues()
|
got := p.queues()
|
||||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||||
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
||||||
@ -300,8 +300,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
|
||||||
cancelations := base.NewCancelations()
|
cancelations := base.NewCancelations()
|
||||||
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
|
||||||
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
|
p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil)
|
||||||
p.handler = HandlerFunc(handler)
|
p.handler = HandlerFunc(handler)
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
@ -276,7 +276,8 @@ func (srv *Server) Start(handler Handler) error {
|
|||||||
// Stops the background-task processing.
|
// Stops the background-task processing.
|
||||||
// TODO: do we need to return error?
|
// TODO: do we need to return error?
|
||||||
func (srv *Server) Stop() {
|
func (srv *Server) Stop() {
|
||||||
if srv.ss.Status() != base.StatusRunning {
|
switch srv.ss.Status() {
|
||||||
|
case base.StatusIdle, base.StatusStopped:
|
||||||
// server is not running, do nothing and return.
|
// server is not running, do nothing and return.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user