pubsub: add more ticker

pull/14/head
Edouard Paris 5 years ago
parent b3ea008c43
commit 9fc5ab2fbc

@ -1,13 +1,15 @@
package events
const (
PeerUpdated = "peer.updated"
BlockReceived = "block.received"
InvoiceCreated = "invoice.created"
InvoiceSettled = "invoice.settled"
ChannelPending = "channel.pending"
ChannelActive = "channel.active"
ChannelInactive = "channel.inactive"
PeerUpdated = "peer.updated"
BlockReceived = "block.received"
InvoiceCreated = "invoice.created"
InvoiceSettled = "invoice.settled"
ChannelPending = "channel.pending"
ChannelActive = "channel.active"
ChannelInactive = "channel.inactive"
ChannelBalanceUpdated = "channel.balance.updated"
WalletBalanceUpdated = "wallet.balance.updated"
)
type Event struct {

@ -3,7 +3,6 @@ package pubsub
import (
"context"
"sync"
"time"
"github.com/edouardparis/lntop/events"
"github.com/edouardparis/lntop/logging"
@ -27,49 +26,6 @@ func New(logger logging.Logger, network *network.Network) *PubSub {
}
}
func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event) {
p.wg.Add(1)
ticker := time.NewTicker(3 * time.Second)
go func() {
var old *models.Info
for {
select {
case <-p.stop:
ticker.Stop()
p.wg.Done()
return
case <-ticker.C:
info, err := p.network.Info(ctx)
if err != nil {
p.logger.Error("SubscribeInvoice returned an error", logging.Error(err))
}
if old != nil {
if old.BlockHeight != info.BlockHeight {
sub <- events.New(events.BlockReceived)
}
if old.NumPeers != info.NumPeers {
sub <- events.New(events.PeerUpdated)
}
if old.NumPendingChannels < info.NumPendingChannels {
sub <- events.New(events.ChannelPending)
}
if old.NumActiveChannels < info.NumActiveChannels {
sub <- events.New(events.ChannelActive)
}
if old.NumInactiveChannels < info.NumInactiveChannels {
sub <- events.New(events.ChannelInactive)
}
}
old = info
}
}
}()
}
func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
invoices := make(chan *models.Invoice)
@ -113,7 +69,11 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
p.logger.Debug("Starting...")
p.invoices(ctx, sub)
p.ticker(ctx, sub)
p.ticker(ctx, sub,
withTickerInfo(),
withTickerChannelsBalance(),
withTickerWalletBalance(),
)
<-p.stop
p.wg.Wait()

@ -0,0 +1,104 @@
package pubsub
import (
"context"
"time"
"github.com/edouardparis/lntop/events"
"github.com/edouardparis/lntop/logging"
"github.com/edouardparis/lntop/network"
"github.com/edouardparis/lntop/network/models"
)
type tickerFunc func(context.Context, logging.Logger, *network.Network, chan *events.Event)
func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event, fn ...tickerFunc) {
p.wg.Add(1)
ticker := time.NewTicker(3 * time.Second)
go func() {
for {
select {
case <-p.stop:
ticker.Stop()
p.wg.Done()
return
case <-ticker.C:
for i := range fn {
fn[i](ctx, p.logger, p.network, sub)
}
}
}
}()
}
// withTickerInfo checks if general information did not changed changed in the ticker interval.
func withTickerInfo() tickerFunc {
var old *models.Info
return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) {
info, err := net.Info(ctx)
if err != nil {
logger.Error("network info returned an error", logging.Error(err))
}
if old != nil {
if old.BlockHeight != info.BlockHeight {
sub <- events.New(events.BlockReceived)
}
if old.NumPeers != info.NumPeers {
sub <- events.New(events.PeerUpdated)
}
if old.NumPendingChannels < info.NumPendingChannels {
sub <- events.New(events.ChannelPending)
}
if old.NumActiveChannels < info.NumActiveChannels {
sub <- events.New(events.ChannelActive)
}
if old.NumInactiveChannels < info.NumInactiveChannels {
sub <- events.New(events.ChannelInactive)
}
}
old = info
}
}
// withTickerChannelsBalance checks if channels balance and pending balance
// changed in the ticker interval.
func withTickerChannelsBalance() tickerFunc {
var old *models.ChannelsBalance
return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) {
channelsBalance, err := net.GetChannelsBalance(ctx)
if err != nil {
logger.Error("network channels balance returned an error", logging.Error(err))
}
if old != nil {
if old.Balance != channelsBalance.Balance ||
old.PendingOpenBalance != channelsBalance.PendingOpenBalance {
sub <- events.New(events.ChannelBalanceUpdated)
}
}
old = channelsBalance
}
}
// withTickerWalletBalance checks if wallet balance and pending balance
// changed in the ticker interval.
func withTickerWalletBalance() tickerFunc {
var old *models.WalletBalance
return func(ctx context.Context, logger logging.Logger, net *network.Network, sub chan *events.Event) {
walletBalance, err := net.GetWalletBalance(ctx)
if err != nil {
logger.Error("network wallet balance returned an error", logging.Error(err))
}
if old != nil {
if old.TotalBalance != walletBalance.TotalBalance ||
old.ConfirmedBalance != walletBalance.ConfirmedBalance ||
old.UnconfirmedBalance != walletBalance.UnconfirmedBalance {
sub <- events.New(events.WalletBalanceUpdated)
}
}
old = walletBalance
}
}

@ -91,6 +91,17 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events.
switch event.Type {
case events.BlockReceived:
refresh(c.models.RefreshInfo)
case events.WalletBalanceUpdated:
refresh(
c.models.RefreshInfo,
c.models.RefreshWalletBalance,
)
case events.ChannelBalanceUpdated:
refresh(
c.models.RefreshInfo,
c.models.RefreshChannelsBalance,
c.models.RefreshChannels,
)
case events.ChannelPending:
refresh(
c.models.RefreshInfo,

Loading…
Cancel
Save