mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Extract manager to its own type
This commit is contained in:
		
							
								
								
									
										82
									
								
								asynq.go
									
									
									
									
									
								
							
							
						
						
									
										82
									
								
								asynq.go
									
									
									
									
									
								
							| @@ -12,7 +12,6 @@ TODOs: | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"math" | ||||
| 	"math/rand" | ||||
| 	"strconv" | ||||
| @@ -100,20 +99,18 @@ func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { | ||||
| 	return zadd(c.rdb, scheduled, float64(executeAt.Unix()), msg) | ||||
| } | ||||
|  | ||||
| //-------------------- Workers -------------------- | ||||
| //-------------------- Launcher -------------------- | ||||
|  | ||||
| // Launcher starts the manager and poller. | ||||
| type Launcher struct { | ||||
| 	rdb *redis.Client | ||||
|  | ||||
| 	// poolTokens is a counting semaphore to ensure the number of active workers | ||||
| 	// does not exceed the limit. | ||||
| 	poolTokens chan struct{} | ||||
|  | ||||
| 	// running indicates whether the workes are currently running. | ||||
| 	running bool | ||||
|  | ||||
| 	poller *poller | ||||
|  | ||||
| 	manager *manager | ||||
| } | ||||
|  | ||||
| // NewLauncher creates and returns a new Launcher. | ||||
| @@ -125,71 +122,38 @@ func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { | ||||
| 		avgInterval: 5 * time.Second, | ||||
| 		zsets:       []string{scheduled, retry}, | ||||
| 	} | ||||
| 	manager := newManager(rdb, poolSize, nil) | ||||
| 	return &Launcher{ | ||||
| 		rdb:        rdb, | ||||
| 		poller:     poller, | ||||
| 		poolTokens: make(chan struct{}, poolSize), | ||||
| 		rdb:     rdb, | ||||
| 		poller:  poller, | ||||
| 		manager: manager, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // TaskHandler handles a given task and report any error. | ||||
| type TaskHandler func(*Task) error | ||||
|  | ||||
| // Start starts the workers and scheduler with a given handler. | ||||
| func (w *Launcher) Start(handler TaskHandler) { | ||||
| 	if w.running { | ||||
| // Start starts the manager and poller. | ||||
| func (l *Launcher) Start(handler TaskHandler) { | ||||
| 	if l.running { | ||||
| 		return | ||||
| 	} | ||||
| 	w.running = true | ||||
| 	l.running = true | ||||
| 	l.manager.handler = handler | ||||
|  | ||||
| 	w.poller.start() | ||||
| 	l.poller.start() | ||||
| 	l.manager.start() | ||||
| } | ||||
|  | ||||
| 	for { | ||||
| 		// pull message out of the queue and process it | ||||
| 		// TODO(hibiken): sort the list of queues in order of priority | ||||
| 		res, err := w.rdb.BLPop(5*time.Second, listQueues(w.rdb)...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. | ||||
| 		if err != nil { | ||||
| 			if err != redis.Nil { | ||||
| 				log.Printf("BLPOP command failed: %v\n", err) | ||||
| 			} | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		q, data := res[0], res[1] | ||||
| 		fmt.Printf("perform task %v from %s\n", data, q) | ||||
| 		var msg taskMessage | ||||
| 		err = json.Unmarshal([]byte(data), &msg) | ||||
| 		if err != nil { | ||||
| 			log.Printf("[Servere Error] could not parse json encoded message %s: %v", data, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		t := &Task{Type: msg.Type, Payload: msg.Payload} | ||||
| 		w.poolTokens <- struct{}{} // acquire token | ||||
| 		go func(task *Task) { | ||||
| 			defer func() { <-w.poolTokens }() // release token | ||||
| 			err := handler(task) | ||||
| 			if err != nil { | ||||
| 				if msg.Retried >= msg.Retry { | ||||
| 					fmt.Println("Retry exhausted!!!") | ||||
| 					if err := kill(w.rdb, &msg); err != nil { | ||||
| 						log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) | ||||
| 					} | ||||
| 					return | ||||
| 				} | ||||
| 				fmt.Println("RETRY!!!") | ||||
| 				retryAt := time.Now().Add(delaySeconds((msg.Retried))) | ||||
| 				fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) | ||||
| 				msg.Retried++ | ||||
| 				msg.ErrorMsg = err.Error() | ||||
| 				if err := zadd(w.rdb, retry, float64(retryAt.Unix()), &msg); err != nil { | ||||
| 					// TODO(hibiken): Not sure how to handle this error | ||||
| 					log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) | ||||
| 					return | ||||
| 				} | ||||
|  | ||||
| 			} | ||||
| 		}(t) | ||||
| // Stop stops both manager and poller. | ||||
| func (l *Launcher) Stop() { | ||||
| 	if !l.running { | ||||
| 		return | ||||
| 	} | ||||
| 	l.running = false | ||||
|  | ||||
| 	l.poller.terminate() | ||||
| 	l.manager.terminate() | ||||
| } | ||||
|  | ||||
| // push pushes the task to the specified queue to get picked up by a worker. | ||||
|   | ||||
							
								
								
									
										101
									
								
								manager.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										101
									
								
								manager.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,101 @@ | ||||
| package asynq | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"log" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/go-redis/redis/v7" | ||||
| ) | ||||
|  | ||||
| type manager struct { | ||||
| 	rdb *redis.Client | ||||
|  | ||||
| 	handler TaskHandler | ||||
|  | ||||
| 	// sema is a counting semaphore to ensure the number of active workers | ||||
| 	// does not exceed the limit | ||||
| 	sema chan struct{} | ||||
|  | ||||
| 	done chan struct{} | ||||
| } | ||||
|  | ||||
| func newManager(rdb *redis.Client, numWorkers int, handler TaskHandler) *manager { | ||||
| 	return &manager{ | ||||
| 		rdb:     rdb, | ||||
| 		handler: handler, | ||||
| 		sema:    make(chan struct{}, numWorkers), | ||||
| 		done:    make(chan struct{}), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (m *manager) terminate() { | ||||
| 	m.done <- struct{}{} | ||||
| } | ||||
|  | ||||
| func (m *manager) start() { | ||||
| 	go func() { | ||||
| 		for { | ||||
| 			select { | ||||
| 			case <-m.done: | ||||
| 				m.shutdown() | ||||
| 			default: | ||||
| 				m.processTasks() | ||||
| 			} | ||||
| 		} | ||||
| 	}() | ||||
| } | ||||
|  | ||||
| func (m *manager) processTasks() { | ||||
| 	// pull message out of the queue and process it | ||||
| 	// TODO(hibiken): sort the list of queues in order of priority | ||||
| 	res, err := m.rdb.BLPop(5*time.Second, listQueues(m.rdb)...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. | ||||
| 	if err != nil { | ||||
| 		if err != redis.Nil { | ||||
| 			log.Printf("BLPOP command failed: %v\n", err) | ||||
| 		} | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	q, data := res[0], res[1] | ||||
| 	fmt.Printf("perform task %v from %s\n", data, q) | ||||
| 	var msg taskMessage | ||||
| 	err = json.Unmarshal([]byte(data), &msg) | ||||
| 	if err != nil { | ||||
| 		log.Printf("[Servere Error] could not parse json encoded message %s: %v", data, err) | ||||
| 		return | ||||
| 	} | ||||
| 	t := &Task{Type: msg.Type, Payload: msg.Payload} | ||||
| 	m.sema <- struct{}{} // acquire token | ||||
| 	go func(task *Task) { | ||||
| 		defer func() { <-m.sema }() // release token | ||||
| 		err := m.handler(task) | ||||
| 		if err != nil { | ||||
| 			if msg.Retried >= msg.Retry { | ||||
| 				fmt.Println("Retry exhausted!!!") | ||||
| 				if err := kill(m.rdb, &msg); err != nil { | ||||
| 					log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) | ||||
| 				} | ||||
| 				return | ||||
| 			} | ||||
| 			fmt.Println("RETRY!!!") | ||||
| 			retryAt := time.Now().Add(delaySeconds((msg.Retried))) | ||||
| 			fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) | ||||
| 			msg.Retried++ | ||||
| 			msg.ErrorMsg = err.Error() | ||||
| 			if err := zadd(m.rdb, retry, float64(retryAt.Unix()), &msg); err != nil { | ||||
| 				// TODO(hibiken): Not sure how to handle this error | ||||
| 				log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) | ||||
| 				return | ||||
| 			} | ||||
| 		} | ||||
| 	}(t) | ||||
| } | ||||
|  | ||||
| func (m *manager) shutdown() { | ||||
| 	// TODO(hibiken): implement this. Gracefully shutdown all active goroutines. | ||||
| 	fmt.Println("-------------[Manager]---------------") | ||||
| 	fmt.Println("Manager shutting down...") | ||||
| 	fmt.Println("------------------------------------") | ||||
| } | ||||
		Reference in New Issue
	
	Block a user