From 574017626624315640f4b19cb2d6eb8eed1afc61 Mon Sep 17 00:00:00 2001 From: Edouard Paris Date: Tue, 26 Mar 2019 13:30:39 +0100 Subject: [PATCH] pubsub: watch invoices --- pubsub/pubsub.go | 73 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 69 insertions(+), 4 deletions(-) diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index c039279..1609fb2 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -2,18 +2,83 @@ package pubsub import ( "context" + "os" + "os/signal" "sync" "github.com/edouardparis/lntop/app" "github.com/edouardparis/lntop/events" "github.com/edouardparis/lntop/logging" + "github.com/edouardparis/lntop/network" + "github.com/edouardparis/lntop/network/models" ) +type pubSub struct { + stop chan bool + logger logging.Logger + network *network.Network + wg *sync.WaitGroup +} + +func newPubSub(logger logging.Logger, network *network.Network) *pubSub { + return &pubSub{ + logger: logger.With(logging.String("logger", "pubsub")), + network: network, + wg: &sync.WaitGroup{}, + stop: make(chan bool), + } +} + +func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) { + p.wg.Add(2) + invoices := make(chan *models.Invoice) + + go func() { + for { + select { + case <-p.stop: + p.wg.Done() + return + default: + invoice := <-invoices + p.logger.Debug("receive invoice", logging.Object("invoice", invoice)) + } + } + }() + + go func() { + select { + case <-p.stop: + p.wg.Done() + return + default: + err := p.network.SubscribeInvoice(ctx, invoices) + if err != nil { + p.logger.Error("SubscribeInvoice returned an error", logging.Error(err)) + } + } + }() +} + +func (p *pubSub) wait() { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + p.wg.Add(1) + go func() { + for sig := range c { + p.logger.Debug("Received signal, gracefully stopping", logging.String("sig", sig.String())) + p.wg.Done() + close(p.stop) + } + }() + p.wg.Wait() +} + func Run(ctx context.Context, app *app.App, sub chan *events.Event) error { - logger := app.Logger.With(logging.String("logger", "pubsub")) - wg := &sync.WaitGroup{} + pubSub := newPubSub(app.Logger, app.Network) + pubSub.logger.Debug("Starting...") - logger.Debug("Starting...") - wg.Wait() + pubSub.invoices(ctx, sub) + pubSub.wait() return nil }