From 9fc5ab2fbc52f3f94566559e9e63d18b4a95c062 Mon Sep 17 00:00:00 2001 From: Edouard Paris Date: Mon, 15 Apr 2019 13:34:29 +0200 Subject: [PATCH] pubsub: add more ticker --- events/events.go | 16 ++++---- pubsub/pubsub.go | 50 +++-------------------- pubsub/ticker.go | 104 +++++++++++++++++++++++++++++++++++++++++++++++ ui/controller.go | 11 +++++ 4 files changed, 129 insertions(+), 52 deletions(-) create mode 100644 pubsub/ticker.go diff --git a/events/events.go b/events/events.go index 947372d..dc5c1d9 100644 --- a/events/events.go +++ b/events/events.go @@ -1,13 +1,15 @@ package events const ( - PeerUpdated = "peer.updated" - BlockReceived = "block.received" - InvoiceCreated = "invoice.created" - InvoiceSettled = "invoice.settled" - ChannelPending = "channel.pending" - ChannelActive = "channel.active" - ChannelInactive = "channel.inactive" + PeerUpdated = "peer.updated" + BlockReceived = "block.received" + InvoiceCreated = "invoice.created" + InvoiceSettled = "invoice.settled" + ChannelPending = "channel.pending" + ChannelActive = "channel.active" + ChannelInactive = "channel.inactive" + ChannelBalanceUpdated = "channel.balance.updated" + WalletBalanceUpdated = "wallet.balance.updated" ) type Event struct { diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index e1715ef..b372a88 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -3,7 +3,6 @@ package pubsub import ( "context" "sync" - "time" "github.com/edouardparis/lntop/events" "github.com/edouardparis/lntop/logging" @@ -27,49 +26,6 @@ func New(logger logging.Logger, network *network.Network) *PubSub { } } -func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event) { - p.wg.Add(1) - ticker := time.NewTicker(3 * time.Second) - go func() { - var old *models.Info - for { - select { - case <-p.stop: - ticker.Stop() - p.wg.Done() - return - case <-ticker.C: - info, err := p.network.Info(ctx) - if err != nil { - p.logger.Error("SubscribeInvoice returned an error", logging.Error(err)) - } - if old != nil { - if old.BlockHeight != info.BlockHeight { - sub <- events.New(events.BlockReceived) - } - - if old.NumPeers != info.NumPeers { - sub <- events.New(events.PeerUpdated) - } - - if old.NumPendingChannels < info.NumPendingChannels { - sub <- events.New(events.ChannelPending) - } - - if old.NumActiveChannels < info.NumActiveChannels { - sub <- events.New(events.ChannelActive) - } - - if old.NumInactiveChannels < info.NumInactiveChannels { - sub <- events.New(events.ChannelInactive) - } - } - old = info - } - } - }() -} - func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) { p.wg.Add(3) invoices := make(chan *models.Invoice) @@ -113,7 +69,11 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) { p.logger.Debug("Starting...") p.invoices(ctx, sub) - p.ticker(ctx, sub) + p.ticker(ctx, sub, + withTickerInfo(), + withTickerChannelsBalance(), + withTickerWalletBalance(), + ) <-p.stop p.wg.Wait() diff --git a/pubsub/ticker.go b/pubsub/ticker.go new file mode 100644 index 0000000..2e90932 --- /dev/null +++ b/pubsub/ticker.go @@ -0,0 +1,104 @@ +package pubsub + +import ( + "context" + "time" + + "github.com/edouardparis/lntop/events" + "github.com/edouardparis/lntop/logging" + "github.com/edouardparis/lntop/network" + "github.com/edouardparis/lntop/network/models" +) + +type tickerFunc func(context.Context, logging.Logger, *network.Network, chan *events.Event) + +func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event, fn ...tickerFunc) { + p.wg.Add(1) + ticker := time.NewTicker(3 * time.Second) + go func() { + for { + select { + case <-p.stop: + ticker.Stop() + p.wg.Done() + return + case <-ticker.C: + for i := range fn { + fn[i](ctx, p.logger, p.network, sub) + } + } + } + }() +} + +// withTickerInfo checks if general information did not changed changed in the ticker interval. +func withTickerInfo() tickerFunc { + var old *models.Info + return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) { + info, err := net.Info(ctx) + if err != nil { + logger.Error("network info returned an error", logging.Error(err)) + } + if old != nil { + if old.BlockHeight != info.BlockHeight { + sub <- events.New(events.BlockReceived) + } + + if old.NumPeers != info.NumPeers { + sub <- events.New(events.PeerUpdated) + } + + if old.NumPendingChannels < info.NumPendingChannels { + sub <- events.New(events.ChannelPending) + } + + if old.NumActiveChannels < info.NumActiveChannels { + sub <- events.New(events.ChannelActive) + } + + if old.NumInactiveChannels < info.NumInactiveChannels { + sub <- events.New(events.ChannelInactive) + } + } + old = info + } +} + +// withTickerChannelsBalance checks if channels balance and pending balance +// changed in the ticker interval. +func withTickerChannelsBalance() tickerFunc { + var old *models.ChannelsBalance + return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) { + channelsBalance, err := net.GetChannelsBalance(ctx) + if err != nil { + logger.Error("network channels balance returned an error", logging.Error(err)) + } + if old != nil { + if old.Balance != channelsBalance.Balance || + old.PendingOpenBalance != channelsBalance.PendingOpenBalance { + sub <- events.New(events.ChannelBalanceUpdated) + } + } + old = channelsBalance + } +} + +// withTickerWalletBalance checks if wallet balance and pending balance +// changed in the ticker interval. +func withTickerWalletBalance() tickerFunc { + var old *models.WalletBalance + return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) { + walletBalance, err := net.GetWalletBalance(ctx) + if err != nil { + logger.Error("network wallet balance returned an error", logging.Error(err)) + } + if old != nil { + if old.TotalBalance != walletBalance.TotalBalance || + old.ConfirmedBalance != walletBalance.ConfirmedBalance || + old.UnconfirmedBalance != walletBalance.UnconfirmedBalance { + sub <- events.New(events.WalletBalanceUpdated) + } + } + old = walletBalance + } +} diff --git a/ui/controller.go b/ui/controller.go index f3f6e20..e0cf8a5 100644 --- a/ui/controller.go +++ b/ui/controller.go @@ -91,6 +91,17 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events. switch event.Type { case events.BlockReceived: refresh(c.models.RefreshInfo) + case events.WalletBalanceUpdated: + refresh( + c.models.RefreshInfo, + c.models.RefreshWalletBalance, + ) + case events.ChannelBalanceUpdated: + refresh( + c.models.RefreshInfo, + c.models.RefreshChannelsBalance, + c.models.RefreshChannels, + ) case events.ChannelPending: refresh( c.models.RefreshInfo,