From a718697b1f3466839529cbcd276f0de4c07851f5 Mon Sep 17 00:00:00 2001 From: Herb Stahl Date: Thu, 1 Sep 2022 14:13:39 -0700 Subject: [PATCH] https://github.com/hibiken/asynq/issues/441 --- client.go | 39 ++++++++++++++++++++++++++--- docker/docker-compose.yml | 14 +++++++++++ go.mod | 1 - go.sum | 35 +++----------------------- inspector.go | 37 ++++++++++++++++++++++++++-- internal/base/base.go | 5 ++++ internal/rdb/inspect.go | 16 ++++++------ internal/rdb/rdb.go | 52 ++++++++++++++++++++++++++++++--------- scheduler.go | 15 ++++++++++- server.go | 36 ++++++++++++++++++++++++++- 10 files changed, 191 insertions(+), 59 deletions(-) create mode 100644 docker/docker-compose.yml diff --git a/client.go b/client.go index 7948036..bda4910 100644 --- a/client.go +++ b/client.go @@ -26,14 +26,47 @@ import ( type Client struct { broker base.Broker } +type ClientConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateClientConfig(cfg *ClientConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if *(cfg.MaxArchiveSize) < 0 { + value := 1 + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewClientWithConfig returns a new Client instance given a redis connection option. +func NewClientWithConfig(r RedisConnOpt, cfg ClientConfig) *Client { + validateClientConfig(&cfg) -// NewClient returns a new Client instance given a redis connection option. -func NewClient(r RedisConnOpt) *Client { c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) } - return &Client{broker: rdb.NewRDB(c)} + return &Client{broker: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + })} +} + +// NewClient returns a new Client instance given a redis connection option. +func NewClient(r RedisConnOpt) *Client { + return NewClientWithConfig(r, ClientConfig{}) } type OptionType int diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..889571b --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,14 @@ + +version: '3.8' +services: + cache: + image: redis:6.2-alpine + restart: always + ports: + - '6379:6379' + command: redis-server --save 20 1 --loglevel warning + volumes: + - cache:/data +volumes: + cache: + driver: local \ No newline at end of file diff --git a/go.mod b/go.mod index 1a8aa53..316d002 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/golang/protobuf v1.4.2 github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.2.0 - github.com/kr/pretty v0.1.0 // indirect github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cast v1.3.1 go.uber.org/goleak v1.1.12 diff --git a/go.sum b/go.sum index 2925acd..2461457 100644 --- a/go.sum +++ b/go.sum @@ -1,30 +1,22 @@ -cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473 h1:4cmBvAEBNJaGARUEs3/suWRyfyBfhf7I60WBZq+bv2w= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= -github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-redis/redis/v8 v8.11.2 h1:WqlSpAwz8mxDSMCvbyz1Mkiqe0LE5OY4j3lgkvu1Ts0= github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -45,11 +37,9 @@ github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -65,40 +55,30 @@ github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/yuin/goldmark v1.2.1 h1:ruQGxdhGHe7FWOJPT0mKs5+pD2Xs1Bm/kdGlHO04FmM= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5 h1:dPmz1Snjq0kmkz159iL7S6WzdahUTHnHB5M56WFVifs= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.uber.org/goleak v0.10.0 h1:G3eWbSNIskeRqtsN/1uI5B+eP73y3JUuBsv9AZjehb4= -go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/exp v0.0.0-20190121172915-509febef88a4 h1:c2HOrn5iMezYjSlGPncknSEr/8x5LELb/ilJbXi9DEA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= -golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -109,16 +89,13 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -131,7 +108,6 @@ golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210112080510-489259a85091 h1:DMyOG0U+gKfu8JZzg2UQe9MeaC1X+xQWlAKcRnjxjCw= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= @@ -148,8 +124,8 @@ golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e h1:4nW4NLDYnU28ojHaHO8OVxFHk/aQ33U01a9cjED+pzE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -157,15 +133,12 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= -google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= -google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= -google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= @@ -181,7 +154,6 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= @@ -191,5 +163,4 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= -honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc h1:/hemPrYIhOhy8zYrNj+069zDB68us2sMGsfkFJO0iZs= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/inspector.go b/inspector.go index e583cf6..a59b3cc 100644 --- a/inspector.go +++ b/inspector.go @@ -21,18 +21,51 @@ import ( type Inspector struct { rdb *rdb.RDB } +type InspectorConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateInspectorConfig(cfg *InspectorConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if *(cfg.MaxArchiveSize) < 0 { + value := 1 + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} // New returns a new instance of Inspector. -func NewInspector(r RedisConnOpt) *Inspector { +func NewInspectorWithConfig(r RedisConnOpt, cfg InspectorConfig) *Inspector { + validateInspectorConfig(&cfg) + c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) } return &Inspector{ - rdb: rdb.NewRDB(c), + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }), } } +// New returns a new instance of Inspector. +func NewInspector(r RedisConnOpt) *Inspector { + return NewInspectorWithConfig(r, InspectorConfig{}) +} + // Close closes the connection with redis. func (i *Inspector) Close() error { return i.rdb.Close() diff --git a/internal/base/base.go b/internal/base/base.go index ec342f8..9538130 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -31,6 +31,11 @@ const DefaultQueueName = "default" // DefaultQueue is the redis key for the default queue. var DefaultQueue = PendingKey(DefaultQueueName) +const ( + DefaultMaxArchiveSize = 10000 // maximum number of tasks in archive + DefaultArchivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently +) + // Global Redis keys. const ( AllServers = "asynq:servers" // ZSET diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 6deb4f1..101a7a9 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -1181,8 +1181,8 @@ func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), gname, } @@ -1237,8 +1237,8 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), } res, err := archiveAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -1328,8 +1328,8 @@ func (r *RDB) ArchiveTask(qname, id string) error { argv := []interface{}{ id, now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.QueueKeyPrefix(qname), base.GroupKeyPrefix(qname), } @@ -1393,8 +1393,8 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) { now := r.clock.Now() argv := []interface{}{ now.Unix(), - now.AddDate(0, 0, -archivedExpirationInDays).Unix(), - maxArchiveSize, + now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(), + *r.config.MaxArchiveSize, base.TaskKeyPrefix(qname), qname, } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 7b25f15..42b47b9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -28,14 +28,46 @@ const LeaseDuration = 30 * time.Second type RDB struct { client redis.UniversalClient clock timeutil.Clock + config RDBConfig +} +type RDBConfig struct { + MaxArchiveSize *int + ArchivedExpirationInDays *int +} + +func validateRDBConfig(cfg *RDBConfig) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if *(cfg.MaxArchiveSize) < 0 { + value := 1 + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + +// NewRDB returns a new instance of RDB. +func NewRDBWithConfig(client redis.UniversalClient, cfg RDBConfig) *RDB { + validateRDBConfig(&cfg) + + return &RDB{ + client: client, + clock: timeutil.NewRealClock(), + config: cfg, + } } // NewRDB returns a new instance of RDB. func NewRDB(client redis.UniversalClient) *RDB { - return &RDB{ - client: client, - clock: timeutil.NewRealClock(), - } + return NewRDBWithConfig(client, RDBConfig{}) } // Close closes the connection with redis server. @@ -816,11 +848,6 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T return r.runScript(ctx, op, retryCmd, keys, argv...) } -const ( - maxArchiveSize = 10000 // maximum number of tasks in archive - archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently -) - // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:lease @@ -869,6 +896,9 @@ return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error { + if *(r.config.MaxArchiveSize) <= 0 { + return nil + } var op errors.Op = "rdb.Archive" now := r.clock.Now() modified := *msg @@ -878,7 +908,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } - cutoff := now.AddDate(0, 0, -archivedExpirationInDays) + cutoff := now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)) expireAt := now.Add(statsTTL) keys := []string{ base.TaskKey(msg.Queue, msg.ID), @@ -895,7 +925,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) encoded, now.Unix(), cutoff.Unix(), - maxArchiveSize, + *r.config.MaxArchiveSize, expireAt.Unix(), int64(math.MaxInt64), } diff --git a/scheduler.go b/scheduler.go index 12c702c..940c527 100644 --- a/scheduler.go +++ b/scheduler.go @@ -73,7 +73,10 @@ func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler { state: &serverState{value: srvStateNew}, logger: logger, client: NewClient(r), - rdb: rdb.NewRDB(c), + rdb: rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: opts.MaxArchiveSize, + ArchivedExpirationInDays: opts.ArchivedExpirationInDays, + }), cron: cron.New(cron.WithLocation(loc)), location: loc, done: make(chan struct{}), @@ -121,6 +124,16 @@ type SchedulerOpts struct { // EnqueueErrorHandler gets called when scheduler cannot enqueue a registered task // due to an error. EnqueueErrorHandler func(task *Task, opts []Option, err error) + + // MaxArchiveSize specifies the maximum size of the archive that can be created by the server. + // + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // enqueueJob encapsulates the job of enqueuing a task and recording the event. diff --git a/server.go b/server.go index 4bf04e0..4e7d3b9 100644 --- a/server.go +++ b/server.go @@ -220,6 +220,16 @@ type Config struct { // // If unset or nil, the group aggregation feature will be disabled on the server. GroupAggregator GroupAggregator + + // MaxArchiveSize specifies the maximum size of the archive that can be created by the server. + // + // If unset the DefaultMaxArchiveSize is used. If set to a zero or a negative value, nothing will be archived. + MaxArchiveSize *int + + // ArchivedExpirationInDays specifies the number of days after which archived tasks are deleted. + // + // If unset, DefaultArchivedExpirationInDays is used. The value must be greater than zero. + ArchivedExpirationInDays *int } // GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler. @@ -389,9 +399,30 @@ const ( defaultGroupGracePeriod = 1 * time.Minute ) +func validateConfig(cfg *Config) { + if cfg.MaxArchiveSize == nil { + value := base.DefaultMaxArchiveSize + cfg.MaxArchiveSize = &value + } + if *(cfg.MaxArchiveSize) < 0 { + value := 1 + cfg.MaxArchiveSize = &value + } + if cfg.ArchivedExpirationInDays == nil { + value := base.DefaultArchivedExpirationInDays + cfg.ArchivedExpirationInDays = &value + } + if *(cfg.ArchivedExpirationInDays) < 0 { + value := 1 + cfg.ArchivedExpirationInDays = &value + } +} + // NewServer returns a new Server given a redis connection option // and server configuration. func NewServer(r RedisConnOpt, cfg Config) *Server { + validateConfig(&cfg) + c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("asynq: unsupported RedisConnOpt type %T", r)) @@ -451,7 +482,10 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { } logger.SetLevel(toInternalLogLevel(loglevel)) - rdb := rdb.NewRDB(c) + rdb := rdb.NewRDBWithConfig(c, rdb.RDBConfig{ + MaxArchiveSize: cfg.MaxArchiveSize, + ArchivedExpirationInDays: cfg.ArchivedExpirationInDays, + }) starting := make(chan *workerInfo) finished := make(chan *base.TaskMessage) syncCh := make(chan *syncRequest)