diff --git a/manager.go b/manager.go index 1162ac9..09bae39 100644 --- a/manager.go +++ b/manager.go @@ -16,12 +16,15 @@ type WorkUnit interface { type UnitManager interface { ShouldStop() <-chan bool Done() + Panic(err error) } type WorkUnitManager struct { stop chan bool workerQuit chan bool unit WorkUnit + panic chan error + isPaniced bool } func (w *WorkUnitManager) ShouldStop() <-chan bool { @@ -32,12 +35,21 @@ func (w *WorkUnitManager) Done() { w.workerQuit <- true } +func (w *WorkUnitManager) Panic(err error) { + w.panic <- err + w.isPaniced = true + w.workerQuit <- true + close(w.stop) +} + type Manager struct { signal chan os.Signal workers map[string]*WorkUnitManager Quit chan bool + + panic chan error // Used for panicing goroutines } func (m *Manager) Start() { @@ -59,7 +71,7 @@ func (m *Manager) Start() { // send shutdown event to all worker units for name, w := range m.workers { - log.Printf("shuting down <%s>\n", name) + log.Printf("shutting down <%s>\n", name) w.stop <- true } @@ -74,6 +86,35 @@ func (m *Manager) Start() { m.Quit <- true + //TODO: wait on Quit signal from any WorkerUnit + case p := <-m.panic: + //TODO: refactor shutdown procedure and reuse here + // send shutdown event to all worker units + + for name, w := range m.workers { + if w.isPaniced { + log.Printf("Panicing for <%s>: %s", name, p) + } + } + + for name, w := range m.workers { + log.Printf("shuting down <%s>\n", name) + if !w.isPaniced { + w.stop <- true + } + } + + // Wait for all units to quit + for name, w := range m.workers { + <-w.workerQuit + log.Printf("<%s> down", name) + } + + // All workers have shutdown + log.Println("All workers have shutdown, shutting down manager ...") + + m.Quit <- true + } } } @@ -88,6 +129,7 @@ func (m *Manager) AddUnit(unit WorkUnit) { workerQuit: make(chan bool, 1), stop: make(chan bool, 1), unit: unit, + panic: m.panic, } unitType := reflect.TypeOf(unit) @@ -102,7 +144,8 @@ func (m *Manager) AddUnit(unit WorkUnit) { func NewManager() *Manager { return &Manager{ signal: make(chan os.Signal, 1), - Quit: make(chan bool), + Quit: make(chan bool, 1), workers: make(map[string]*WorkUnitManager), + panic: make(chan error, 1), } }