diff --git a/cli/cli.go b/cli/cli.go index 68fdf91..b2989b6 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -2,6 +2,8 @@ package cli import ( "context" + "os" + "os/signal" cli "gopkg.in/urfave/cli.v2" @@ -53,16 +55,20 @@ func run(c *cli.Context) error { return err } + ctx := context.Background() + events := make(chan *events.Event) + ps := pubsub.New(app.Logger, app.Network) go func() { - err := ui.Run(context.Background(), app, events) + err := ui.Run(ctx, app, events) if err != nil { app.Logger.Debug("ui", logging.String("error", err.Error())) } + ps.Stop() }() - pubsub.Run(context.Background(), app, events) + ps.Run(ctx, events) return nil } @@ -78,9 +84,15 @@ func pubsubRun(c *cli.Context) error { } events := make(chan *events.Event) - pubsub.Run(context.Background(), app, events) - //ev := <-events - //app.Logger.Info("events quit ", logging.String("type", ev.Type)) + ps := pubsub.New(app.Logger, app.Network) + ps.Run(context.Background(), events) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, os.Interrupt) + go func() { + <-sig + ps.Stop() + }() return nil } diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 625e7a4..e1715ef 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -2,27 +2,24 @@ package pubsub import ( "context" - "os" - "os/signal" "sync" "time" - "github.com/edouardparis/lntop/app" "github.com/edouardparis/lntop/events" "github.com/edouardparis/lntop/logging" "github.com/edouardparis/lntop/network" "github.com/edouardparis/lntop/network/models" ) -type pubSub struct { +type PubSub struct { stop chan bool logger logging.Logger network *network.Network wg *sync.WaitGroup } -func newPubSub(logger logging.Logger, network *network.Network) *pubSub { - return &pubSub{ +func New(logger logging.Logger, network *network.Network) *PubSub { + return &PubSub{ logger: logger.With(logging.String("logger", "pubsub")), network: network, wg: &sync.WaitGroup{}, @@ -30,7 +27,7 @@ func newPubSub(logger logging.Logger, network *network.Network) *pubSub { } } -func (p *pubSub) ticker(ctx context.Context, sub chan *events.Event) { +func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event) { p.wg.Add(1) ticker := time.NewTicker(3 * time.Second) go func() { @@ -73,7 +70,7 @@ func (p *pubSub) ticker(ctx context.Context, sub chan *events.Event) { }() } -func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) { +func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) { p.wg.Add(3) invoices := make(chan *models.Invoice) ctx, cancel := context.WithCancel(ctx) @@ -106,25 +103,18 @@ func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) { }() } -func (p *pubSub) wait() { - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt) - p.wg.Add(1) - go func() { - sig := <-c - p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String())) - p.stop <- true - close(p.stop) - p.wg.Done() - }() - p.wg.Wait() +func (p *PubSub) Stop() { + p.stop <- true + close(p.stop) + p.logger.Debug("Received signal, gracefully stopping") } -func Run(ctx context.Context, app *app.App, sub chan *events.Event) { - pubSub := newPubSub(app.Logger, app.Network) - pubSub.logger.Debug("Starting...") +func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) { + p.logger.Debug("Starting...") + + p.invoices(ctx, sub) + p.ticker(ctx, sub) - pubSub.invoices(ctx, sub) - pubSub.ticker(ctx, sub) - pubSub.wait() + <-p.stop + p.wg.Wait() }