diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0c1c7ff --- /dev/null +++ b/Makefile @@ -0,0 +1,7 @@ +ROOT_DIR:=$(shell dirname $(realpath $(firstword $(MAKEFILE_LIST)))) + +proto: internal/proto/asynq.proto + protoc -I=$(ROOT_DIR)/internal/proto \ + --go_out=$(ROOT_DIR)/internal/proto \ + --go_opt=module=github.com/hibiken/asynq/internal/proto \ + $(ROOT_DIR)/internal/proto/asynq.proto \ No newline at end of file diff --git a/go.mod b/go.mod index 77bb2ed..c30a1d3 100644 --- a/go.mod +++ b/go.mod @@ -4,12 +4,14 @@ go 1.13 require ( github.com/go-redis/redis/v7 v7.4.0 - github.com/google/go-cmp v0.4.0 + github.com/golang/protobuf v1.4.1 + github.com/google/go-cmp v0.5.0 github.com/google/uuid v1.2.0 github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cast v1.3.1 go.uber.org/goleak v0.10.0 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 + google.golang.org/protobuf v1.25.0 gopkg.in/yaml.v2 v2.2.7 // indirect ) diff --git a/go.sum b/go.sum index d391383..dad6de7 100644 --- a/go.sum +++ b/go.sum @@ -1,16 +1,36 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs= github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-redis/redis/v7 v7.4.0 h1:7obg6wUoj05T0EpY0o8B59S9w5yeMWql7sw2kwNW1x4= github.com/go-redis/redis/v7 v7.4.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1 h1:ZFgWrT+bLgsYPirOnRfKLYJLvssAegOj/hgyMFdJZe0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= @@ -29,6 +49,7 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= @@ -38,11 +59,23 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4= go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd h1:nTDtHvHSdCn1m6ITfMRqtOd/9+7a3s8RBNOZ3eYZzJA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190923162816-aa69164e4478 h1:l5EDrHhldLYb3ZRHDUhXF7Om7MvYXnkV9/iQNo1lX6g= golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -56,8 +89,29 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0 h1:Ejskq+SyPohKW+1uil0JJMtmHCgJPJ/qWTxr8qp+R4c= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -70,3 +124,5 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo= gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 8ee0193..082ec73 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -6,7 +6,6 @@ package asynqtest import ( - "encoding/json" "math" "sort" "testing" @@ -130,7 +129,7 @@ func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage { // Calling test will fail if marshaling errors out. func MustMarshal(tb testing.TB, msg *base.TaskMessage) string { tb.Helper() - data, err := json.Marshal(msg) + data, err := base.EncodeMessage(msg) if err != nil { tb.Fatal(err) } @@ -141,12 +140,11 @@ func MustMarshal(tb testing.TB, msg *base.TaskMessage) string { // Calling test will fail if unmarshaling errors out. func MustUnmarshal(tb testing.TB, data string) *base.TaskMessage { tb.Helper() - var msg base.TaskMessage - err := json.Unmarshal([]byte(data), &msg) + msg, err := base.DecodeMessage([]byte(data)) if err != nil { tb.Fatal(err) } - return &msg + return msg } // MustMarshalSlice marshals a slice of task messages and return a slice of diff --git a/internal/base/base.go b/internal/base/base.go index e412256..77d6dbb 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -6,6 +6,7 @@ package base import ( + "bytes" "context" "encoding/json" "fmt" @@ -15,7 +16,10 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/golang/protobuf/ptypes" "github.com/google/uuid" + pb "github.com/hibiken/asynq/internal/proto" + "google.golang.org/protobuf/proto" ) // Version of asynq library and CLI. @@ -194,24 +198,51 @@ type TaskMessage struct { UniqueKey string } -// EncodeMessage marshals the given task message in JSON and returns an encoded string. -func EncodeMessage(msg *TaskMessage) (string, error) { - b, err := json.Marshal(msg) - if err != nil { - return "", err +// EncodeMessage marshals the given task message and returns an encoded bytes. +func EncodeMessage(msg *TaskMessage) ([]byte, error) { + if msg == nil { + return nil, fmt.Errorf("cannot encode nil message") } - return string(b), nil -} - -// DecodeMessage unmarshals the given encoded string and returns a decoded task message. -func DecodeMessage(s string) (*TaskMessage, error) { - d := json.NewDecoder(strings.NewReader(s)) - d.UseNumber() - var msg TaskMessage - if err := d.Decode(&msg); err != nil { + payload, err := json.Marshal(msg.Payload) + if err != nil { return nil, err } - return &msg, nil + return proto.Marshal(&pb.TaskMessage{ + Type: msg.Type, + Payload: payload, + Id: msg.ID.String(), + Queue: msg.Queue, + Retry: int32(msg.Retry), + Retried: int32(msg.Retried), + ErrorMsg: msg.ErrorMsg, + Timeout: msg.Timeout, + Deadline: msg.Deadline, + UniqueKey: msg.UniqueKey, + }) +} + +// DecodeMessage unmarshals the given bytes and returns a decoded task message. +func DecodeMessage(data []byte) (*TaskMessage, error) { + var pbmsg pb.TaskMessage + if err := proto.Unmarshal(data, &pbmsg); err != nil { + return nil, err + } + payload, err := decodePayload(pbmsg.GetPayload()) + if err != nil { + return nil, err + } + return &TaskMessage{ + Type: pbmsg.GetType(), + Payload: payload, + ID: uuid.MustParse(pbmsg.GetId()), + Queue: pbmsg.GetQueue(), + Retry: int(pbmsg.GetRetry()), + Retried: int(pbmsg.GetRetried()), + ErrorMsg: pbmsg.GetErrorMsg(), + Timeout: pbmsg.GetTimeout(), + Deadline: pbmsg.GetDeadline(), + UniqueKey: pbmsg.GetUniqueKey(), + }, nil } // Z represents sorted set member. @@ -292,6 +323,59 @@ type ServerInfo struct { ActiveWorkerCount int } +// EncodeServerInfo marshals the given ServerInfo and returns the encoded bytes. +func EncodeServerInfo(info *ServerInfo) ([]byte, error) { + if info == nil { + return nil, fmt.Errorf("cannot encode nil server info") + } + queues := make(map[string]int32) + for q, p := range info.Queues { + queues[q] = int32(p) + } + started, err := ptypes.TimestampProto(info.Started) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.ServerInfo{ + Host: info.Host, + Pid: int32(info.PID), + ServerId: info.ServerID, + Concurrency: int32(info.Concurrency), + Queues: queues, + StrictPriority: info.StrictPriority, + Status: info.Status, + StartTime: started, + ActiveWorkerCount: int32(info.ActiveWorkerCount), + }) +} + +// DecodeServerInfo decodes the given bytes into ServerInfo. +func DecodeServerInfo(b []byte) (*ServerInfo, error) { + var pbmsg pb.ServerInfo + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + queues := make(map[string]int) + for q, p := range pbmsg.GetQueues() { + queues[q] = int(p) + } + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + if err != nil { + return nil, err + } + return &ServerInfo{ + Host: pbmsg.GetHost(), + PID: int(pbmsg.GetPid()), + ServerID: pbmsg.GetServerId(), + Concurrency: int(pbmsg.GetConcurrency()), + Queues: queues, + StrictPriority: pbmsg.GetStrictPriority(), + Status: pbmsg.GetStatus(), + Started: startTime, + ActiveWorkerCount: int(pbmsg.GetActiveWorkerCount()), + }, nil +} + // WorkerInfo holds information about a running worker. type WorkerInfo struct { Host string @@ -299,12 +383,83 @@ type WorkerInfo struct { ServerID string ID string Type string - Queue string Payload map[string]interface{} + Queue string Started time.Time Deadline time.Time } +// EncodeWorkerInfo marshals the given WorkerInfo and returns the encoded bytes. +func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { + if info == nil { + return nil, fmt.Errorf("cannot encode nil worker info") + } + payload, err := json.Marshal(info.Payload) + if err != nil { + return nil, err + } + startTime, err := ptypes.TimestampProto(info.Started) + if err != nil { + return nil, err + } + deadline, err := ptypes.TimestampProto(info.Deadline) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.WorkerInfo{ + Host: info.Host, + Pid: int32(info.PID), + ServerId: info.ServerID, + TaskId: info.ID, + TaskType: info.Type, + TaskPayload: payload, + Queue: info.Queue, + StartTime: startTime, + Deadline: deadline, + }) +} + +func decodePayload(b []byte) (map[string]interface{}, error) { + d := json.NewDecoder(bytes.NewReader(b)) + d.UseNumber() + payload := make(map[string]interface{}) + if err := d.Decode(&payload); err != nil { + return nil, err + } + return payload, nil +} + +// DecodeWorkerInfo decodes the given bytes into WorkerInfo. +func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { + var pbmsg pb.WorkerInfo + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { + return nil, err + } + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + if err != nil { + return nil, err + } + deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) + if err != nil { + return nil, err + } + return &WorkerInfo{ + Host: pbmsg.GetHost(), + PID: int(pbmsg.GetPid()), + ServerID: pbmsg.GetServerId(), + ID: pbmsg.GetTaskId(), + Type: pbmsg.GetTaskType(), + Payload: payload, + Queue: pbmsg.GetQueue(), + Started: startTime, + Deadline: deadline, + }, nil +} + // SchedulerEntry holds information about a periodic task registered with a scheduler. type SchedulerEntry struct { // Identifier of this entry. @@ -330,6 +485,63 @@ type SchedulerEntry struct { Prev time.Time } +// EncodeSchedulerEntry marshals the given entry and returns an encoded bytes. +func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { + if entry == nil { + return nil, fmt.Errorf("cannot encode nil scheduler entry") + } + payload, err := json.Marshal(entry.Payload) + if err != nil { + return nil, err + } + next, err := ptypes.TimestampProto(entry.Next) + if err != nil { + return nil, err + } + prev, err := ptypes.TimestampProto(entry.Prev) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.SchedulerEntry{ + Id: entry.ID, + Spec: entry.Spec, + TaskType: entry.Type, + TaskPayload: payload, + EnqueueOptions: entry.Opts, + NextEnqueueTime: next, + PrevEnqueueTime: prev, + }) +} + +// DecodeSchedulerEntry unmarshals the given bytes and returns a decoded SchedulerEntry. +func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { + var pbmsg pb.SchedulerEntry + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + payload, err := decodePayload(pbmsg.GetTaskPayload()) + if err != nil { + return nil, err + } + next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) + if err != nil { + return nil, err + } + prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) + if err != nil { + return nil, err + } + return &SchedulerEntry{ + ID: pbmsg.GetId(), + Spec: pbmsg.GetSpec(), + Type: pbmsg.GetTaskType(), + Payload: payload, + Opts: pbmsg.GetEnqueueOptions(), + Next: next, + Prev: prev, + }, nil +} + // SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. @@ -339,6 +551,39 @@ type SchedulerEnqueueEvent struct { EnqueuedAt time.Time } +// EncodeSchedulerEnqueueEvent marshals the given event +// and returns an encoded bytes. +func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) { + if event == nil { + return nil, fmt.Errorf("cannot encode nil enqueue event") + } + enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) + if err != nil { + return nil, err + } + return proto.Marshal(&pb.SchedulerEnqueueEvent{ + TaskId: event.TaskID, + EnqueueTime: enqueuedAt, + }) +} + +// DecodeSchedulerEnqueueEvent unmarshals the given bytes +// and returns a decoded SchedulerEnqueueEvent. +func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) { + var pbmsg pb.SchedulerEnqueueEvent + if err := proto.Unmarshal(b, &pbmsg); err != nil { + return nil, err + } + enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) + if err != nil { + return nil, err + } + return &SchedulerEnqueueEvent{ + TaskID: pbmsg.GetTaskId(), + EnqueuedAt: enqueuedAt, + }, nil +} + // Cancelations is a collection that holds cancel functions for all active tasks. // // Cancelations are safe for concurrent use by multipel goroutines. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index f99757f..bfebef5 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -372,6 +372,145 @@ func TestMessageEncoding(t *testing.T) { } } +func TestServerInfoEncoding(t *testing.T) { + tests := []struct { + info ServerInfo + }{ + { + info: ServerInfo{ + Host: "127.0.0.1", + PID: 9876, + ServerID: "abc123", + Concurrency: 10, + Queues: map[string]int{"default": 1, "critical": 2}, + StrictPriority: false, + Status: "running", + Started: time.Now().Add(-3 * time.Hour), + ActiveWorkerCount: 8, + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeServerInfo(&tc.info) + if err != nil { + t.Errorf("EncodeServerInfo(info) returned error: %v", err) + continue + } + decoded, err := DecodeServerInfo(encoded) + if err != nil { + t.Errorf("DecodeServerInfo(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.info, decoded); diff != "" { + t.Errorf("Decoded ServerInfo == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.info, diff) + } + } +} + +func TestWorkerInfoEncoding(t *testing.T) { + tests := []struct { + info WorkerInfo + }{ + { + info: WorkerInfo{ + Host: "127.0.0.1", + PID: 9876, + ServerID: "abc123", + ID: uuid.NewString(), + Type: "taskA", + Payload: map[string]interface{}{"foo": "bar"}, + Queue: "default", + Started: time.Now().Add(-3 * time.Hour), + Deadline: time.Now().Add(30 * time.Second), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeWorkerInfo(&tc.info) + if err != nil { + t.Errorf("EncodeWorkerInfo(info) returned error: %v", err) + continue + } + decoded, err := DecodeWorkerInfo(encoded) + if err != nil { + t.Errorf("DecodeWorkerInfo(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.info, decoded); diff != "" { + t.Errorf("Decoded WorkerInfo == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.info, diff) + } + } +} + +func TestSchedulerEntryEncoding(t *testing.T) { + tests := []struct { + entry SchedulerEntry + }{ + { + entry: SchedulerEntry{ + ID: uuid.NewString(), + Spec: "* * * * *", + Type: "task_A", + Payload: map[string]interface{}{"foo": "bar"}, + Opts: []string{"Queue('email')"}, + Next: time.Now().Add(30 * time.Second).UTC(), + Prev: time.Now().Add(-2 * time.Minute).UTC(), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeSchedulerEntry(&tc.entry) + if err != nil { + t.Errorf("EncodeSchedulerEntry(entry) returned error: %v", err) + continue + } + decoded, err := DecodeSchedulerEntry(encoded) + if err != nil { + t.Errorf("DecodeSchedulerEntry(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.entry, decoded); diff != "" { + t.Errorf("Decoded SchedulerEntry == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.entry, diff) + } + } +} + +func TestSchedulerEnqueueEventEncoding(t *testing.T) { + tests := []struct { + event SchedulerEnqueueEvent + }{ + { + event: SchedulerEnqueueEvent{ + TaskID: uuid.NewString(), + EnqueuedAt: time.Now().Add(-30 * time.Second).UTC(), + }, + }, + } + + for _, tc := range tests { + encoded, err := EncodeSchedulerEnqueueEvent(&tc.event) + if err != nil { + t.Errorf("EncodeSchedulerEnqueueEvent(event) returned error: %v", err) + continue + } + decoded, err := DecodeSchedulerEnqueueEvent(encoded) + if err != nil { + t.Errorf("DecodeSchedulerEnqueueEvent(encoded) returned error: %v", err) + continue + } + if diff := cmp.Diff(&tc.event, decoded); diff != "" { + t.Errorf("Decoded SchedulerEnqueueEvent == %+v, want %+v;(-want,+got)\n%s", + decoded, tc.event, diff) + } + } +} + // Test for status being accessed by multiple goroutines. // Run with -race flag to check for data race. func TestStatusConcurrentAccess(t *testing.T) { diff --git a/internal/proto/asynq.pb.go b/internal/proto/asynq.pb.go new file mode 100644 index 0000000..fba1820 --- /dev/null +++ b/internal/proto/asynq.pb.go @@ -0,0 +1,755 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.14.0 +// source: asynq.proto + +package proto + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +type TaskMessage struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + Id string `protobuf:"bytes,3,opt,name=id,proto3" json:"id,omitempty"` + Queue string `protobuf:"bytes,4,opt,name=queue,proto3" json:"queue,omitempty"` + Retry int32 `protobuf:"varint,5,opt,name=retry,proto3" json:"retry,omitempty"` + Retried int32 `protobuf:"varint,6,opt,name=retried,proto3" json:"retried,omitempty"` + ErrorMsg string `protobuf:"bytes,7,opt,name=error_msg,json=errorMsg,proto3" json:"error_msg,omitempty"` + Timeout int64 `protobuf:"varint,8,opt,name=timeout,proto3" json:"timeout,omitempty"` + Deadline int64 `protobuf:"varint,9,opt,name=deadline,proto3" json:"deadline,omitempty"` + UniqueKey string `protobuf:"bytes,10,opt,name=unique_key,json=uniqueKey,proto3" json:"unique_key,omitempty"` +} + +func (x *TaskMessage) Reset() { + *x = TaskMessage{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TaskMessage) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskMessage) ProtoMessage() {} + +func (x *TaskMessage) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskMessage.ProtoReflect.Descriptor instead. +func (*TaskMessage) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{0} +} + +func (x *TaskMessage) GetType() string { + if x != nil { + return x.Type + } + return "" +} + +func (x *TaskMessage) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +func (x *TaskMessage) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *TaskMessage) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *TaskMessage) GetRetry() int32 { + if x != nil { + return x.Retry + } + return 0 +} + +func (x *TaskMessage) GetRetried() int32 { + if x != nil { + return x.Retried + } + return 0 +} + +func (x *TaskMessage) GetErrorMsg() string { + if x != nil { + return x.ErrorMsg + } + return "" +} + +func (x *TaskMessage) GetTimeout() int64 { + if x != nil { + return x.Timeout + } + return 0 +} + +func (x *TaskMessage) GetDeadline() int64 { + if x != nil { + return x.Deadline + } + return 0 +} + +func (x *TaskMessage) GetUniqueKey() string { + if x != nil { + return x.UniqueKey + } + return "" +} + +// ServerInfo holds information about a running server. +type ServerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"` + Concurrency int32 `protobuf:"varint,4,opt,name=concurrency,proto3" json:"concurrency,omitempty"` + Queues map[string]int32 `protobuf:"bytes,5,rep,name=queues,proto3" json:"queues,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"varint,2,opt,name=value,proto3"` + StrictPriority bool `protobuf:"varint,6,opt,name=strict_priority,json=strictPriority,proto3" json:"strict_priority,omitempty"` + Status string `protobuf:"bytes,7,opt,name=status,proto3" json:"status,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` + ActiveWorkerCount int32 `protobuf:"varint,9,opt,name=active_worker_count,json=activeWorkerCount,proto3" json:"active_worker_count,omitempty"` +} + +func (x *ServerInfo) Reset() { + *x = ServerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ServerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ServerInfo) ProtoMessage() {} + +func (x *ServerInfo) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ServerInfo.ProtoReflect.Descriptor instead. +func (*ServerInfo) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{1} +} + +func (x *ServerInfo) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *ServerInfo) GetPid() int32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *ServerInfo) GetServerId() string { + if x != nil { + return x.ServerId + } + return "" +} + +func (x *ServerInfo) GetConcurrency() int32 { + if x != nil { + return x.Concurrency + } + return 0 +} + +func (x *ServerInfo) GetQueues() map[string]int32 { + if x != nil { + return x.Queues + } + return nil +} + +func (x *ServerInfo) GetStrictPriority() bool { + if x != nil { + return x.StrictPriority + } + return false +} + +func (x *ServerInfo) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +func (x *ServerInfo) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *ServerInfo) GetActiveWorkerCount() int32 { + if x != nil { + return x.ActiveWorkerCount + } + return 0 +} + +// WorkerInfo holds information about a running worker. +type WorkerInfo struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Host string `protobuf:"bytes,1,opt,name=host,proto3" json:"host,omitempty"` + Pid int32 `protobuf:"varint,2,opt,name=pid,proto3" json:"pid,omitempty"` + ServerId string `protobuf:"bytes,3,opt,name=server_id,json=serverId,proto3" json:"server_id,omitempty"` + TaskId string `protobuf:"bytes,4,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` + TaskType string `protobuf:"bytes,5,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"` + TaskPayload []byte `protobuf:"bytes,6,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"` + Queue string `protobuf:"bytes,7,opt,name=queue,proto3" json:"queue,omitempty"` + StartTime *timestamppb.Timestamp `protobuf:"bytes,8,opt,name=start_time,json=startTime,proto3" json:"start_time,omitempty"` + Deadline *timestamppb.Timestamp `protobuf:"bytes,9,opt,name=deadline,proto3" json:"deadline,omitempty"` +} + +func (x *WorkerInfo) Reset() { + *x = WorkerInfo{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WorkerInfo) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WorkerInfo) ProtoMessage() {} + +func (x *WorkerInfo) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WorkerInfo.ProtoReflect.Descriptor instead. +func (*WorkerInfo) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{2} +} + +func (x *WorkerInfo) GetHost() string { + if x != nil { + return x.Host + } + return "" +} + +func (x *WorkerInfo) GetPid() int32 { + if x != nil { + return x.Pid + } + return 0 +} + +func (x *WorkerInfo) GetServerId() string { + if x != nil { + return x.ServerId + } + return "" +} + +func (x *WorkerInfo) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + +func (x *WorkerInfo) GetTaskType() string { + if x != nil { + return x.TaskType + } + return "" +} + +func (x *WorkerInfo) GetTaskPayload() []byte { + if x != nil { + return x.TaskPayload + } + return nil +} + +func (x *WorkerInfo) GetQueue() string { + if x != nil { + return x.Queue + } + return "" +} + +func (x *WorkerInfo) GetStartTime() *timestamppb.Timestamp { + if x != nil { + return x.StartTime + } + return nil +} + +func (x *WorkerInfo) GetDeadline() *timestamppb.Timestamp { + if x != nil { + return x.Deadline + } + return nil +} + +// SchedulerEntry holds information about a periodic task registered with a scheduler. +type SchedulerEntry struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Identifier of the scheduler entry. + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + // Periodic schedule spec of the entry. + Spec string `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` + // Task type of the periodic task. + TaskType string `protobuf:"bytes,3,opt,name=task_type,json=taskType,proto3" json:"task_type,omitempty"` + // Task payload of the periodic task. + TaskPayload []byte `protobuf:"bytes,4,opt,name=task_payload,json=taskPayload,proto3" json:"task_payload,omitempty"` + // Options used to enqueue the periodic task. + EnqueueOptions []string `protobuf:"bytes,5,rep,name=enqueue_options,json=enqueueOptions,proto3" json:"enqueue_options,omitempty"` + // Next time the task will be enqueued. + NextEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=next_enqueue_time,json=nextEnqueueTime,proto3" json:"next_enqueue_time,omitempty"` + // Last time the task was enqueued. + // Zero time if task was never enqueued. + PrevEnqueueTime *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=prev_enqueue_time,json=prevEnqueueTime,proto3" json:"prev_enqueue_time,omitempty"` +} + +func (x *SchedulerEntry) Reset() { + *x = SchedulerEntry{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SchedulerEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SchedulerEntry) ProtoMessage() {} + +func (x *SchedulerEntry) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SchedulerEntry.ProtoReflect.Descriptor instead. +func (*SchedulerEntry) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{3} +} + +func (x *SchedulerEntry) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *SchedulerEntry) GetSpec() string { + if x != nil { + return x.Spec + } + return "" +} + +func (x *SchedulerEntry) GetTaskType() string { + if x != nil { + return x.TaskType + } + return "" +} + +func (x *SchedulerEntry) GetTaskPayload() []byte { + if x != nil { + return x.TaskPayload + } + return nil +} + +func (x *SchedulerEntry) GetEnqueueOptions() []string { + if x != nil { + return x.EnqueueOptions + } + return nil +} + +func (x *SchedulerEntry) GetNextEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.NextEnqueueTime + } + return nil +} + +func (x *SchedulerEntry) GetPrevEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.PrevEnqueueTime + } + return nil +} + +// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. +type SchedulerEnqueueEvent struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // ID of the task that was enqueued. + TaskId string `protobuf:"bytes,1,opt,name=task_id,json=taskId,proto3" json:"task_id,omitempty"` + // Time the task was enqueued. + EnqueueTime *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=enqueue_time,json=enqueueTime,proto3" json:"enqueue_time,omitempty"` +} + +func (x *SchedulerEnqueueEvent) Reset() { + *x = SchedulerEnqueueEvent{} + if protoimpl.UnsafeEnabled { + mi := &file_asynq_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SchedulerEnqueueEvent) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SchedulerEnqueueEvent) ProtoMessage() {} + +func (x *SchedulerEnqueueEvent) ProtoReflect() protoreflect.Message { + mi := &file_asynq_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SchedulerEnqueueEvent.ProtoReflect.Descriptor instead. +func (*SchedulerEnqueueEvent) Descriptor() ([]byte, []int) { + return file_asynq_proto_rawDescGZIP(), []int{4} +} + +func (x *SchedulerEnqueueEvent) GetTaskId() string { + if x != nil { + return x.TaskId + } + return "" +} + +func (x *SchedulerEnqueueEvent) GetEnqueueTime() *timestamppb.Timestamp { + if x != nil { + return x.EnqueueTime + } + return nil +} + +var File_asynq_proto protoreflect.FileDescriptor + +var file_asynq_proto_rawDesc = []byte{ + 0x0a, 0x0b, 0x61, 0x73, 0x79, 0x6e, 0x71, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x08, 0x74, + 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, + 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x83, 0x02, 0x0a, 0x0b, 0x54, 0x61, 0x73, + 0x6b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, + 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, + 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x72, 0x65, 0x74, 0x72, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x72, 0x65, 0x74, + 0x72, 0x79, 0x12, 0x18, 0x0a, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x18, 0x06, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x07, 0x72, 0x65, 0x74, 0x72, 0x69, 0x65, 0x64, 0x12, 0x1b, 0x0a, 0x09, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x08, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x4d, 0x73, 0x67, 0x12, 0x18, 0x0a, 0x07, 0x74, 0x69, 0x6d, + 0x65, 0x6f, 0x75, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x03, 0x52, 0x07, 0x74, 0x69, 0x6d, 0x65, + 0x6f, 0x75, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x18, 0x0a, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x4b, 0x65, 0x79, 0x22, 0x92, + 0x03, 0x0a, 0x0a, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, + 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x6f, 0x73, + 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x03, + 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x69, 0x64, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x64, + 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, + 0x63, 0x79, 0x12, 0x38, 0x0a, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x74, 0x75, 0x74, 0x6f, 0x72, 0x69, 0x61, 0x6c, 0x2e, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x6e, 0x66, 0x6f, 0x2e, 0x51, 0x75, 0x65, 0x75, 0x65, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x71, 0x75, 0x65, 0x75, 0x65, 0x73, 0x12, 0x27, 0x0a, 0x0f, + 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x5f, 0x70, 0x72, 0x69, 0x6f, 0x72, 0x69, 0x74, 0x79, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0e, 0x73, 0x74, 0x72, 0x69, 0x63, 0x74, 0x50, 0x72, 0x69, + 0x6f, 0x72, 0x69, 0x74, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, + 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x39, 0x0a, + 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x2e, 0x0a, 0x13, 0x61, 0x63, 0x74, 0x69, + 0x76, 0x65, 0x5f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, + 0x09, 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x61, 0x63, 0x74, 0x69, 0x76, 0x65, 0x57, 0x6f, 0x72, + 0x6b, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x1a, 0x39, 0x0a, 0x0b, 0x51, 0x75, 0x65, 0x75, + 0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, + 0x02, 0x38, 0x01, 0x22, 0xb1, 0x02, 0x0a, 0x0a, 0x57, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x49, 0x6e, + 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x04, 0x68, 0x6f, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x05, 0x52, 0x03, 0x70, 0x69, 0x64, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x49, 0x64, 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, + 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x1b, + 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, + 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x14, + 0x0a, 0x05, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x12, 0x39, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x74, 0x69, + 0x6d, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, + 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, + 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x36, 0x0a, 0x08, 0x64, 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x18, 0x09, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x08, 0x64, + 0x65, 0x61, 0x64, 0x6c, 0x69, 0x6e, 0x65, 0x22, 0xad, 0x02, 0x0a, 0x0e, 0x53, 0x63, 0x68, 0x65, + 0x64, 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x70, + 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x12, 0x1b, + 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x74, 0x61, 0x73, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x74, + 0x61, 0x73, 0x6b, 0x5f, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x0b, 0x74, 0x61, 0x73, 0x6b, 0x50, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x27, + 0x0a, 0x0f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0e, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x46, 0x0a, 0x11, 0x6e, 0x65, 0x78, 0x74, 0x5f, + 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, + 0x6e, 0x65, 0x78, 0x74, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, + 0x46, 0x0a, 0x11, 0x70, 0x72, 0x65, 0x76, 0x5f, 0x65, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x5f, + 0x74, 0x69, 0x6d, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, + 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0f, 0x70, 0x72, 0x65, 0x76, 0x45, 0x6e, 0x71, 0x75, + 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x6f, 0x0a, 0x15, 0x53, 0x63, 0x68, 0x65, 0x64, + 0x75, 0x6c, 0x65, 0x72, 0x45, 0x6e, 0x71, 0x75, 0x65, 0x75, 0x65, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x12, 0x17, 0x0a, 0x07, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x06, 0x74, 0x61, 0x73, 0x6b, 0x49, 0x64, 0x12, 0x3d, 0x0a, 0x0c, 0x65, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x65, 0x6e, 0x71, + 0x75, 0x65, 0x75, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x42, 0x29, 0x5a, 0x27, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x68, 0x69, 0x62, 0x69, 0x6b, 0x65, 0x6e, 0x2f, 0x61, + 0x73, 0x79, 0x6e, 0x71, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_asynq_proto_rawDescOnce sync.Once + file_asynq_proto_rawDescData = file_asynq_proto_rawDesc +) + +func file_asynq_proto_rawDescGZIP() []byte { + file_asynq_proto_rawDescOnce.Do(func() { + file_asynq_proto_rawDescData = protoimpl.X.CompressGZIP(file_asynq_proto_rawDescData) + }) + return file_asynq_proto_rawDescData +} + +var file_asynq_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_asynq_proto_goTypes = []interface{}{ + (*TaskMessage)(nil), // 0: tutorial.TaskMessage + (*ServerInfo)(nil), // 1: tutorial.ServerInfo + (*WorkerInfo)(nil), // 2: tutorial.WorkerInfo + (*SchedulerEntry)(nil), // 3: tutorial.SchedulerEntry + (*SchedulerEnqueueEvent)(nil), // 4: tutorial.SchedulerEnqueueEvent + nil, // 5: tutorial.ServerInfo.QueuesEntry + (*timestamppb.Timestamp)(nil), // 6: google.protobuf.Timestamp +} +var file_asynq_proto_depIdxs = []int32{ + 5, // 0: tutorial.ServerInfo.queues:type_name -> tutorial.ServerInfo.QueuesEntry + 6, // 1: tutorial.ServerInfo.start_time:type_name -> google.protobuf.Timestamp + 6, // 2: tutorial.WorkerInfo.start_time:type_name -> google.protobuf.Timestamp + 6, // 3: tutorial.WorkerInfo.deadline:type_name -> google.protobuf.Timestamp + 6, // 4: tutorial.SchedulerEntry.next_enqueue_time:type_name -> google.protobuf.Timestamp + 6, // 5: tutorial.SchedulerEntry.prev_enqueue_time:type_name -> google.protobuf.Timestamp + 6, // 6: tutorial.SchedulerEnqueueEvent.enqueue_time:type_name -> google.protobuf.Timestamp + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name +} + +func init() { file_asynq_proto_init() } +func file_asynq_proto_init() { + if File_asynq_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_asynq_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TaskMessage); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ServerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WorkerInfo); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SchedulerEntry); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_asynq_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SchedulerEnqueueEvent); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_asynq_proto_rawDesc, + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_asynq_proto_goTypes, + DependencyIndexes: file_asynq_proto_depIdxs, + MessageInfos: file_asynq_proto_msgTypes, + }.Build() + File_asynq_proto = out.File + file_asynq_proto_rawDesc = nil + file_asynq_proto_goTypes = nil + file_asynq_proto_depIdxs = nil +} diff --git a/internal/proto/asynq.proto b/internal/proto/asynq.proto new file mode 100644 index 0000000..788f43d --- /dev/null +++ b/internal/proto/asynq.proto @@ -0,0 +1,148 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +syntax = "proto3"; +package asynq; + +import "google/protobuf/timestamp.proto"; + +option go_package = "github.com/hibiken/asynq/internal/proto"; + +// TaskMessage is the internal representation of a task with additional +// metadata fields. +message TaskMessage { + // Type indicates the kind of the task to be performed. + string type = 1; + + // Payload holds data needed to process the task. + bytes payload = 2; + + // Unique identifier for the task. + string id = 3; + + // Name of the queue to which this task belongs. + string queue = 4; + + // Max number of retries for this task. + int32 retry = 5; + + // Number of times this task has been retried so far. + int32 retried = 6; + + // Error message from the last failure. + string error_msg = 7; + + // Timeout specifies timeout in seconds. + // Use zero to indicate no timeout. + int64 timeout = 8; + + // Deadline specifies the deadline for the task in Unix time, + // the number of seconds elapsed since January 1, 1970 UTC. + // Use zero to indicate no deadline. + int64 deadline = 9; + + // UniqueKey holds the redis key used for uniqueness lock for this task. + // Empty string indicates that no uniqueness lock was used. + string unique_key = 10; +}; + +// ServerInfo holds information about a running server. +message ServerInfo { + // Host machine the server is running on. + string host = 1; + + // PID of the server process. + int32 pid = 2; + + // Unique identifier for this server. + string server_id = 3; + + // Maximum number of concurrency this server will use. + int32 concurrency = 4; + + // List of queue names with their priorities. + // The server will consume tasks from the queues and prioritize + // queues with higher priority numbers. + map queues = 5; + + // If set, the server will always consume tasks from a queue with higher + // priority. + bool strict_priority = 6; + + // Status indicates the status of the server. + string status = 7; + + // Time this server was started. + google.protobuf.Timestamp start_time = 8; + + // Number of workers currently processing tasks. + int32 active_worker_count = 9; +}; + +// WorkerInfo holds information about a running worker. +message WorkerInfo { + // Host matchine this worker is running on. + string host = 1; + + // PID of the process in which this worker is running. + int32 pid = 2; + + // ID of the server in which this worker is running. + string server_id = 3; + + // ID of the task this worker is processing. + string task_id = 4; + + // Type of the task this worker is processing. + string task_type = 5; + + // Payload of the task this worker is processing. + bytes task_payload = 6; + + // Name of the queue the task the worker is processing belongs. + string queue = 7; + + // Time this worker started processing the task. + google.protobuf.Timestamp start_time = 8; + + // Deadline by which the worker needs to complete processing + // the task. If worker exceeds the deadline, the task will fail. + google.protobuf.Timestamp deadline = 9; +}; + +// SchedulerEntry holds information about a periodic task registered +// with a scheduler. +message SchedulerEntry { + // Identifier of the scheduler entry. + string id = 1; + + // Periodic schedule spec of the entry. + string spec = 2; + + // Task type of the periodic task. + string task_type = 3; + + // Task payload of the periodic task. + bytes task_payload = 4; + + // Options used to enqueue the periodic task. + repeated string enqueue_options = 5; + + // Next time the task will be enqueued. + google.protobuf.Timestamp next_enqueue_time = 6; + + // Last time the task was enqueued. + // Zero time if task was never enqueued. + google.protobuf.Timestamp prev_enqueue_time = 7; +}; + +// SchedulerEnqueueEvent holds information about an enqueue event +// by a scheduler. +message SchedulerEnqueueEvent { + // ID of the task that was enqueued. + string task_id = 1; + + // Time the task was enqueued. + google.protobuf.Timestamp enqueue_time = 2; +}; \ No newline at end of file diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index de19fb5..36effb1 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -5,7 +5,6 @@ package rdb import ( - "encoding/json" "fmt" "strings" "time" @@ -343,7 +342,7 @@ func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessa reverse(data) var msgs []*base.TaskMessage for _, s := range data { - m, err := base.DecodeMessage(s) + m, err := base.DecodeMessage([]byte(s)) if err != nil { continue // bad data, ignore and continue } @@ -419,7 +418,7 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro if err != nil { return nil, err } - msg, err := base.DecodeMessage(s) + msg, err := base.DecodeMessage([]byte(s)) if err != nil { continue // bad data, ignore and continue } @@ -1013,46 +1012,47 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) { if err != nil { continue // skip bad data } - var info base.ServerInfo - if err := json.Unmarshal([]byte(data), &info); err != nil { + info, err := base.DecodeServerInfo([]byte(data)) + if err != nil { continue // skip bad data } - servers = append(servers, &info) + servers = append(servers, info) } return servers, nil } // Note: Script also removes stale keys. -var listWorkerKeysCmd = redis.NewScript(` +var listWorkersCmd = redis.NewScript(` local now = tonumber(ARGV[1]) local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) -return keys`) +local res = {} +for _, key in ipairs(keys) do + local vals = redis.call("HVALS", key) + for _, v in ipairs(vals) do + table.insert(res, v) + end +end +return res`) // ListWorkers returns the list of worker stats. func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { now := time.Now() - res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() + res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() if err != nil { return nil, err } - keys, err := cast.ToStringSliceE(res) + data, err := cast.ToStringSliceE(res) if err != nil { return nil, err } var workers []*base.WorkerInfo - for _, key := range keys { - data, err := r.client.HVals(key).Result() + for _, s := range data { + w, err := base.DecodeWorkerInfo([]byte(s)) if err != nil { continue // skip bad data } - for _, s := range data { - var w base.WorkerInfo - if err := json.Unmarshal([]byte(s), &w); err != nil { - continue // skip bad data - } - workers = append(workers, &w) - } + workers = append(workers, w) } return workers, nil } @@ -1082,11 +1082,11 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { continue // skip bad data } for _, s := range data { - var e base.SchedulerEntry - if err := json.Unmarshal([]byte(s), &e); err != nil { + e, err := base.DecodeSchedulerEntry([]byte(s)) + if err != nil { continue // skip bad data } - entries = append(entries, &e) + entries = append(entries, e) } } return entries, nil @@ -1105,11 +1105,11 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas if err != nil { return nil, err } - var e base.SchedulerEnqueueEvent - if err := json.Unmarshal([]byte(data), &e); err != nil { + e, err := base.DecodeSchedulerEnqueueEvent([]byte(data)) + if err != nil { return nil, err } - events = append(events, &e) + events = append(events, e) } return events, nil } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 58025c9..0e89673 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -6,7 +6,6 @@ package rdb import ( - "encoding/json" "errors" "fmt" "time" @@ -146,7 +145,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti if err != nil { return nil, time.Time{}, err } - if msg, err = base.DecodeMessage(encoded); err != nil { + if msg, err = base.DecodeMessage([]byte(encoded)); err != nil { return nil, time.Time{}, err } return msg, time.Unix(d, 0), nil @@ -615,7 +614,7 @@ func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*bas return nil, err } for _, s := range data { - msg, err := base.DecodeMessage(s) + msg, err := base.DecodeMessage([]byte(s)) if err != nil { return nil, err } @@ -643,14 +642,14 @@ return redis.status_reply("OK")`) // WriteServerState writes server state data to redis with expiration set to the value ttl. func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { - bytes, err := json.Marshal(info) + bytes, err := base.EncodeServerInfo(info) if err != nil { return err } exp := time.Now().Add(ttl).UTC() args := []interface{}{ttl.Seconds(), bytes} // args to the lua script for _, w := range workers { - bytes, err := json.Marshal(w) + bytes, err := base.EncodeWorkerInfo(w) if err != nil { continue // skip bad data } @@ -702,7 +701,7 @@ return redis.status_reply("OK")`) func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error { args := []interface{}{ttl.Seconds()} for _, e := range entries { - bytes, err := json.Marshal(e) + bytes, err := base.EncodeSchedulerEntry(e) if err != nil { continue // skip bad data } @@ -757,7 +756,7 @@ const maxEvents = 1000 // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { key := base.SchedulerHistoryKey(entryID) - data, err := json.Marshal(event) + data, err := base.EncodeSchedulerEnqueueEvent(event) if err != nil { return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 780cc56..23c32ea 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -101,7 +101,7 @@ func TestEnqueueUnique(t *testing.T) { m1 := base.TaskMessage{ ID: uuid.New(), Type: "email", - Payload: map[string]interface{}{"user_id": float64(123)}, + Payload: map[string]interface{}{"user_id": json.Number("123")}, Queue: base.DefaultQueueName, UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), } @@ -1474,7 +1474,7 @@ func TestWriteServerState(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, - Started: time.Now(), + Started: time.Now().UTC(), Status: "running", ActiveWorkerCount: 0, } @@ -1487,12 +1487,11 @@ func TestWriteServerState(t *testing.T) { // Check ServerInfo was written correctly. skey := base.ServerInfoKey(host, pid, serverID) data := r.client.Get(skey).Val() - var got base.ServerInfo - err = json.Unmarshal([]byte(data), &got) + got, err := base.DecodeServerInfo([]byte(data)) if err != nil { - t.Fatalf("could not decode json: %v", err) + t.Fatalf("could not decode server info: %v", err) } - if diff := cmp.Diff(info, got); diff != "" { + if diff := cmp.Diff(info, *got); diff != "" { t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s", got, info, diff) } @@ -1565,7 +1564,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { Concurrency: 10, Queues: map[string]int{"default": 2, "email": 5, "low": 1}, StrictPriority: false, - Started: time.Now().Add(-10 * time.Minute), + Started: time.Now().Add(-10 * time.Minute).UTC(), Status: "running", ActiveWorkerCount: len(workers), } @@ -1578,12 +1577,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) { // Check ServerInfo was written correctly. skey := base.ServerInfoKey(host, pid, serverID) data := r.client.Get(skey).Val() - var got base.ServerInfo - err = json.Unmarshal([]byte(data), &got) + got, err := base.DecodeServerInfo([]byte(data)) if err != nil { - t.Fatalf("could not decode json: %v", err) + t.Fatalf("could not decode server info: %v", err) } - if diff := cmp.Diff(serverInfo, got); diff != "" { + if diff := cmp.Diff(serverInfo, *got); diff != "" { t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s", got, serverInfo, diff) } @@ -1607,11 +1605,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) { } var gotWorkers []*base.WorkerInfo for _, val := range wdata { - var w base.WorkerInfo - if err := json.Unmarshal([]byte(val), &w); err != nil { + w, err := base.DecodeWorkerInfo([]byte(val)) + if err != nil { t.Fatalf("could not unmarshal worker's data: %v", err) } - gotWorkers = append(gotWorkers, &w) + gotWorkers = append(gotWorkers, w) } if diff := cmp.Diff(workers, gotWorkers, h.SortWorkerInfoOpt); diff != "" { t.Errorf("persisted workers info was %v, want %v; (-want,+got)\n%s",