mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-23 22:26:13 +08:00
This commit is contained in:
@@ -1181,8 +1181,8 @@ func (r *RDB) ArchiveAllAggregatingTasks(qname, gname string) (int64, error) {
|
||||
now := r.clock.Now()
|
||||
argv := []interface{}{
|
||||
now.Unix(),
|
||||
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
|
||||
maxArchiveSize,
|
||||
now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(),
|
||||
*r.config.MaxArchiveSize,
|
||||
base.TaskKeyPrefix(qname),
|
||||
gname,
|
||||
}
|
||||
@@ -1237,8 +1237,8 @@ func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
|
||||
now := r.clock.Now()
|
||||
argv := []interface{}{
|
||||
now.Unix(),
|
||||
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
|
||||
maxArchiveSize,
|
||||
now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(),
|
||||
*r.config.MaxArchiveSize,
|
||||
base.TaskKeyPrefix(qname),
|
||||
}
|
||||
res, err := archiveAllPendingCmd.Run(context.Background(), r.client, keys, argv...).Result()
|
||||
@@ -1328,8 +1328,8 @@ func (r *RDB) ArchiveTask(qname, id string) error {
|
||||
argv := []interface{}{
|
||||
id,
|
||||
now.Unix(),
|
||||
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
|
||||
maxArchiveSize,
|
||||
now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(),
|
||||
*r.config.MaxArchiveSize,
|
||||
base.QueueKeyPrefix(qname),
|
||||
base.GroupKeyPrefix(qname),
|
||||
}
|
||||
@@ -1393,8 +1393,8 @@ func (r *RDB) archiveAll(src, dst, qname string) (int64, error) {
|
||||
now := r.clock.Now()
|
||||
argv := []interface{}{
|
||||
now.Unix(),
|
||||
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
|
||||
maxArchiveSize,
|
||||
now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays)).Unix(),
|
||||
*r.config.MaxArchiveSize,
|
||||
base.TaskKeyPrefix(qname),
|
||||
qname,
|
||||
}
|
||||
|
@@ -28,14 +28,46 @@ const LeaseDuration = 30 * time.Second
|
||||
type RDB struct {
|
||||
client redis.UniversalClient
|
||||
clock timeutil.Clock
|
||||
config RDBConfig
|
||||
}
|
||||
type RDBConfig struct {
|
||||
MaxArchiveSize *int
|
||||
ArchivedExpirationInDays *int
|
||||
}
|
||||
|
||||
func validateRDBConfig(cfg *RDBConfig) {
|
||||
if cfg.MaxArchiveSize == nil {
|
||||
value := base.DefaultMaxArchiveSize
|
||||
cfg.MaxArchiveSize = &value
|
||||
}
|
||||
if *(cfg.MaxArchiveSize) < 0 {
|
||||
value := 1
|
||||
cfg.MaxArchiveSize = &value
|
||||
}
|
||||
if cfg.ArchivedExpirationInDays == nil {
|
||||
value := base.DefaultArchivedExpirationInDays
|
||||
cfg.ArchivedExpirationInDays = &value
|
||||
}
|
||||
if *(cfg.ArchivedExpirationInDays) < 0 {
|
||||
value := 1
|
||||
cfg.ArchivedExpirationInDays = &value
|
||||
}
|
||||
}
|
||||
|
||||
// NewRDB returns a new instance of RDB.
|
||||
func NewRDBWithConfig(client redis.UniversalClient, cfg RDBConfig) *RDB {
|
||||
validateRDBConfig(&cfg)
|
||||
|
||||
return &RDB{
|
||||
client: client,
|
||||
clock: timeutil.NewRealClock(),
|
||||
config: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
// NewRDB returns a new instance of RDB.
|
||||
func NewRDB(client redis.UniversalClient) *RDB {
|
||||
return &RDB{
|
||||
client: client,
|
||||
clock: timeutil.NewRealClock(),
|
||||
}
|
||||
return NewRDBWithConfig(client, RDBConfig{})
|
||||
}
|
||||
|
||||
// Close closes the connection with redis server.
|
||||
@@ -816,11 +848,6 @@ func (r *RDB) Retry(ctx context.Context, msg *base.TaskMessage, processAt time.T
|
||||
return r.runScript(ctx, op, retryCmd, keys, argv...)
|
||||
}
|
||||
|
||||
const (
|
||||
maxArchiveSize = 10000 // maximum number of tasks in archive
|
||||
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
|
||||
)
|
||||
|
||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||
// KEYS[2] -> asynq:{<qname>}:active
|
||||
// KEYS[3] -> asynq:{<qname>}:lease
|
||||
@@ -869,6 +896,9 @@ return redis.status_reply("OK")`)
|
||||
// Archive sends the given task to archive, attaching the error message to the task.
|
||||
// It also trims the archive by timestamp and set size.
|
||||
func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string) error {
|
||||
if *(r.config.MaxArchiveSize) <= 0 {
|
||||
return nil
|
||||
}
|
||||
var op errors.Op = "rdb.Archive"
|
||||
now := r.clock.Now()
|
||||
modified := *msg
|
||||
@@ -878,7 +908,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
||||
}
|
||||
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
|
||||
cutoff := now.AddDate(0, 0, -(*r.config.ArchivedExpirationInDays))
|
||||
expireAt := now.Add(statsTTL)
|
||||
keys := []string{
|
||||
base.TaskKey(msg.Queue, msg.ID),
|
||||
@@ -895,7 +925,7 @@ func (r *RDB) Archive(ctx context.Context, msg *base.TaskMessage, errMsg string)
|
||||
encoded,
|
||||
now.Unix(),
|
||||
cutoff.Unix(),
|
||||
maxArchiveSize,
|
||||
*r.config.MaxArchiveSize,
|
||||
expireAt.Unix(),
|
||||
int64(math.MaxInt64),
|
||||
}
|
||||
|
Reference in New Issue
Block a user