mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Extract polling logic to poller type
This commit is contained in:
		
							
								
								
									
										54
									
								
								asynq.go
									
									
									
									
									
								
							
							
						
						
									
										54
									
								
								asynq.go
									
									
									
									
									
								
							| @@ -112,13 +112,22 @@ type Workers struct { | ||||
|  | ||||
| 	// running indicates whether the workes are currently running. | ||||
| 	running bool | ||||
|  | ||||
| 	poller *poller | ||||
| } | ||||
|  | ||||
| // NewWorkers creates and returns a new Workers. | ||||
| func NewWorkers(poolSize int, opt *RedisOpt) *Workers { | ||||
| 	rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) | ||||
| 	poller := &poller{ | ||||
| 		rdb:         rdb, | ||||
| 		done:        make(chan struct{}), | ||||
| 		avgInterval: 5 * time.Second, | ||||
| 		zsets:       []string{scheduled, retry}, | ||||
| 	} | ||||
| 	return &Workers{ | ||||
| 		rdb:        rdb, | ||||
| 		poller:     poller, | ||||
| 		poolTokens: make(chan struct{}, poolSize), | ||||
| 	} | ||||
| } | ||||
| @@ -133,7 +142,7 @@ func (w *Workers) Run(handler TaskHandler) { | ||||
| 	} | ||||
| 	w.running = true | ||||
|  | ||||
| 	go w.pollDeferred() | ||||
| 	w.poller.start() | ||||
|  | ||||
| 	for { | ||||
| 		// pull message out of the queue and process it | ||||
| @@ -183,49 +192,6 @@ func (w *Workers) Run(handler TaskHandler) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (w *Workers) pollDeferred() { | ||||
| 	zsets := []string{scheduled, retry} | ||||
| 	for { | ||||
| 		for _, zset := range zsets { | ||||
| 			// Get next items in the queue with scores (time to execute) <= now. | ||||
| 			now := time.Now().Unix() | ||||
| 			fmt.Printf("[DEBUG] polling ZSET %q\n", zset) | ||||
| 			jobs, err := w.rdb.ZRangeByScore(zset, | ||||
| 				&redis.ZRangeBy{ | ||||
| 					Min: "-inf", | ||||
| 					Max: strconv.Itoa(int(now))}).Result() | ||||
| 			fmt.Printf("len(jobs) = %d\n", len(jobs)) | ||||
| 			if err != nil { | ||||
| 				log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) | ||||
| 				continue | ||||
| 			} | ||||
| 			if len(jobs) == 0 { | ||||
| 				fmt.Println("jobs empty") | ||||
| 				continue | ||||
| 			} | ||||
|  | ||||
| 			for _, j := range jobs { | ||||
| 				var msg taskMessage | ||||
| 				err = json.Unmarshal([]byte(j), &msg) | ||||
| 				if err != nil { | ||||
| 					fmt.Println("unmarshal failed") | ||||
| 					continue | ||||
| 				} | ||||
|  | ||||
| 				if w.rdb.ZRem(zset, j).Val() > 0 { | ||||
| 					err = push(w.rdb, &msg) | ||||
| 					if err != nil { | ||||
| 						log.Printf("could not push task to queue %q: %v", msg.Queue, err) | ||||
| 						// TODO(hibiken): Handle this error properly. Add back to scheduled ZSET? | ||||
| 						continue | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		time.Sleep(5 * time.Second) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // push pushes the task to the specified queue to get picked up by a worker. | ||||
| func push(rdb *redis.Client, msg *taskMessage) error { | ||||
| 	bytes, err := json.Marshal(msg) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user