diff --git a/heartbeat_test.go b/heartbeat_test.go
index e6deb88..a1b4127 100644
--- a/heartbeat_test.go
+++ b/heartbeat_test.go
@@ -31,17 +31,18 @@ func TestHeartbeater(t *testing.T) {
 	}

 	timeCmpOpt := cmpopts.EquateApproxTime(10 * time.Millisecond)
-	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
+	ignoreOpt := cmpopts.IgnoreUnexported(base.ServerInfo{})
+	ignoreFieldOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")
 	for _, tc := range tests {
 		h.FlushDB(t, r)

-		state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
+		state := base.NewServerState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
 		hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)

 		var wg sync.WaitGroup
 		hb.start(&wg)

-		want := &base.ProcessInfo{
+		want := &base.ServerInfo{
 			Host:   tc.host,
 			PID:    tc.pid,
 			Queues: tc.queues,
@@ -66,7 +67,7 @@ func TestHeartbeater(t *testing.T) {
 			continue
 		}

-		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
+		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
 			hb.terminate()
 			continue
@@ -92,7 +93,7 @@ func TestHeartbeater(t *testing.T) {
 			continue
 		}

-		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt); diff != "" {
+		if diff := cmp.Diff(want, ps[0], timeCmpOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("redis stored process status %+v, want %+v; (-want, +got)\n%s", ps[0], want, diff)
 			hb.terminate()
 			continue
diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go
index 07de078..03be697 100644
--- a/internal/asynqtest/asynqtest.go
+++ b/internal/asynqtest/asynqtest.go
@@ -41,9 +41,9 @@ var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) [
 	return out
 })

-// SortProcessInfoOpt is a cmp.Option to sort base.ProcessInfo for comparing slice of process info.
-var SortProcessInfoOpt = cmp.Transformer("SortProcessInfo", func(in []*base.ProcessInfo) []*base.ProcessInfo {
-	out := append([]*base.ProcessInfo(nil), in...) // Copy input to avoid mutating it
+// SortServerInfoOpt is a cmp.Option to sort base.ServerInfo for comparing a slice of server info.
+var SortServerInfoOpt = cmp.Transformer("SortServerInfo", func(in []*base.ServerInfo) []*base.ServerInfo {
+	out := append([]*base.ServerInfo(nil), in...) // Copy input to avoid mutating it
 	sort.Slice(out, func(i, j int) bool {
 		if out[i].Host != out[j].Host {
 			return out[i].Host < out[j].Host
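// Illustrative sketch, not part of the patch: how the renamed option is meant
// to be used. SortServerInfoOpt sorts both slices before diffing, so ordering
// differences (e.g. from ZRANGE) do not fail an assertion. Hosts and PIDs are
// made-up values, and the internal imports only resolve inside the asynq module.
package asynqtest_sketch

import (
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"

	h "github.com/hibiken/asynq/internal/asynqtest"
	"github.com/hibiken/asynq/internal/base"
)

func TestSortServerInfoOptSketch(t *testing.T) {
	got := []*base.ServerInfo{{Host: "host-b", PID: 2}, {Host: "host-a", PID: 1}}
	want := []*base.ServerInfo{{Host: "host-a", PID: 1}, {Host: "host-b", PID: 2}}
	opts := []cmp.Option{h.SortServerInfoOpt, cmpopts.IgnoreUnexported(base.ServerInfo{})}
	if diff := cmp.Diff(want, got, opts...); diff != "" {
		t.Errorf("server info mismatch (-want,+got):\n%s", diff)
	}
}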
diff --git a/internal/base/base.go b/internal/base/base.go
index abf0fae..460d96a 100644
--- a/internal/base/base.go
+++ b/internal/base/base.go
@@ -20,10 +20,10 @@ const DefaultQueueName = "default"

 // Redis keys
 const (
-	AllProcesses    = "asynq:ps"         // ZSET
-	psPrefix        = "asynq:ps:"        // STRING - asynq:ps:<host>:<pid>
+	AllServers      = "asynq:servers"    // ZSET
+	serversPrefix   = "asynq:servers:"   // STRING - asynq:servers:<host>:<pid>:<serverid>
 	AllWorkers      = "asynq:workers"    // ZSET
-	workersPrefix   = "asynq:workers:"   // HASH - asynq:workers:<host>:<pid>
+	workersPrefix   = "asynq:workers:"   // HASH - asynq:workers:<host>:<pid>:<serverid>
 	processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
 	failurePrefix   = "asynq:failure:"   // STRING - asynq:failure:<yyyy-mm-dd>
 	QueuePrefix     = "asynq:queues:"    // LIST - asynq:queues:<qname>
@@ -51,14 +51,14 @@ func FailureKey(t time.Time) string {
 	return failurePrefix + t.UTC().Format("2006-01-02")
 }

-// ProcessInfoKey returns a redis key for process info.
-func ProcessInfoKey(hostname string, pid int) string {
-	return fmt.Sprintf("%s%s:%d", psPrefix, hostname, pid)
+// ServerInfoKey returns a redis key for server info.
+func ServerInfoKey(hostname string, pid int, sid string) string {
+	return fmt.Sprintf("%s%s:%d:%s", serversPrefix, hostname, pid, sid)
 }

-// WorkersKey returns a redis key for the workers given hostname and pid.
-func WorkersKey(hostname string, pid int) string {
-	return fmt.Sprintf("%s%s:%d", workersPrefix, hostname, pid)
+// WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
+func WorkersKey(hostname string, pid int, sid string) string {
+	return fmt.Sprintf("%s%s:%d:%s", workersPrefix, hostname, pid, sid)
 }

 // TaskMessage is the internal representation of a task with additional metadata fields.
@@ -109,6 +109,7 @@ type TaskMessage struct {
 // ServerStates are safe for concurrent use by multiple goroutines.
 type ServerState struct {
 	mu             sync.Mutex // guards all data fields
+	id             xid.ID
 	concurrency    int
 	queues         map[string]int
 	strictPriority bool
@@ -160,6 +161,7 @@ func NewServerState(host string, pid, concurrency int, queues map[string]int, st
 	return &ServerState{
 		host:           host,
 		pid:            pid,
+		id:             xid.New(),
 		concurrency:    concurrency,
 		queues:         cloneQueueConfig(queues),
 		strictPriority: strict,
@@ -175,7 +177,7 @@ func (ss *ServerState) SetStatus(status ServerStatus) {
 	ss.status = status
 }

-// GetStatus returns the status of server.
+// Status returns the status of the server.
 func (ss *ServerState) Status() ServerStatus {
 	ss.mu.Lock()
 	defer ss.mu.Unlock()
@@ -203,13 +205,14 @@ func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) {
 	delete(ss.workers, msg.ID.String())
 }

-// Get returns current state of process as a ProcessInfo.
-func (ss *ServerState) Get() *ProcessInfo {
+// GetInfo returns the current state of the server as a ServerInfo.
+func (ss *ServerState) GetInfo() *ServerInfo {
 	ss.mu.Lock()
 	defer ss.mu.Unlock()
-	return &ProcessInfo{
+	return &ServerInfo{
 		Host:              ss.host,
 		PID:               ss.pid,
+		ServerID:          ss.id.String(),
 		Concurrency:       ss.concurrency,
 		Queues:            cloneQueueConfig(ss.queues),
 		StrictPriority:    ss.strictPriority,
@@ -254,10 +257,11 @@ func clonePayload(payload map[string]interface{}) map[string]interface{} {
 	return res
 }

-// ProcessInfo holds information about a running background worker process.
-type ProcessInfo struct {
+// ServerInfo holds information about a running server.
+type ServerInfo struct {
 	Host              string
 	PID               int
+	ServerID          string
 	Concurrency       int
 	Queues            map[string]int
 	StrictPriority    bool
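// A sketch for orientation, not part of the patch: the key shapes the new
// three-part key builders produce. "abc123" stands in for a generated xid;
// real IDs are 20-character strings such as "9bsv0s091v80qeprn3p0".
package main

import (
	"fmt"

	"github.com/hibiken/asynq/internal/base"
)

func main() {
	fmt.Println(base.ServerInfoKey("example.com", 1234, "abc123"))
	// asynq:servers:example.com:1234:abc123
	fmt.Println(base.WorkersKey("example.com", 1234, "abc123"))
	// asynq:workers:example.com:1234:abc123
}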
diff --git a/internal/base/base_test.go b/internal/base/base_test.go
index c475cb4..208f204 100644
--- a/internal/base/base_test.go
+++ b/internal/base/base_test.go
@@ -12,6 +12,7 @@ import (
 	"time"

 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"github.com/rs/xid"
 )

@@ -67,20 +68,21 @@ func TestFailureKey(t *testing.T) {
 	}
 }

-func TestProcessInfoKey(t *testing.T) {
+func TestServerInfoKey(t *testing.T) {
 	tests := []struct {
 		hostname string
 		pid      int
+		sid      string
 		want     string
 	}{
-		{"localhost", 9876, "asynq:ps:localhost:9876"},
-		{"127.0.0.1", 1234, "asynq:ps:127.0.0.1:1234"},
+		{"localhost", 9876, "server123", "asynq:servers:localhost:9876:server123"},
+		{"127.0.0.1", 1234, "server987", "asynq:servers:127.0.0.1:1234:server987"},
 	}

 	for _, tc := range tests {
-		got := ProcessInfoKey(tc.hostname, tc.pid)
+		got := ServerInfoKey(tc.hostname, tc.pid, tc.sid)
 		if got != tc.want {
-			t.Errorf("ProcessInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want)
+			t.Errorf("ServerInfoKey(%q, %d, %q) = %q, want %q", tc.hostname, tc.pid, tc.sid, got, tc.want)
 		}
 	}
 }
@@ -89,14 +91,15 @@ func TestWorkersKey(t *testing.T) {
 	tests := []struct {
 		hostname string
 		pid      int
+		sid      string
 		want     string
 	}{
-		{"localhost", 9876, "asynq:workers:localhost:9876"},
-		{"127.0.0.1", 1234, "asynq:workers:127.0.0.1:1234"},
+		{"localhost", 9876, "server1", "asynq:workers:localhost:9876:server1"},
+		{"127.0.0.1", 1234, "server2", "asynq:workers:127.0.0.1:1234:server2"},
 	}

 	for _, tc := range tests {
-		got := WorkersKey(tc.hostname, tc.pid)
+		got := WorkersKey(tc.hostname, tc.pid, tc.sid)
 		if got != tc.want {
 			t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want)
 		}
@@ -106,7 +109,7 @@ func TestWorkersKey(t *testing.T) {
 // Test for process state being accessed by multiple goroutines.
 // Run with -race flag to check for data race.
 func TestProcessStateConcurrentAccess(t *testing.T) {
-	ps := NewProcessState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
+	ss := NewServerState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
 	var wg sync.WaitGroup
 	started := time.Now()
 	msgs := []*TaskMessage{
@@ -119,18 +122,18 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		ps.SetStarted(started)
-		ps.SetStatus(StatusRunning)
+		ss.SetStarted(started)
+		ss.SetStatus(StatusRunning)
 	}()

 	// Simulate processor starting worker goroutines.
 	for _, msg := range msgs {
 		wg.Add(1)
-		ps.AddWorkerStats(msg, time.Now())
+		ss.AddWorkerStats(msg, time.Now())
 		go func(msg *TaskMessage) {
 			defer wg.Done()
 			time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
-			ps.DeleteWorkerStats(msg)
+			ss.DeleteWorkerStats(msg)
 		}(msg)
 	}

@@ -139,15 +142,15 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
 	go func() {
 		wg.Done()
 		for i := 0; i < 5; i++ {
-			ps.Get()
-			ps.GetWorkers()
+			ss.GetInfo()
+			ss.GetWorkers()
 			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 		}
 	}()

 	wg.Wait()

-	want := &ProcessInfo{
+	want := &ServerInfo{
 		Host:        "127.0.0.1",
 		PID:         1234,
 		Concurrency: 10,
@@ -158,9 +161,9 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
 		ActiveWorkerCount: 0,
 	}

-	got := ps.Get()
-	if diff := cmp.Diff(want, got); diff != "" {
-		t.Errorf("(*ProcessState).Get() = %+v, want %+v; (-want,+got)\n%s",
+	got := ss.GetInfo()
+	if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(ServerInfo{}, "ServerID")); diff != "" {
+		t.Errorf("(*ServerState).GetInfo() = %+v, want %+v; (-want,+got)\n%s",
 			got, want, diff)
 	}
 }
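// Sketch, not part of the patch: why the tests above must ignore the ServerID
// field — NewServerState draws a fresh xid per call, so two states built from
// identical arguments still carry distinct IDs.
package main

import (
	"fmt"

	"github.com/hibiken/asynq/internal/base"
)

func main() {
	ss1 := base.NewServerState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
	ss2 := base.NewServerState("127.0.0.1", 1234, 10, map[string]int{"default": 1}, false)
	fmt.Println(ss1.GetInfo().ServerID == ss2.GetInfo().ServerID) // false: IDs are per instance
}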
diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go
index 1678136..8fc8a8d 100644
--- a/internal/rdb/inspect.go
+++ b/internal/rdb/inspect.go
@@ -758,6 +758,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
 	return nil
 }

+// TODO: Rename this to listServerInfo.
 // Note: Script also removes stale keys.
 var listProcessesCmd = redis.NewScript(`
 local res = {}
@@ -773,9 +774,9 @@ redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
 return res`)

 // ListProcesses returns the list of process statuses.
-func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
+func (r *RDB) ListProcesses() ([]*base.ServerInfo, error) {
 	res, err := listProcessesCmd.Run(r.client,
-		[]string{base.AllProcesses}, time.Now().UTC().Unix()).Result()
+		[]string{base.AllServers}, time.Now().UTC().Unix()).Result()
 	if err != nil {
 		return nil, err
 	}
@@ -783,9 +784,9 @@ func (r *RDB) ListProcesses() ([]*base.ProcessInfo, error) {
 	if err != nil {
 		return nil, err
 	}
-	var processes []*base.ProcessInfo
+	var processes []*base.ServerInfo
 	for _, s := range data {
-		var ps base.ProcessInfo
+		var ps base.ServerInfo
 		err := json.Unmarshal([]byte(s), &ps)
 		if err != nil {
 			continue // skip bad data
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index c3b774e..5c50810 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -2055,10 +2055,10 @@ func TestListProcesses(t *testing.T) {
 	r := setup(t)

 	started1 := time.Now().Add(-time.Hour)
-	ps1 := base.NewProcessState("do.droplet1", 1234, 10, map[string]int{"default": 1}, false)
-	ps1.SetStarted(started1)
-	ps1.SetStatus(base.StatusRunning)
-	info1 := &base.ProcessInfo{
+	ss1 := base.NewServerState("do.droplet1", 1234, 10, map[string]int{"default": 1}, false)
+	ss1.SetStarted(started1)
+	ss1.SetStatus(base.StatusRunning)
+	info1 := &base.ServerInfo{
 		Concurrency: 10,
 		Queues:      map[string]int{"default": 1},
 		Host:        "do.droplet1",
@@ -2069,11 +2069,11 @@ func TestListProcesses(t *testing.T) {
 	}

 	started2 := time.Now().Add(-2 * time.Hour)
-	ps2 := base.NewProcessState("do.droplet2", 9876, 20, map[string]int{"email": 1}, false)
-	ps2.SetStarted(started2)
-	ps2.SetStatus(base.StatusStopped)
-	ps2.AddWorkerStats(h.NewTaskMessage("send_email", nil), time.Now())
-	info2 := &base.ProcessInfo{
+	ss2 := base.NewServerState("do.droplet2", 9876, 20, map[string]int{"email": 1}, false)
+	ss2.SetStarted(started2)
+	ss2.SetStatus(base.StatusStopped)
+	ss2.AddWorkerStats(h.NewTaskMessage("send_email", nil), time.Now())
+	info2 := &base.ServerInfo{
 		Concurrency: 20,
 		Queues:      map[string]int{"email": 1},
 		Host:        "do.droplet2",
@@ -2084,30 +2084,31 @@ func TestListProcesses(t *testing.T) {
 	}

 	tests := []struct {
-		processes []*base.ProcessState
-		want      []*base.ProcessInfo
+		serverStates []*base.ServerState
+		want         []*base.ServerInfo
 	}{
 		{
-			processes: []*base.ProcessState{},
-			want:      []*base.ProcessInfo{},
+			serverStates: []*base.ServerState{},
+			want:         []*base.ServerInfo{},
 		},
 		{
-			processes: []*base.ProcessState{ps1},
-			want:      []*base.ProcessInfo{info1},
+			serverStates: []*base.ServerState{ss1},
+			want:         []*base.ServerInfo{info1},
 		},
 		{
-			processes: []*base.ProcessState{ps1, ps2},
-			want:      []*base.ProcessInfo{info1, info2},
+			serverStates: []*base.ServerState{ss1, ss2},
+			want:         []*base.ServerInfo{info1, info2},
 		},
 	}

-	ignoreOpt := cmpopts.IgnoreUnexported(base.ProcessInfo{})
+	ignoreOpt := cmpopts.IgnoreUnexported(base.ServerInfo{})
+	ignoreFieldOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")

 	for _, tc := range tests {
 		h.FlushDB(t, r.client)

-		for _, ps := range tc.processes {
-			if err := r.WriteProcessState(ps, 5*time.Second); err != nil {
+		for _, ss := range tc.serverStates {
+			if err := r.WriteServerState(ss, 5*time.Second); err != nil {
 				t.Fatal(err)
 			}
 		}
@@ -2116,9 +2117,9 @@ func TestListProcesses(t *testing.T) {
 		if err != nil {
 			t.Errorf("r.ListProcesses returned an error: %v", err)
 		}
-		if diff := cmp.Diff(tc.want, got, h.SortProcessInfoOpt, ignoreOpt); diff != "" {
+		if diff := cmp.Diff(tc.want, got, h.SortServerInfoOpt, ignoreOpt, ignoreFieldOpt); diff != "" {
 			t.Errorf("r.ListProcesses returned %v, want %v; (-want,+got)\n%s",
-				got, tc.processes, diff)
+				got, tc.want, diff)
 		}
 	}
 }
@@ -2164,13 +2165,13 @@ func TestListWorkers(t *testing.T) {
 	for _, tc := range tests {
 		h.FlushDB(t, r.client)

-		ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false)
+		ss := base.NewServerState(host, pid, 10, map[string]int{"default": 1}, false)

 		for _, w := range tc.workers {
-			ps.AddWorkerStats(w.msg, w.started)
+			ss.AddWorkerStats(w.msg, w.started)
 		}

-		err := r.WriteProcessState(ps, time.Minute)
+		err := r.WriteServerState(ss, time.Minute)
 		if err != nil {
 			t.Errorf("could not write process state to redis: %v", err)
 			continue
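// Sketch, not part of the patch: reading server info back through the
// inspector API exercised above. ListProcesses keeps its old name for now per
// the TODO; the redis address and the go-redis v7 module path are assumptions
// for illustration.
package main

import (
	"fmt"
	"log"

	"github.com/go-redis/redis/v7"

	"github.com/hibiken/asynq/internal/rdb"
)

func main() {
	r := rdb.NewRDB(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
	servers, err := r.ListProcesses()
	if err != nil {
		log.Fatal(err)
	}
	for _, info := range servers {
		fmt.Printf("%s:%d server=%s status=%s active=%d\n",
			info.Host, info.PID, info.ServerID, info.Status, info.ActiveWorkerCount)
	}
}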
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index 6e373d2..7fc4481 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -463,9 +463,9 @@ func (r *RDB) forwardSingle(src, dst string) error {
 		[]string{src, dst}, now).Err()
 }

-// KEYS[1] -> asynq:ps:<host>:<pid>
-// KEYS[2] -> asynq:ps
-// KEYS[3] -> asynq:workers:<host>:<pid>
+// KEYS[1] -> asynq:servers:<host>:<pid>:<server_id>
+// KEYS[2] -> asynq:servers
+// KEYS[3] -> asynq:workers:<host>:<pid>:<server_id>
 // keys[4] -> asynq:workers
 // ARGV[1] -> expiration time
 // ARGV[2] -> TTL in seconds
@@ -486,7 +486,7 @@ return redis.status_reply("OK")`)

 // WriteServerState writes server state data to redis with expiration set to the value ttl.
 func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
-	info := ss.Get()
+	info := ss.GetInfo()
 	bytes, err := json.Marshal(info)
 	if err != nil {
 		return err
@@ -502,17 +502,17 @@ func (r *RDB) WriteServerState(ss *base.ServerState, ttl time.Duration) error {
 		}
 		args = append(args, w.ID.String(), bytes)
 	}
-	pkey := base.ProcessInfoKey(info.Host, info.PID)
-	wkey := base.WorkersKey(info.Host, info.PID)
+	skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
+	wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
 	return writeProcessInfoCmd.Run(r.client,
-		[]string{pkey, base.AllProcesses, wkey, base.AllWorkers},
+		[]string{skey, base.AllServers, wkey, base.AllWorkers},
 		args...).Err()
 }

-// KEYS[1] -> asynq:ps
-// KEYS[2] -> asynq:ps:<host>:<pid>
+// KEYS[1] -> asynq:servers
+// KEYS[2] -> asynq:servers:<host>:<pid>:<server_id>
 // KEYS[3] -> asynq:workers
-// KEYS[4] -> asynq:workers:<host>:<pid>
+// KEYS[4] -> asynq:workers:<host>:<pid>:<server_id>
 var clearProcessInfoCmd = redis.NewScript(`
 redis.call("ZREM", KEYS[1], KEYS[2])
 redis.call("DEL", KEYS[2])
@@ -522,12 +522,12 @@ return redis.status_reply("OK")`)

 // ClearServerState deletes server state data from redis.
 func (r *RDB) ClearServerState(ss *base.ServerState) error {
-	info := ss.Get()
-	host, pid := info.Host, info.PID
-	pkey := base.ProcessInfoKey(host, pid)
-	wkey := base.WorkersKey(host, pid)
+	info := ss.GetInfo()
+	host, pid, id := info.Host, info.PID, info.ServerID
+	skey := base.ServerInfoKey(host, pid, id)
+	wkey := base.WorkersKey(host, pid, id)
 	return clearProcessInfoCmd.Run(r.client,
-		[]string{base.AllProcesses, pkey, base.AllWorkers, wkey}).Err()
+		[]string{base.AllServers, skey, base.AllWorkers, wkey}).Err()
 }

 // CancelationPubSub returns a pubsub for cancelation messages.
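// Sketch, not part of the patch: WriteServerState and ClearServerState exist
// to support a heartbeat loop shaped roughly like this one (heartbeat.go owns
// the real loop; the interval and TTL values here are illustrative).
package heartbeat_sketch

import (
	"time"

	"github.com/hibiken/asynq/internal/base"
	"github.com/hibiken/asynq/internal/rdb"
)

func heartbeatLoop(r *rdb.RDB, ss *base.ServerState, done <-chan struct{}) {
	const interval = 5 * time.Second
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-done:
			// Best effort: remove asynq:servers:<host>:<pid>:<sid> and the
			// matching workers hash on clean shutdown.
			_ = r.ClearServerState(ss)
			return
		case <-ticker.C:
			// TTL exceeds the interval so the keys survive between beats yet
			// expire on their own if the server dies without cleaning up.
			_ = r.WriteServerState(ss, 2*interval)
		}
	}
}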
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index dc5cba5..cd0fe00 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -862,60 +862,61 @@ func TestCheckAndEnqueue(t *testing.T) {
 	}
 }

-func TestWriteProcessState(t *testing.T) {
+func TestWriteServerState(t *testing.T) {
 	r := setup(t)
-	host, pid := "localhost", 98765
 	queues := map[string]int{"default": 2, "email": 5, "low": 1}

 	started := time.Now()
-	ps := base.NewProcessState(host, pid, 10, queues, false)
-	ps.SetStarted(started)
-	ps.SetStatus(base.StatusRunning)
+	ss := base.NewServerState("localhost", 4242, 10, queues, false)
+	ss.SetStarted(started)
+	ss.SetStatus(base.StatusRunning)
 	ttl := 5 * time.Second

 	h.FlushDB(t, r.client)

-	err := r.WriteProcessState(ps, ttl)
+	err := r.WriteServerState(ss, ttl)
 	if err != nil {
-		t.Errorf("r.WriteProcessState returned an error: %v", err)
+		t.Errorf("r.WriteServerState returned an error: %v", err)
 	}

-	// Check ProcessInfo was written correctly
-	pkey := base.ProcessInfoKey(host, pid)
-	data := r.client.Get(pkey).Val()
-	var got base.ProcessInfo
+	// Check ServerInfo was written correctly
+	info := ss.GetInfo()
+	skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
+	data := r.client.Get(skey).Val()
+	var got base.ServerInfo
 	err = json.Unmarshal([]byte(data), &got)
 	if err != nil {
 		t.Fatalf("could not decode json: %v", err)
 	}
-	want := base.ProcessInfo{
-		Host:              "localhost",
-		PID:               98765,
-		Concurrency:       10,
+	want := base.ServerInfo{
+		Host:              info.Host,
+		PID:               info.PID,
+		Concurrency:       info.Concurrency,
 		Queues:            map[string]int{"default": 2, "email": 5, "low": 1},
 		StrictPriority:    false,
 		Status:            "running",
 		Started:           started,
 		ActiveWorkerCount: 0,
 	}
-	if diff := cmp.Diff(want, got); diff != "" {
-		t.Errorf("persisted ProcessInfo was %v, want %v; (-want,+got)\n%s",
+	ignoreOpt := cmpopts.IgnoreFields(base.ServerInfo{}, "ServerID")
+	if diff := cmp.Diff(want, got, ignoreOpt); diff != "" {
+		t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
 			got, want, diff)
 	}

-	// Check ProcessInfo TTL was set correctly
-	gotTTL := r.client.TTL(pkey).Val()
+	// Check ServerInfo TTL was set correctly
+	gotTTL := r.client.TTL(skey).Val()
 	if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
-		t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
+		t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
 	}

-	// Check ProcessInfo key was added to the set correctly
-	gotProcesses := r.client.ZRange(base.AllProcesses, 0, -1).Val()
-	wantProcesses := []string{pkey}
+	// Check ServerInfo key was added to the set correctly
+	gotProcesses := r.client.ZRange(base.AllServers, 0, -1).Val()
+	wantProcesses := []string{skey}
 	if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
-		t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcesses, wantProcesses)
+		t.Errorf("%q contained %v, want %v", base.AllServers, gotProcesses, wantProcesses)
 	}

 	// Check WorkersInfo was written correctly
-	wkey := base.WorkersKey(host, pid)
+	wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
 	workerExist := r.client.Exists(wkey).Val()
 	if workerExist != 0 {
 		t.Errorf("%q key exists", wkey)
@@ -928,9 +929,8 @@ func TestWriteProcessState(t *testing.T) {
 	}
 }

-func TestWriteProcessStateWithWorkers(t *testing.T) {
+func TestWriteServerStateWithWorkers(t *testing.T) {
 	r := setup(t)
-	host, pid := "localhost", 98765
 	queues := map[string]int{"default": 2, "email": 5, "low": 1}
 	concurrency := 10

@@ -939,31 +939,33 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
 	w2Started := time.Now().Add(-time.Second)
 	msg1 := h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"})
 	msg2 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"})
-	ps := base.NewProcessState(host, pid, concurrency, queues, false)
-	ps.SetStarted(started)
-	ps.SetStatus(base.StatusRunning)
-	ps.AddWorkerStats(msg1, w1Started)
-	ps.AddWorkerStats(msg2, w2Started)
+	ss := base.NewServerState("127.0.0.1", 4242, concurrency, queues, false)
+	ss.SetStarted(started)
+	ss.SetStatus(base.StatusRunning)
+	ss.AddWorkerStats(msg1, w1Started)
+	ss.AddWorkerStats(msg2, w2Started)
 	ttl := 5 * time.Second

 	h.FlushDB(t, r.client)

-	err := r.WriteProcessState(ps, ttl)
+	err := r.WriteServerState(ss, ttl)
 	if err != nil {
-		t.Errorf("r.WriteProcessState returned an error: %v", err)
+		t.Errorf("r.WriteServerState returned an error: %v", err)
 	}

-	// Check ProcessInfo was written correctly
-	pkey := base.ProcessInfoKey(host, pid)
-	data := r.client.Get(pkey).Val()
-	var got base.ProcessInfo
+	// Check ServerInfo was written correctly
+	info := ss.GetInfo()
+	skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
+	data := r.client.Get(skey).Val()
+	var got base.ServerInfo
 	err = json.Unmarshal([]byte(data), &got)
 	if err != nil {
 		t.Fatalf("could not decode json: %v", err)
 	}
-	want := base.ProcessInfo{
-		Host:              host,
-		PID:               pid,
+	want := base.ServerInfo{
+		Host:              info.Host,
+		PID:               info.PID,
+		ServerID:          info.ServerID,
 		Concurrency:       concurrency,
 		Queues:            queues,
 		StrictPriority:    false,
@@ -972,23 +974,23 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
 		ActiveWorkerCount: 2,
 	}
 	if diff := cmp.Diff(want, got); diff != "" {
-		t.Errorf("persisted ProcessInfo was %v, want %v; (-want,+got)\n%s",
+		t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
 			got, want, diff)
 	}

-	// Check ProcessInfo TTL was set correctly
-	gotTTL := r.client.TTL(pkey).Val()
+	// Check ServerInfo TTL was set correctly
+	gotTTL := r.client.TTL(skey).Val()
 	if !cmp.Equal(ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
-		t.Errorf("TTL of %q was %v, want %v", pkey, gotTTL, ttl)
+		t.Errorf("TTL of %q was %v, want %v", skey, gotTTL, ttl)
 	}

-	// Check ProcessInfo key was added to the set correctly
-	gotProcesses := r.client.ZRange(base.AllProcesses, 0, -1).Val()
-	wantProcesses := []string{pkey}
+	// Check ServerInfo key was added to the set correctly
+	gotProcesses := r.client.ZRange(base.AllServers, 0, -1).Val()
+	wantProcesses := []string{skey}
 	if diff := cmp.Diff(wantProcesses, gotProcesses); diff != "" {
-		t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcesses, wantProcesses)
+		t.Errorf("%q contained %v, want %v", base.AllServers, gotProcesses, wantProcesses)
 	}

 	// Check WorkersInfo was written correctly
-	wkey := base.WorkersKey(host, pid)
+	wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
 	wdata := r.client.HGetAll(wkey).Val()
 	if len(wdata) != 2 {
 		t.Fatalf("HGETALL %q returned a hash of size %d, want 2", wkey, len(wdata))
@@ -1003,8 +1005,8 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
 	}
 	wantWorkers := map[string]*base.WorkerInfo{
 		msg1.ID.String(): {
-			Host:    host,
-			PID:     pid,
+			Host:    info.Host,
+			PID:     info.PID,
 			ID:      msg1.ID,
 			Type:    msg1.Type,
 			Queue:   msg1.Queue,
 			Payload: msg1.Payload,
 			Started: w1Started,
 		},
 		msg2.ID.String(): {
-			Host:    host,
-			PID:     pid,
+			Host:    info.Host,
+			PID:     info.PID,
 			ID:      msg2.ID,
 			Type:    msg2.Type,
 			Queue:   msg2.Queue,
@@ -1039,27 +1041,28 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
 	}
 }

-func TestClearProcessState(t *testing.T) {
+func TestClearServerState(t *testing.T) {
 	r := setup(t)
-	host, pid := "127.0.0.1", 1234
+	ss := base.NewServerState("127.0.0.1", 4242, 10, map[string]int{"default": 1}, false)
+	info := ss.GetInfo()

 	h.FlushDB(t, r.client)

-	pkey := base.ProcessInfoKey(host, pid)
-	wkey := base.WorkersKey(host, pid)
-	otherPKey := base.ProcessInfoKey("otherhost", 12345)
-	otherWKey := base.WorkersKey("otherhost", 12345)
+	skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
+	wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
+	otherSKey := base.ServerInfoKey("otherhost", 12345, "server98")
+	otherWKey := base.WorkersKey("otherhost", 12345, "server98")
 	// Populate the keys.
-	if err := r.client.Set(pkey, "process-info", 0).Err(); err != nil {
+	if err := r.client.Set(skey, "process-info", 0).Err(); err != nil {
 		t.Fatal(err)
 	}
 	if err := r.client.HSet(wkey, "worker-key", "worker-info").Err(); err != nil {
 		t.Fatal(err)
 	}
-	if err := r.client.ZAdd(base.AllProcesses, &redis.Z{Member: pkey}).Err(); err != nil {
+	if err := r.client.ZAdd(base.AllServers, &redis.Z{Member: skey}).Err(); err != nil {
 		t.Fatal(err)
 	}
-	if err := r.client.ZAdd(base.AllProcesses, &redis.Z{Member: otherPKey}).Err(); err != nil {
+	if err := r.client.ZAdd(base.AllServers, &redis.Z{Member: otherSKey}).Err(); err != nil {
 		t.Fatal(err)
 	}
 	if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Member: wkey}).Err(); err != nil {
@@ -1069,24 +1072,22 @@ func TestClearProcessState(t *testing.T) {
 		t.Fatal(err)
 	}

-	ps := base.NewProcessState(host, pid, 10, map[string]int{"default": 1}, false)
-
-	err := r.ClearProcessState(ps)
+	err := r.ClearServerState(ss)
 	if err != nil {
-		t.Fatalf("(*RDB).ClearProcessState failed: %v", err)
+		t.Fatalf("(*RDB).ClearServerState failed: %v", err)
 	}

 	// Check all keys are cleared
-	if r.client.Exists(pkey).Val() != 0 {
-		t.Errorf("Redis key %q exists", pkey)
+	if r.client.Exists(skey).Val() != 0 {
+		t.Errorf("Redis key %q exists", skey)
 	}
 	if r.client.Exists(wkey).Val() != 0 {
 		t.Errorf("Redis key %q exists", wkey)
 	}
-	gotProcessKeys := r.client.ZRange(base.AllProcesses, 0, -1).Val()
-	wantProcessKeys := []string{otherPKey}
+	gotProcessKeys := r.client.ZRange(base.AllServers, 0, -1).Val()
+	wantProcessKeys := []string{otherSKey}
 	if diff := cmp.Diff(wantProcessKeys, gotProcessKeys); diff != "" {
-		t.Errorf("%q contained %v, want %v", base.AllProcesses, gotProcessKeys, wantProcessKeys)
+		t.Errorf("%q contained %v, want %v", base.AllServers, gotProcessKeys, wantProcessKeys)
 	}
 	gotWorkerKeys := r.client.ZRange(base.AllWorkers, 0, -1).Val()
 	wantWorkerKeys := []string{otherWKey}
diff --git a/processor.go b/processor.go
index 33ce503..7ce8766 100644
--- a/processor.go
+++ b/processor.go
@@ -64,7 +64,7 @@ type retryDelayFunc func(n int, err error, task *Task) time.Duration

 // newProcessor constructs a new processor.
 func newProcessor(l Logger, r *rdb.RDB, ss *base.ServerState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
-	info := ss.Get()
+	info := ss.GetInfo()
 	qcfg := normalizeQueueCfg(info.Queues)
 	orderedQueues := []string(nil)
 	if info.StrictPriority {
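// Sketch, not part of the patch: GetInfo returns cloned maps (cloneQueueConfig
// in base.go), so newProcessor can normalize its snapshot of the queue config
// without racing against, or mutating, the shared ServerState.
package main

import (
	"fmt"

	"github.com/hibiken/asynq/internal/base"
)

func main() {
	ss := base.NewServerState("localhost", 1234, 10, map[string]int{"default": 1}, false)
	info := ss.GetInfo()
	info.Queues["default"] = 99                 // mutates the snapshot only
	fmt.Println(ss.GetInfo().Queues["default"]) // still 1: server state unaffected
}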
diff --git a/processor_test.go b/processor_test.go
index 3c79102..99c39a8 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -67,9 +67,9 @@ func TestProcessorSuccess(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
+	ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
 	cancelations := base.NewCancelations()
-	p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
+	p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil)
 	p.handler = HandlerFunc(handler)

 	var wg sync.WaitGroup
@@ -165,9 +165,9 @@ func TestProcessorRetry(t *testing.T) {
 		defer mu.Unlock()
 		n++
 	}
-	ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
+	ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
 	cancelations := base.NewCancelations()
-	p := newProcessor(testLogger, rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
+	p := newProcessor(testLogger, rdbClient, ss, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
 	p.handler = tc.handler

 	var wg sync.WaitGroup
@@ -232,8 +232,8 @@ func TestProcessorQueues(t *testing.T) {

 	for _, tc := range tests {
 		cancelations := base.NewCancelations()
-		ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
-		p := newProcessor(testLogger, nil, ps, defaultDelayFunc, nil, cancelations, nil)
+		ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
+		p := newProcessor(testLogger, nil, ss, defaultDelayFunc, nil, cancelations, nil)
 		got := p.queues()
 		if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
 			t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -300,8 +300,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
 	}
 	// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
 	cancelations := base.NewCancelations()
-	ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
-	p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
+	ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
+	p := newProcessor(testLogger, rdbClient, ss, defaultDelayFunc, nil, cancelations, nil)
 	p.handler = HandlerFunc(handler)

 	var wg sync.WaitGroup
diff --git a/server.go b/server.go
index 18d31d5..8943aca 100644
--- a/server.go
+++ b/server.go
@@ -276,7 +276,8 @@ func (srv *Server) Start(handler Handler) error {

 // Stops the background-task processing.
 // TODO: do we need to return error?
 func (srv *Server) Stop() {
-	if srv.ss.Status() != base.StatusRunning {
+	switch srv.ss.Status() {
+	case base.StatusIdle, base.StatusStopped:
 		// server is not running, do nothing and return.
 		return
 	}
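// Sketch, not part of the patch: the observable effect of the switch above.
// Stop is now an explicit no-op for a server that was never started
// (StatusIdle) or was already stopped (StatusStopped). The NewServer arguments
// are illustrative and assume the public API as of this refactor.
package main

import "github.com/hibiken/asynq"

func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		asynq.Config{Concurrency: 10},
	)
	srv.Stop() // status is StatusIdle: returns immediately, no shutdown work
}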