// Package gum runs a set of work units in goroutines and coordinates their
// shutdown when a registered OS signal arrives or a unit reports a panic.
package gum
import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"reflect"
	"strings"
)
7 months ago
var idGenerator = genID()
// The WorkUnit interface is used to define a unit of work.
// The Run method will be called in a goroutine.
5 years ago
type WorkUnit interface {
5 years ago
Run(UnitManager)
5 years ago
}
// UnitManager is the handle a WorkUnit receives in Run to coordinate with the
// manager that started it.
type UnitManager interface {
	// ShouldStop returns a channel the unit should watch: once a value can be
	// received from it, or it has been closed, the unit should stop.
	ShouldStop() <-chan bool
	// Done must be called by the unit when it has finished.
	Done()
	// Panic reports err as a fatal failure of the unit.
	Panic(err error)
}
type WorkUnitManager struct {
stop chan bool
workerQuit chan bool
unit WorkUnit
panic chan error
isPaniced bool
5 years ago
}
// ShouldStop exposes the unit's stop channel as a receive-only channel.
func (w *WorkUnitManager) ShouldStop() <-chan bool {
	var stopSignal <-chan bool = w.stop
	return stopSignal
}
// Done reports that the unit has finished by sending true on its quit channel.
func (w *WorkUnitManager) Done() {
	const finished = true
	w.workerQuit <- finished
}
func (w *WorkUnitManager) Panic(err error) {
w.panic <- err
w.isPaniced = true
w.workerQuit <- true
close(w.stop)
}
5 years ago
type Manager struct {
signalIn chan os.Signal
shutdownSigs []os.Signal
5 years ago
workers map[string]*WorkUnitManager
Quit chan bool
panic chan error // Used for panicing goroutines
5 years ago
}
5 years ago
func (m *Manager) Run() {
5 years ago
log.Println("Starting manager ...")
for unitName, w := range m.workers {
log.Printf("Starting <%s>\n", unitName)
5 years ago
go w.unit.Run(w)
5 years ago
}
for {
select {
case sig := <-m.signalIn:
if !in(m.shutdownSigs, sig) {
5 years ago
break
}
log.Println("shutting event received ... ")
// send shutdown event to all worker units
for name, w := range m.workers {
log.Printf("shutting down <%s>\n", name)
5 years ago
w.stop <- true
}
// Wait for all units to quit
for name, w := range m.workers {
<-w.workerQuit
log.Printf("<%s> down", name)
}
// All workers have shutdown
log.Println("All workers have shutdown, shutting down manager ...")
m.Quit <- true
case p := <-m.panic:
for name, w := range m.workers {
if w.isPaniced {
log.Printf("Panicing for <%s>: %s", name, p)
}
}
for name, w := range m.workers {
log.Printf("shuting down <%s>\n", name)
if !w.isPaniced {
w.stop <- true
}
}
// Wait for all units to quit
for name, w := range m.workers {
<-w.workerQuit
log.Printf("<%s> down", name)
}
// All workers have shutdown
log.Println("All workers have shutdown, shutting down manager ...")
m.Quit <- true
5 years ago
}
}
}
// ShutdownOn registers each of the given signals for delivery on the manager's
// signal channel and records them as signals that trigger a shutdown.
func (m *Manager) ShutdownOn(sig ...os.Signal) {
	for i := 0; i < len(sig); i++ {
		current := sig[i]
		log.Printf("Registering shutdown signal: %s\n", current)
		signal.Notify(m.signalIn, current)
	}
	m.shutdownSigs = append(m.shutdownSigs, sig...)
}
// IDGenerator returns an integer ID for the given unit name.
type IDGenerator func(string) int
func genID() IDGenerator {
ids := make(map[string]int)
return func(unit string) int {
ret := ids[unit]
ids[unit]++
return ret
}
5 years ago
}
func (m *Manager) AddUnit(unit WorkUnit, name string) {
5 years ago
workUnitManager := &WorkUnitManager{
workerQuit: make(chan bool, 1),
stop: make(chan bool, 1),
unit: unit,
panic: m.panic,
5 years ago
}
unitType := reflect.TypeOf(unit)
unitClass := strings.Split(unitType.String(), ".")[1]
unitName := fmt.Sprintf("%s[%s", name, unitClass)
unitID := idGenerator(unitName)
unitName = fmt.Sprintf("%s#%d]", unitName, unitID)
5 years ago
log.Println("Adding unit ", unitName)
m.workers[unitName] = workUnitManager
}
// NewManager returns a Manager with an empty set of units and no shutdown
// signals registered. Each of its channels has a buffer of one.
func NewManager() *Manager {
	m := new(Manager)
	m.workers = map[string]*WorkUnitManager{}
	m.signalIn = make(chan os.Signal, 1)
	m.panic = make(chan error, 1)
	m.Quit = make(chan bool, 1)
	return m
}
// in reports whether sig is one of the signals in arr.
func in(arr []os.Signal, sig os.Signal) bool {
	for _, candidate := range arr {
		if candidate == sig {
			return true
		}
	}
	return false
}