From a2590193d0f08db8ff2c671f15e5b4c5f46d0546 Mon Sep 17 00:00:00 2001 From: rkfg Date: Thu, 1 Sep 2022 20:04:51 +0300 Subject: [PATCH] Show closed channels, fix policy order --- network/backend/lnd/lnd.go | 47 ++++++++++++++++++++++++-------------- network/models/channel.go | 4 +++- pubsub/pubsub.go | 30 ++++++++++++++++++++++++ ui/models/models.go | 9 +++++++- ui/views/channels.go | 2 ++ 5 files changed, 73 insertions(+), 19 deletions(-) diff --git a/network/backend/lnd/lnd.go b/network/backend/lnd/lnd.go index e369f9f..35ee23e 100644 --- a/network/backend/lnd/lnd.go +++ b/network/backend/lnd/lnd.go @@ -21,7 +21,7 @@ import ( const ( lndDefaultInvoiceExpiry = 3600 - lndMinPoolCapacity = 4 + lndMinPoolCapacity = 5 ) type Client struct { @@ -91,7 +91,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode for { select { case <-ctx.Done(): - break + return nil default: invoice, err := cltInvoices.Recv() if err != nil { @@ -123,7 +123,7 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models for { select { case <-ctx.Done(): - break + return nil default: transaction, err := cltTransactions.Recv() if err != nil { @@ -141,24 +141,37 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models } func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.ChannelUpdate) error { - _, err := l.Client(ctx) + clt, err := l.Client(ctx) if err != nil { return err } + defer clt.Close() - // events, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{}) - // if err != nil { - // return err - // } + channelEvents, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{}) + if err != nil { + return err + } - // for { - // event, err := events.Recv() - // if err != nil { - // return err - // } - // events <- - //} - return nil + for { + select { + case <-ctx.Done(): + return nil + default: + event, err := channelEvents.Recv() + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Canceled { + l.logger.Debug("stopping subscribe channels: context canceled") + return nil + } + return err + } + if event.Type == lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL { + events <- &models.ChannelUpdate{} + } + + } + } } func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error { @@ -176,7 +189,7 @@ func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan for { select { case <-ctx.Done(): - break + return nil default: event, err := cltRoutingEvents.Recv() if err != nil { diff --git a/network/models/channel.go b/network/models/channel.go index 09b34f9..973eac7 100644 --- a/network/models/channel.go +++ b/network/models/channel.go @@ -1,6 +1,7 @@ package models import ( + "strings" "time" "github.com/edouardparis/lntop/logging" @@ -13,6 +14,7 @@ const ( ChannelClosing ChannelForceClosing ChannelWaitingClose + ChannelClosed ) type ChannelsBalance struct { @@ -78,7 +80,7 @@ func (m Channel) ShortAlias() (alias string, forced bool) { } else if m.Node == nil || m.Node.Alias == "" { alias = m.RemotePubKey[:24] } else { - alias = m.Node.Alias + alias = strings.ReplaceAll(m.Node.Alias, "\ufe0f", "") } if len(alias) > 25 { alias = alias[:24] diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 0d32459..2ce67cf 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -117,6 +117,35 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) { }() } +func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) { + p.wg.Add(3) + channels := make(chan *models.ChannelUpdate) + ctx, cancel := context.WithCancel(ctx) + + go func() { + for range channels { + p.logger.Debug("channels updated") + sub <- events.New(events.ChannelActive) + } + p.wg.Done() + }() + + go func() { + err := p.network.SubscribeChannels(ctx, channels) + if err != nil { + p.logger.Error("SubscribeChannels returned an error", logging.Error(err)) + } + p.wg.Done() + }() + + go func() { + <-p.stop + cancel() + close(channels) + p.wg.Done() + }() +} + func (p *PubSub) Stop() { p.stop <- true close(p.stop) @@ -129,6 +158,7 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) { p.invoices(ctx, sub) p.transactions(ctx, sub) p.routingUpdates(ctx, sub) + p.channels(ctx, sub) p.ticker(ctx, sub, withTickerInfo(), withTickerChannelsBalance(), diff --git a/ui/models/models.go b/ui/models/models.go index 08faf95..c22c2d1 100644 --- a/ui/models/models.go +++ b/ui/models/models.go @@ -52,14 +52,16 @@ func (m *Models) RefreshChannels(ctx context.Context) error { if err != nil { return err } + index := map[string]*models.Channel{} for i := range channels { + index[channels[i].ChannelPoint] = channels[i] if !m.Channels.Contains(channels[i]) { m.Channels.Add(channels[i]) } channel := m.Channels.GetByChanPoint(channels[i].ChannelPoint) if channel != nil && (channel.UpdatesCount < channels[i].UpdatesCount || - channel.LastUpdate == nil) { + channel.LastUpdate == nil || channel.Policy1 == nil || channel.Policy2 == nil) { err := m.network.GetChannelInfo(ctx, channels[i]) if err != nil { return err @@ -77,6 +79,11 @@ func (m *Models) RefreshChannels(ctx context.Context) error { m.Channels.Update(channels[i]) } + for _, c := range m.Channels.List() { + if _, ok := index[c.ChannelPoint]; !ok { + c.Status = models.ChannelClosed + } + } return nil } diff --git a/ui/views/channels.go b/ui/views/channels.go index d0d81df..d61fd88 100644 --- a/ui/views/channels.go +++ b/ui/views/channels.go @@ -666,6 +666,8 @@ func status(c *netmodels.Channel, opts ...color.Option) string { return color.Yellow(opts...)(fmt.Sprintf("%-13s", "force closing")) case netmodels.ChannelWaitingClose: return color.Yellow(opts...)(fmt.Sprintf("%-13s", "waiting close")) + case netmodels.ChannelClosed: + return color.Red(opts...)(fmt.Sprintf("%-13s", "closed")) } return "" }