From 10fd30ba32396be6753e8f4bc5ad514778e0aa3a Mon Sep 17 00:00:00 2001 From: Chakib Benziane Date: Wed, 12 Dec 2018 20:44:23 +0100 Subject: [PATCH] handle units with same base Type, tests --- README.md | 4 ++- manager.go | 50 ++++++++++++++++++++++++------ manager_test.go | 82 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+), 10 deletions(-) create mode 100644 manager_test.go diff --git a/README.md b/README.md index bf5452a..01049ce 100644 --- a/README.md +++ b/README.md @@ -35,7 +35,7 @@ import ( type Worker struct{} -// Example loop, it will be spwaned in a goroutine +// Example loop, will be spwaned inside a goroutine func (w *Worker) Spawn(um UnitManager) { ticker := time.NewTicker(time.Second) @@ -75,9 +75,11 @@ func main() { // NewWorker returns a type implementing WorkUnit interface unit := worker := NewWorker() + worker2 := NewWorker() // Register the unit with the manager manager.AddUnit(worker) + manager.AddUnit(worker2) // Start the manager go manager.Start() diff --git a/manager.go b/manager.go index 09bae39..2d3a955 100644 --- a/manager.go +++ b/manager.go @@ -5,9 +5,12 @@ import ( "os" "os/signal" "reflect" + "strconv" "strings" ) +var idGen = IdGenerator() + type WorkUnit interface { Spawn(UnitManager) Shutdown() @@ -43,7 +46,9 @@ func (w *WorkUnitManager) Panic(err error) { } type Manager struct { - signal chan os.Signal + signalIn chan os.Signal + + shutdownSigs []os.Signal workers map[string]*WorkUnitManager @@ -62,8 +67,9 @@ func (m *Manager) Start() { for { select { - case sig := <-m.signal: - if sig != os.Interrupt { + case sig := <-m.signalIn: + + if !in(m.shutdownSigs, sig) { break } @@ -120,7 +126,21 @@ func (m *Manager) Start() { } func (m *Manager) ShutdownOn(sig os.Signal) { - signal.Notify(m.signal, sig) + signal.Notify(m.signalIn, sig) + + m.shutdownSigs = append(m.shutdownSigs, sig) +} + +type IDGenerator func(string) int + +func IdGenerator() IDGenerator { + ids := make(map[string]int) + + return func(unit string) int { + ret := ids[unit] + ids[unit]++ + return ret + } } func (m *Manager) AddUnit(unit WorkUnit) { @@ -135,17 +155,29 @@ func (m *Manager) AddUnit(unit WorkUnit) { unitType := reflect.TypeOf(unit) unitName := strings.Split(unitType.String(), ".")[1] + unitId := idGen(unitName) + unitName += strconv.Itoa(unitId) + log.Println("Adding unit ", unitName) m.workers[unitName] = workUnitManager - log.Println(m.workers) } func NewManager() *Manager { return &Manager{ - signal: make(chan os.Signal, 1), - Quit: make(chan bool, 1), - workers: make(map[string]*WorkUnitManager), - panic: make(chan error, 1), + signalIn: make(chan os.Signal, 1), + Quit: make(chan bool, 1), + workers: make(map[string]*WorkUnitManager), + panic: make(chan error, 1), + } +} + +// Test if signal is in array +func in(arr []os.Signal, sig os.Signal) bool { + for _, s := range arr { + if s == sig { + return true + } } + return false } diff --git a/manager_test.go b/manager_test.go new file mode 100644 index 0000000..a21624d --- /dev/null +++ b/manager_test.go @@ -0,0 +1,82 @@ +package gum + +import ( + "log" + "os" + "syscall" + "testing" + "time" +) + +var WorkerID int + +type Worker struct{} + +// Example loop, it will be spwaned in a goroutine +func (w *Worker) Spawn(um UnitManager) { + ticker := time.NewTicker(time.Second) + + // Worker's loop + for { + select { + case <-ticker.C: + log.Println("tick") + + // Read from channel if this worker unit should stop + case <-um.ShouldStop(): + + // Shutdown work for current unit + w.Shutdown() + + // Notify manager that this unit is done. + um.Done() + } + } +} + +func (w *Worker) Shutdown() { + // Do shutdown procedure for worker + return +} + +func NewWorker() *Worker { + return &Worker{} +} + +func DoRunMain(pid chan int, quit chan<- bool) { + + pid <- os.Getpid() + + // Create a unit manager + manager := NewManager() + + // Shutdown all units on SIGINT + manager.ShutdownOn(os.Interrupt) + + // NewWorker returns a type implementing WorkUnit interface unit := + worker1 := NewWorker() + worker2 := NewWorker() + + // Register the unit with the manager + manager.AddUnit(worker1) + manager.AddUnit(worker2) + + // Start the manager + go manager.Start() + + // Wait for all units to shutdown gracefully through their `Shutdown` method + <-manager.Quit + quit <- true +} + +func TestRunMain(t *testing.T) { + mainPid := make(chan int, 1) + quit := make(chan bool) + go DoRunMain(mainPid, quit) + + time.Sleep(3 * time.Second) + + syscall.Kill(<-mainPid, syscall.SIGINT) + <-quit + +}