diff --git a/events/events.go b/events/events.go index e20d295..8cb7db0 100644 --- a/events/events.go +++ b/events/events.go @@ -12,6 +12,7 @@ const ( TransactionCreated = "transaction.created" WalletBalanceUpdated = "wallet.balance.updated" RoutingEventUpdated = "routing.event.updated" + GraphUpdated = "graph.updated" ) type Event struct { diff --git a/network/backend/backend.go b/network/backend/backend.go index 0f27d55..5e1c830 100644 --- a/network/backend/backend.go +++ b/network/backend/backend.go @@ -41,4 +41,6 @@ type Backend interface { SubscribeTransactions(context.Context, chan *models.Transaction) error SubscribeRoutingEvents(context.Context, chan *models.RoutingEvent) error + + SubscribeGraphEvents(context.Context, chan *models.ChannelEdgeUpdate) error } diff --git a/network/backend/lnd/lnd.go b/network/backend/lnd/lnd.go index bf50c67..8a81650 100644 --- a/network/backend/lnd/lnd.go +++ b/network/backend/lnd/lnd.go @@ -2,6 +2,7 @@ package lnd import ( "context" + "encoding/hex" "fmt" "time" @@ -21,7 +22,7 @@ import ( const ( lndDefaultInvoiceExpiry = 3600 - lndMinPoolCapacity = 5 + lndMinPoolCapacity = 6 ) type Client struct { @@ -174,6 +175,53 @@ func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.Chan } } +func chanpointToString(c *lnrpc.ChannelPoint) string { + hash := c.GetFundingTxidBytes() + for i := 0; i < len(hash)/2; i++ { + hash[i], hash[len(hash)-i-1] = hash[len(hash)-i-1], hash[i] + } + output := c.OutputIndex + result := fmt.Sprintf("%s:%d", hex.EncodeToString(hash), output) + return result +} + +func (l Backend) SubscribeGraphEvents(ctx context.Context, events chan *models.ChannelEdgeUpdate) error { + clt, err := l.Client(ctx) + if err != nil { + return err + } + defer clt.Close() + + graphEvents, err := clt.SubscribeChannelGraph(ctx, &lnrpc.GraphTopologySubscription{}) + if err != nil { + return err + } + + for { + select { + case <-ctx.Done(): + return nil + default: + event, err := graphEvents.Recv() + if err != nil { + st, ok := status.FromError(err) + if ok && st.Code() == codes.Canceled { + l.logger.Debug("stopping subscribe graph: context canceled") + return nil + } + return err + } + chanPoints := []string{} + for _, c := range event.ChannelUpdates { + chanPoints = append(chanPoints, chanpointToString(c.ChanPoint)) + } + if len(chanPoints) > 0 { + events <- &models.ChannelEdgeUpdate{ChanPoints: chanPoints} + } + } + } +} + func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error { clt, err := l.RouterClient(ctx) if err != nil { diff --git a/network/backend/mock/mock.go b/network/backend/mock/mock.go index 5d059bd..449c7a2 100644 --- a/network/backend/mock/mock.go +++ b/network/backend/mock/mock.go @@ -54,7 +54,11 @@ func (b *Backend) SubscribeRoutingEvents(ctx context.Context, channel chan *mode return nil } -func (b *Backend) GetNode(ctx context.Context, pubkey string, includeChannels bool) (*models.Node, error) { +func (b *Backend) SubscribeGraphEvents(ctx context.Context, channel chan *models.ChannelEdgeUpdate) error { + return nil +} + +func (b *Backend) GetNode(ctx context.Context, pubkey string) (*models.Node, error) { return &models.Node{}, nil } diff --git a/network/models/channel.go b/network/models/channel.go index b57cbcf..6596be6 100644 --- a/network/models/channel.go +++ b/network/models/channel.go @@ -90,6 +90,9 @@ func (m Channel) ShortAlias() (alias string, forced bool) { type ChannelUpdate struct { } +type ChannelEdgeUpdate struct { + ChanPoints []string +} type RoutingPolicy struct { TimeLockDelta uint32 MinHtlc int64 diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 2ce67cf..5364183 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -117,6 +117,35 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) { }() } +func (p *PubSub) graphUpdates(ctx context.Context, sub chan *events.Event) { + p.wg.Add(3) + graphUpdates := make(chan *models.ChannelEdgeUpdate) + ctx, cancel := context.WithCancel(ctx) + + go func() { + for gu := range graphUpdates { + p.logger.Debug("receive graph update") + sub <- events.NewWithData(events.GraphUpdated, gu) + } + p.wg.Done() + }() + + go func() { + err := p.network.SubscribeGraphEvents(ctx, graphUpdates) + if err != nil { + p.logger.Error("SubscribeGraphEvents returned an error", logging.Error(err)) + } + p.wg.Done() + }() + + go func() { + <-p.stop + cancel() + close(graphUpdates) + p.wg.Done() + }() +} + func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) { p.wg.Add(3) channels := make(chan *models.ChannelUpdate) @@ -159,6 +188,7 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) { p.transactions(ctx, sub) p.routingUpdates(ctx, sub) p.channels(ctx, sub) + p.graphUpdates(ctx, sub) p.ticker(ctx, sub, withTickerInfo(), withTickerChannelsBalance(), diff --git a/ui/controller.go b/ui/controller.go index e292dc4..96f70a7 100644 --- a/ui/controller.go +++ b/ui/controller.go @@ -179,6 +179,8 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events. refresh(c.models.RefreshInfo) case events.RoutingEventUpdated: refresh(c.models.RefreshRouting(event.Data)) + case events.GraphUpdated: + refresh(c.models.RefreshPolicies(event.Data)) } } } diff --git a/ui/models/models.go b/ui/models/models.go index c22c2d1..b4e3262 100644 --- a/ui/models/models.go +++ b/ui/models/models.go @@ -144,6 +144,22 @@ func (m *Models) RefreshRouting(update interface{}) func(context.Context) error }) } +func (m *Models) RefreshPolicies(update interface{}) func(context.Context) error { + return func(ctx context.Context) error { + for _, chanpoint := range update.(*models.ChannelEdgeUpdate).ChanPoints { + if m.Channels.Contains(&models.Channel{ChannelPoint: chanpoint}) { + m.logger.Debug("updating channel", logging.String("chanpoint", chanpoint)) + channel := m.Channels.GetByChanPoint(chanpoint) + err := m.network.GetChannelInfo(ctx, channel) + if err != nil { + m.logger.Error("error updating channel info", logging.Error(err)) + } + } + } + return nil + } +} + func (m *Models) RefreshCurrentNode(ctx context.Context) (err error) { cur := m.Channels.Current() if cur != nil { diff --git a/ui/views/channels.go b/ui/views/channels.go index d61fd88..836f4c7 100644 --- a/ui/views/channels.go +++ b/ui/views/channels.go @@ -652,12 +652,40 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels { return channels } +func channelDisabled(c *netmodels.Channel, opts ...color.Option) string { + outgoing := false + incoming := false + if c.Policy1 != nil && c.Policy1.Disabled { + outgoing = true + } + if c.Policy2 != nil && c.Policy2.Disabled { + incoming = true + } + result := "" + if incoming && outgoing { + result = "⇅" + } else if incoming { + result = "⇊" + } else if outgoing { + result = "⇈" + } + if result == "" { + return result + } + return color.Red(opts...)(fmt.Sprintf("%-4s", result)) +} + func status(c *netmodels.Channel, opts ...color.Option) string { + disabled := channelDisabled(c, opts...) + format := "%-13s" + if disabled != "" { + format = "%-9s" + } switch c.Status { case netmodels.ChannelActive: - return color.Green(opts...)(fmt.Sprintf("%-13s", "active")) + return color.Green(opts...)(fmt.Sprintf(format, "active ")) + disabled case netmodels.ChannelInactive: - return color.Red(opts...)(fmt.Sprintf("%-13s", "inactive")) + return color.Red(opts...)(fmt.Sprintf(format, "inactive ")) + disabled case netmodels.ChannelOpening: return color.Yellow(opts...)(fmt.Sprintf("%-13s", "opening")) case netmodels.ChannelClosing: