diff --git a/events/events.go b/events/events.go index e20d295..8cb7db0 100644 --- a/events/events.go +++ b/events/events.go @@ -12,6 +12,7 @@ const ( TransactionCreated = "transaction.created" WalletBalanceUpdated = "wallet.balance.updated" RoutingEventUpdated = "routing.event.updated" + GraphUpdated = "graph.updated" ) type Event struct { diff --git a/network/backend/backend.go b/network/backend/backend.go index 0f27d55..5e1c830 100644 --- a/network/backend/backend.go +++ b/network/backend/backend.go @@ -41,4 +41,6 @@ type Backend interface { SubscribeTransactions(context.Context, chan *models.Transaction) error SubscribeRoutingEvents(context.Context, chan *models.RoutingEvent) error + + SubscribeGraphEvents(context.Context, chan *models.ChannelEdgeUpdate) error } diff --git a/network/backend/lnd/lnd.go b/network/backend/lnd/lnd.go index e369f9f..8ca7442 100644 --- a/network/backend/lnd/lnd.go +++ b/network/backend/lnd/lnd.go @@ -2,6 +2,7 @@ package lnd import ( "context" + "encoding/hex" "fmt" "time" @@ -21,7 +22,7 @@ import ( const ( lndDefaultInvoiceExpiry = 3600 - lndMinPoolCapacity = 4 + lndMinPoolCapacity = 6 ) type Client struct { @@ -91,7 +92,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode for { select { case <-ctx.Done(): - break + return nil default: invoice, err := cltInvoices.Recv() if err != nil { @@ -123,7 +124,7 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models for { select { case <-ctx.Done(): - break + return nil default: transaction, err := cltTransactions.Recv() if err != nil { @@ -141,24 +142,84 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models } func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.ChannelUpdate) error { - _, err := l.Client(ctx) + clt, err := l.Client(ctx) if err != nil { return err } + defer clt.Close() - // events, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{}) - // if err != nil { - // return err - // } + channelEvents, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{}) + if err != nil { + return err + } - // for { - // event, err := events.Recv() - // if err != nil { - // return err - // } - // events <- - //} - return nil + for { + select { + case <-ctx.Done(): + return nil + default: + event, err := channelEvents.Recv() + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Canceled { + l.logger.Debug("stopping subscribe channels: context canceled") + return nil + } + return err + } + if event.Type == lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL { + events <- &models.ChannelUpdate{} + } + + } + } +} + +func chanpointToString(c *lnrpc.ChannelPoint) string { + hash := c.GetFundingTxidBytes() + for i := 0; i < len(hash)/2; i++ { + hash[i], hash[len(hash)-i-1] = hash[len(hash)-i-1], hash[i] + } + output := c.OutputIndex + result := fmt.Sprintf("%s:%d", hex.EncodeToString(hash), output) + return result +} + +func (l Backend) SubscribeGraphEvents(ctx context.Context, events chan *models.ChannelEdgeUpdate) error { + clt, err := l.Client(ctx) + if err != nil { + return err + } + defer clt.Close() + + graphEvents, err := clt.SubscribeChannelGraph(ctx, &lnrpc.GraphTopologySubscription{}) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return nil + default: + event, err := graphEvents.Recv() + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Canceled { + l.logger.Debug("stopping subscribe graph: context canceled") + return nil + } + return err + } + chanPoints := []string{} + for _, c := range event.ChannelUpdates { + chanPoints = append(chanPoints, chanpointToString(c.ChanPoint)) + } + if len(chanPoints) > 0 { + events <- &models.ChannelEdgeUpdate{ChanPoints: chanPoints} + } + } + } } func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error { @@ -176,7 +237,7 @@ func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan for { select { case <-ctx.Done(): - break + return nil default: event, err := cltRoutingEvents.Recv() if err != nil { @@ -351,15 +412,15 @@ func (l Backend) GetChannelInfo(ctx context.Context, channel *models.Channel) er t := time.Unix(int64(uint64(resp.LastUpdate)), 0) channel.LastUpdate = &t - channel.Policy1 = protoToRoutingPolicy(resp.Node1Policy) - channel.Policy2 = protoToRoutingPolicy(resp.Node2Policy) + channel.LocalPolicy = protoToRoutingPolicy(resp.Node1Policy) + channel.RemotePolicy = protoToRoutingPolicy(resp.Node2Policy) info, err := clt.GetInfo(ctx, &lnrpc.GetInfoRequest{}) if err != nil { return errors.WithStack(err) } - if info != nil { - channel.WeFirst = resp.Node1Pub == info.IdentityPubkey + if info != nil && resp.Node1Pub != info.IdentityPubkey { + channel.LocalPolicy, channel.RemotePolicy = channel.RemotePolicy, channel.LocalPolicy } return nil diff --git a/network/backend/lnd/proto.go b/network/backend/lnd/proto.go index fa4ac7d..1ece758 100644 --- a/network/backend/lnd/proto.go +++ b/network/backend/lnd/proto.go @@ -274,11 +274,11 @@ func nodeProtoToNode(resp *lnrpc.NodeInfo) *models.Node { ID: c.ChannelId, ChannelPoint: c.ChanPoint, Capacity: c.Capacity, - Policy1: protoToRoutingPolicy(c.Node1Policy), - Policy2: protoToRoutingPolicy(c.Node2Policy), + LocalPolicy: protoToRoutingPolicy(c.Node1Policy), + RemotePolicy: protoToRoutingPolicy(c.Node2Policy), } if c.Node1Pub != resp.Node.PubKey { - ch.Policy1, ch.Policy2 = ch.Policy2, ch.Policy1 + ch.LocalPolicy, ch.RemotePolicy = ch.RemotePolicy, ch.LocalPolicy } channels = append(channels, ch) } diff --git a/network/backend/mock/mock.go b/network/backend/mock/mock.go index 5d059bd..4fdc076 100644 --- a/network/backend/mock/mock.go +++ b/network/backend/mock/mock.go @@ -54,6 +54,10 @@ func (b *Backend) SubscribeRoutingEvents(ctx context.Context, channel chan *mode return nil } +func (b *Backend) SubscribeGraphEvents(ctx context.Context, channel chan *models.ChannelEdgeUpdate) error { + return nil +} + func (b *Backend) GetNode(ctx context.Context, pubkey string, includeChannels bool) (*models.Node, error) { return &models.Node{}, nil } diff --git a/network/models/channel.go b/network/models/channel.go index 09b34f9..4a3e1f2 100644 --- a/network/models/channel.go +++ b/network/models/channel.go @@ -1,6 +1,7 @@ package models import ( + "strings" "time" "github.com/edouardparis/lntop/logging" @@ -13,6 +14,7 @@ const ( ChannelClosing ChannelForceClosing ChannelWaitingClose + ChannelClosed ) type ChannelsBalance struct { @@ -47,9 +49,8 @@ type Channel struct { PendingHTLC []*HTLC LastUpdate *time.Time Node *Node - WeFirst bool - Policy1 *RoutingPolicy - Policy2 *RoutingPolicy + LocalPolicy *RoutingPolicy + RemotePolicy *RoutingPolicy } func (m Channel) MarshalLogObject(enc logging.ObjectEncoder) error { @@ -78,7 +79,7 @@ func (m Channel) ShortAlias() (alias string, forced bool) { } else if m.Node == nil || m.Node.Alias == "" { alias = m.RemotePubKey[:24] } else { - alias = m.Node.Alias + alias = strings.ReplaceAll(m.Node.Alias, "\ufe0f", "") } if len(alias) > 25 { alias = alias[:24] @@ -89,6 +90,9 @@ func (m Channel) ShortAlias() (alias string, forced bool) { type ChannelUpdate struct { } +type ChannelEdgeUpdate struct { + ChanPoints []string +} type RoutingPolicy struct { TimeLockDelta uint32 MinHtlc int64 diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 0d32459..5364183 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -117,6 +117,64 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) { }() } +func (p *PubSub) graphUpdates(ctx context.Context, sub chan *events.Event) { + p.wg.Add(3) + graphUpdates := make(chan *models.ChannelEdgeUpdate) + ctx, cancel := context.WithCancel(ctx) + + go func() { + for gu := range graphUpdates { + p.logger.Debug("receive graph update") + sub <- events.NewWithData(events.GraphUpdated, gu) + } + p.wg.Done() + }() + + go func() { + err := p.network.SubscribeGraphEvents(ctx, graphUpdates) + if err != nil { + p.logger.Error("SubscribeGraphEvents returned an error", logging.Error(err)) + } + p.wg.Done() + }() + + go func() { + <-p.stop + cancel() + close(graphUpdates) + p.wg.Done() + }() +} + +func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) { + p.wg.Add(3) + channels := make(chan *models.ChannelUpdate) + ctx, cancel := context.WithCancel(ctx) + + go func() { + for range channels { + p.logger.Debug("channels updated") + sub <- events.New(events.ChannelActive) + } + p.wg.Done() + }() + + go func() { + err := p.network.SubscribeChannels(ctx, channels) + if err != nil { + p.logger.Error("SubscribeChannels returned an error", logging.Error(err)) + } + p.wg.Done() + }() + + go func() { + <-p.stop + cancel() + close(channels) + p.wg.Done() + }() +} + func (p *PubSub) Stop() { p.stop <- true close(p.stop) @@ -129,6 +187,8 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) { p.invoices(ctx, sub) p.transactions(ctx, sub) p.routingUpdates(ctx, sub) + p.channels(ctx, sub) + p.graphUpdates(ctx, sub) p.ticker(ctx, sub, withTickerInfo(), withTickerChannelsBalance(), diff --git a/ui/controller.go b/ui/controller.go index e292dc4..96f70a7 100644 --- a/ui/controller.go +++ b/ui/controller.go @@ -179,6 +179,8 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events. refresh(c.models.RefreshInfo) case events.RoutingEventUpdated: refresh(c.models.RefreshRouting(event.Data)) + case events.GraphUpdated: + refresh(c.models.RefreshPolicies(event.Data)) } } } diff --git a/ui/models/channels.go b/ui/models/channels.go index c1bb49c..93dc9d4 100644 --- a/ui/models/channels.go +++ b/ui/models/channels.go @@ -109,12 +109,12 @@ func (c *Channels) Update(newChannel *models.Channel) { oldChannel.LastUpdate = newChannel.LastUpdate } - if newChannel.Policy1 != nil { - oldChannel.Policy1 = newChannel.Policy1 + if newChannel.LocalPolicy != nil { + oldChannel.LocalPolicy = newChannel.LocalPolicy } - if newChannel.Policy2 != nil { - oldChannel.Policy2 = newChannel.Policy2 + if newChannel.RemotePolicy != nil { + oldChannel.RemotePolicy = newChannel.RemotePolicy } } diff --git a/ui/models/models.go b/ui/models/models.go index 08faf95..6801622 100644 --- a/ui/models/models.go +++ b/ui/models/models.go @@ -52,14 +52,16 @@ func (m *Models) RefreshChannels(ctx context.Context) error { if err != nil { return err } + index := map[string]*models.Channel{} for i := range channels { + index[channels[i].ChannelPoint] = channels[i] if !m.Channels.Contains(channels[i]) { m.Channels.Add(channels[i]) } channel := m.Channels.GetByChanPoint(channels[i].ChannelPoint) if channel != nil && (channel.UpdatesCount < channels[i].UpdatesCount || - channel.LastUpdate == nil) { + channel.LastUpdate == nil || channel.LocalPolicy == nil || channel.RemotePolicy == nil) { err := m.network.GetChannelInfo(ctx, channels[i]) if err != nil { return err @@ -77,6 +79,11 @@ func (m *Models) RefreshChannels(ctx context.Context) error { m.Channels.Update(channels[i]) } + for _, c := range m.Channels.List() { + if _, ok := index[c.ChannelPoint]; !ok { + c.Status = models.ChannelClosed + } + } return nil } @@ -137,6 +144,22 @@ func (m *Models) RefreshRouting(update interface{}) func(context.Context) error }) } +func (m *Models) RefreshPolicies(update interface{}) func(context.Context) error { + return func(ctx context.Context) error { + for _, chanpoint := range update.(*models.ChannelEdgeUpdate).ChanPoints { + if m.Channels.Contains(&models.Channel{ChannelPoint: chanpoint}) { + m.logger.Debug("updating channel", logging.String("chanpoint", chanpoint)) + channel := m.Channels.GetByChanPoint(chanpoint) + err := m.network.GetChannelInfo(ctx, channel) + if err != nil { + m.logger.Error("error updating channel info", logging.Error(err)) + } + } + } + return nil + } +} + func (m *Models) RefreshCurrentNode(ctx context.Context) (err error) { cur := m.Channels.Current() if cur != nil { diff --git a/ui/views/channel.go b/ui/views/channel.go index 95297b0..3a58bf2 100644 --- a/ui/views/channel.go +++ b/ui/views/channel.go @@ -218,10 +218,10 @@ func (c *Channel) display() { disabledOut := 0 disabledIn := 0 for _, ch := range c.channels.CurrentNode.Channels { - if ch.Policy1 != nil && ch.Policy1.Disabled { + if ch.LocalPolicy != nil && ch.LocalPolicy.Disabled { disabledOut++ } - if ch.Policy2 != nil && ch.Policy2.Disabled { + if ch.RemotePolicy != nil && ch.RemotePolicy.Disabled { disabledIn++ } } @@ -230,17 +230,14 @@ func (c *Channel) display() { } } - if channel.Policy1 != nil && channel.WeFirst { - printPolicy(v, p, channel.Policy1, true) + if channel.LocalPolicy != nil { + printPolicy(v, p, channel.LocalPolicy, true) } - if channel.Policy2 != nil { - printPolicy(v, p, channel.Policy2, !channel.WeFirst) + if channel.RemotePolicy != nil { + printPolicy(v, p, channel.RemotePolicy, false) } - if channel.Policy1 != nil && !channel.WeFirst { - printPolicy(v, p, channel.Policy1, false) - } if len(channel.PendingHTLC) > 0 { fmt.Fprintln(v) fmt.Fprintln(v, green(" [ Pending HTLCs ]")) diff --git a/ui/views/channels.go b/ui/views/channels.go index d0d81df..b5a0220 100644 --- a/ui/views/channels.go +++ b/ui/views/channels.go @@ -546,19 +546,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return func(c1, c2 *netmodels.Channel) bool { var c1f uint64 var c2f uint64 - if c1.Policy1 != nil { - c1f = uint64(c1.Policy1.FeeBaseMsat) + if c1.LocalPolicy != nil { + c1f = uint64(c1.LocalPolicy.FeeBaseMsat) } - if c2.Policy1 != nil { - c2f = uint64(c2.Policy1.FeeBaseMsat) + if c2.LocalPolicy != nil { + c2f = uint64(c2.LocalPolicy.FeeBaseMsat) } return models.UInt64Sort(c1f, c2f, order) } }, display: func(c *netmodels.Channel, opts ...color.Option) string { var val int64 - if c.Policy1 != nil { - val = c.Policy1.FeeBaseMsat + if c.LocalPolicy != nil { + val = c.LocalPolicy.FeeBaseMsat } return color.White(opts...)(printer.Sprintf("%8d", val)) }, @@ -571,19 +571,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return func(c1, c2 *netmodels.Channel) bool { var c1f uint64 var c2f uint64 - if c1.Policy1 != nil { - c1f = uint64(c1.Policy1.FeeRateMilliMsat) + if c1.LocalPolicy != nil { + c1f = uint64(c1.LocalPolicy.FeeRateMilliMsat) } - if c2.Policy1 != nil { - c2f = uint64(c2.Policy1.FeeRateMilliMsat) + if c2.LocalPolicy != nil { + c2f = uint64(c2.LocalPolicy.FeeRateMilliMsat) } return models.UInt64Sort(c1f, c2f, order) } }, display: func(c *netmodels.Channel, opts ...color.Option) string { var val int64 - if c.Policy1 != nil { - val = c.Policy1.FeeRateMilliMsat + if c.LocalPolicy != nil { + val = c.LocalPolicy.FeeRateMilliMsat } return color.White(opts...)(printer.Sprintf("%8d", val)) }, @@ -596,19 +596,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return func(c1, c2 *netmodels.Channel) bool { var c1f uint64 var c2f uint64 - if c1.Policy2 != nil { - c1f = uint64(c1.Policy2.FeeBaseMsat) + if c1.RemotePolicy != nil { + c1f = uint64(c1.RemotePolicy.FeeBaseMsat) } - if c2.Policy2 != nil { - c2f = uint64(c2.Policy2.FeeBaseMsat) + if c2.RemotePolicy != nil { + c2f = uint64(c2.RemotePolicy.FeeBaseMsat) } return models.UInt64Sort(c1f, c2f, order) } }, display: func(c *netmodels.Channel, opts ...color.Option) string { var val int64 - if c.Policy2 != nil { - val = c.Policy2.FeeBaseMsat + if c.RemotePolicy != nil { + val = c.RemotePolicy.FeeBaseMsat } return color.White(opts...)(printer.Sprintf("%7d", val)) }, @@ -621,19 +621,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return func(c1, c2 *netmodels.Channel) bool { var c1f uint64 var c2f uint64 - if c1.Policy2 != nil { - c1f = uint64(c1.Policy2.FeeRateMilliMsat) + if c1.RemotePolicy != nil { + c1f = uint64(c1.RemotePolicy.FeeRateMilliMsat) } - if c2.Policy2 != nil { - c2f = uint64(c2.Policy2.FeeRateMilliMsat) + if c2.RemotePolicy != nil { + c2f = uint64(c2.RemotePolicy.FeeRateMilliMsat) } return models.UInt64Sort(c1f, c2f, order) } }, display: func(c *netmodels.Channel, opts ...color.Option) string { var val int64 - if c.Policy2 != nil { - val = c.Policy2.FeeRateMilliMsat + if c.RemotePolicy != nil { + val = c.RemotePolicy.FeeRateMilliMsat } return color.White(opts...)(printer.Sprintf("%7d", val)) }, @@ -652,12 +652,40 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return channels } +func channelDisabled(c *netmodels.Channel, opts ...color.Option) string { + outgoing := false + incoming := false + if c.LocalPolicy != nil && c.LocalPolicy.Disabled { + outgoing = true + } + if c.RemotePolicy != nil && c.RemotePolicy.Disabled { + incoming = true + } + result := "" + if incoming && outgoing { + result = "⇅" + } else if incoming { + result = "⇊" + } else if outgoing { + result = "⇈" + } + if result == "" { + return result + } + return color.Red(opts...)(fmt.Sprintf("%-4s", result)) +} + func status(c *netmodels.Channel, opts ...color.Option) string { + disabled := channelDisabled(c, opts...) + format := "%-13s" + if disabled != "" { + format = "%-9s" + } switch c.Status { case netmodels.ChannelActive: - return color.Green(opts...)(fmt.Sprintf("%-13s", "active")) + return color.Green(opts...)(fmt.Sprintf(format, "active ")) + disabled case netmodels.ChannelInactive: - return color.Red(opts...)(fmt.Sprintf("%-13s", "inactive")) + return color.Red(opts...)(fmt.Sprintf(format, "inactive ")) + disabled case netmodels.ChannelOpening: return color.Yellow(opts...)(fmt.Sprintf("%-13s", "opening")) case netmodels.ChannelClosing: @@ -666,6 +694,8 @@ func status(c *netmodels.Channel, opts ...color.Option) string { return color.Yellow(opts...)(fmt.Sprintf("%-13s", "force closing")) case netmodels.ChannelWaitingClose: return color.Yellow(opts...)(fmt.Sprintf("%-13s", "waiting close")) + case netmodels.ChannelClosed: + return color.Red(opts...)(fmt.Sprintf("%-13s", "closed")) } return "" }