units can panic and shutdown

master
Chakib Benziane 5 years ago
parent 6f3604bb33
commit ba431eb0f1

@ -16,12 +16,15 @@ type WorkUnit interface {
type UnitManager interface { type UnitManager interface {
ShouldStop() <-chan bool ShouldStop() <-chan bool
Done() Done()
Panic(err error)
} }
type WorkUnitManager struct { type WorkUnitManager struct {
stop chan bool stop chan bool
workerQuit chan bool workerQuit chan bool
unit WorkUnit unit WorkUnit
panic chan error
isPaniced bool
} }
func (w *WorkUnitManager) ShouldStop() <-chan bool { func (w *WorkUnitManager) ShouldStop() <-chan bool {
@ -32,12 +35,21 @@ func (w *WorkUnitManager) Done() {
w.workerQuit <- true w.workerQuit <- true
} }
func (w *WorkUnitManager) Panic(err error) {
w.panic <- err
w.isPaniced = true
w.workerQuit <- true
close(w.stop)
}
type Manager struct { type Manager struct {
signal chan os.Signal signal chan os.Signal
workers map[string]*WorkUnitManager workers map[string]*WorkUnitManager
Quit chan bool Quit chan bool
panic chan error // Used for panicing goroutines
} }
func (m *Manager) Start() { func (m *Manager) Start() {
@ -59,7 +71,7 @@ func (m *Manager) Start() {
// send shutdown event to all worker units // send shutdown event to all worker units
for name, w := range m.workers { for name, w := range m.workers {
log.Printf("shuting down <%s>\n", name) log.Printf("shutting down <%s>\n", name)
w.stop <- true w.stop <- true
} }
@ -74,6 +86,35 @@ func (m *Manager) Start() {
m.Quit <- true m.Quit <- true
//TODO: wait on Quit signal from any WorkerUnit
case p := <-m.panic:
//TODO: refactor shutdown procedure and reuse here
// send shutdown event to all worker units
for name, w := range m.workers {
if w.isPaniced {
log.Printf("Panicing for <%s>: %s", name, p)
}
}
for name, w := range m.workers {
log.Printf("shuting down <%s>\n", name)
if !w.isPaniced {
w.stop <- true
}
}
// Wait for all units to quit
for name, w := range m.workers {
<-w.workerQuit
log.Printf("<%s> down", name)
}
// All workers have shutdown
log.Println("All workers have shutdown, shutting down manager ...")
m.Quit <- true
} }
} }
} }
@ -88,6 +129,7 @@ func (m *Manager) AddUnit(unit WorkUnit) {
workerQuit: make(chan bool, 1), workerQuit: make(chan bool, 1),
stop: make(chan bool, 1), stop: make(chan bool, 1),
unit: unit, unit: unit,
panic: m.panic,
} }
unitType := reflect.TypeOf(unit) unitType := reflect.TypeOf(unit)
@ -102,7 +144,8 @@ func (m *Manager) AddUnit(unit WorkUnit) {
func NewManager() *Manager { func NewManager() *Manager {
return &Manager{ return &Manager{
signal: make(chan os.Signal, 1), signal: make(chan os.Signal, 1),
Quit: make(chan bool), Quit: make(chan bool, 1),
workers: make(map[string]*WorkUnitManager), workers: make(map[string]*WorkUnitManager),
panic: make(chan error, 1),
} }
} }

Loading…
Cancel
Save