Merge pull request #76 from rkfg/channels

Show closed and disabled channels, fix policy order
pull/92/head
Édouard 2 years ago committed by GitHub
commit 9954fd5704
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,6 +12,7 @@ const (
TransactionCreated = "transaction.created"
WalletBalanceUpdated = "wallet.balance.updated"
RoutingEventUpdated = "routing.event.updated"
GraphUpdated = "graph.updated"
)
type Event struct {

@ -41,4 +41,6 @@ type Backend interface {
SubscribeTransactions(context.Context, chan *models.Transaction) error
SubscribeRoutingEvents(context.Context, chan *models.RoutingEvent) error
SubscribeGraphEvents(context.Context, chan *models.ChannelEdgeUpdate) error
}

@ -2,6 +2,7 @@ package lnd
import (
"context"
"encoding/hex"
"fmt"
"time"
@ -21,7 +22,7 @@ import (
const (
lndDefaultInvoiceExpiry = 3600
lndMinPoolCapacity = 4
lndMinPoolCapacity = 6
)
type Client struct {
@ -91,7 +92,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode
for {
select {
case <-ctx.Done():
break
return nil
default:
invoice, err := cltInvoices.Recv()
if err != nil {
@ -123,7 +124,7 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
for {
select {
case <-ctx.Done():
break
return nil
default:
transaction, err := cltTransactions.Recv()
if err != nil {
@ -141,24 +142,84 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
}
func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.ChannelUpdate) error {
_, err := l.Client(ctx)
clt, err := l.Client(ctx)
if err != nil {
return err
}
defer clt.Close()
// events, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
// if err != nil {
// return err
// }
channelEvents, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
if err != nil {
return err
}
// for {
// event, err := events.Recv()
// if err != nil {
// return err
// }
// events <-
//}
return nil
for {
select {
case <-ctx.Done():
return nil
default:
event, err := channelEvents.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe channels: context canceled")
return nil
}
return err
}
if event.Type == lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL {
events <- &models.ChannelUpdate{}
}
}
}
}
func chanpointToString(c *lnrpc.ChannelPoint) string {
hash := c.GetFundingTxidBytes()
for i := 0; i < len(hash)/2; i++ {
hash[i], hash[len(hash)-i-1] = hash[len(hash)-i-1], hash[i]
}
output := c.OutputIndex
result := fmt.Sprintf("%s:%d", hex.EncodeToString(hash), output)
return result
}
func (l Backend) SubscribeGraphEvents(ctx context.Context, events chan *models.ChannelEdgeUpdate) error {
clt, err := l.Client(ctx)
if err != nil {
return err
}
defer clt.Close()
graphEvents, err := clt.SubscribeChannelGraph(ctx, &lnrpc.GraphTopologySubscription{})
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
default:
event, err := graphEvents.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe graph: context canceled")
return nil
}
return err
}
chanPoints := []string{}
for _, c := range event.ChannelUpdates {
chanPoints = append(chanPoints, chanpointToString(c.ChanPoint))
}
if len(chanPoints) > 0 {
events <- &models.ChannelEdgeUpdate{ChanPoints: chanPoints}
}
}
}
}
func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error {
@ -176,7 +237,7 @@ func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan
for {
select {
case <-ctx.Done():
break
return nil
default:
event, err := cltRoutingEvents.Recv()
if err != nil {
@ -351,15 +412,15 @@ func (l Backend) GetChannelInfo(ctx context.Context, channel *models.Channel) er
t := time.Unix(int64(uint64(resp.LastUpdate)), 0)
channel.LastUpdate = &t
channel.Policy1 = protoToRoutingPolicy(resp.Node1Policy)
channel.Policy2 = protoToRoutingPolicy(resp.Node2Policy)
channel.LocalPolicy = protoToRoutingPolicy(resp.Node1Policy)
channel.RemotePolicy = protoToRoutingPolicy(resp.Node2Policy)
info, err := clt.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil {
return errors.WithStack(err)
}
if info != nil {
channel.WeFirst = resp.Node1Pub == info.IdentityPubkey
if info != nil && resp.Node1Pub != info.IdentityPubkey {
channel.LocalPolicy, channel.RemotePolicy = channel.RemotePolicy, channel.LocalPolicy
}
return nil

@ -274,11 +274,11 @@ func nodeProtoToNode(resp *lnrpc.NodeInfo) *models.Node {
ID: c.ChannelId,
ChannelPoint: c.ChanPoint,
Capacity: c.Capacity,
Policy1: protoToRoutingPolicy(c.Node1Policy),
Policy2: protoToRoutingPolicy(c.Node2Policy),
LocalPolicy: protoToRoutingPolicy(c.Node1Policy),
RemotePolicy: protoToRoutingPolicy(c.Node2Policy),
}
if c.Node1Pub != resp.Node.PubKey {
ch.Policy1, ch.Policy2 = ch.Policy2, ch.Policy1
ch.LocalPolicy, ch.RemotePolicy = ch.RemotePolicy, ch.LocalPolicy
}
channels = append(channels, ch)
}

@ -54,6 +54,10 @@ func (b *Backend) SubscribeRoutingEvents(ctx context.Context, channel chan *mode
return nil
}
func (b *Backend) SubscribeGraphEvents(ctx context.Context, channel chan *models.ChannelEdgeUpdate) error {
return nil
}
func (b *Backend) GetNode(ctx context.Context, pubkey string, includeChannels bool) (*models.Node, error) {
return &models.Node{}, nil
}

@ -1,6 +1,7 @@
package models
import (
"strings"
"time"
"github.com/edouardparis/lntop/logging"
@ -13,6 +14,7 @@ const (
ChannelClosing
ChannelForceClosing
ChannelWaitingClose
ChannelClosed
)
type ChannelsBalance struct {
@ -47,9 +49,8 @@ type Channel struct {
PendingHTLC []*HTLC
LastUpdate *time.Time
Node *Node
WeFirst bool
Policy1 *RoutingPolicy
Policy2 *RoutingPolicy
LocalPolicy *RoutingPolicy
RemotePolicy *RoutingPolicy
}
func (m Channel) MarshalLogObject(enc logging.ObjectEncoder) error {
@ -78,7 +79,7 @@ func (m Channel) ShortAlias() (alias string, forced bool) {
} else if m.Node == nil || m.Node.Alias == "" {
alias = m.RemotePubKey[:24]
} else {
alias = m.Node.Alias
alias = strings.ReplaceAll(m.Node.Alias, "\ufe0f", "")
}
if len(alias) > 25 {
alias = alias[:24]
@ -89,6 +90,9 @@ func (m Channel) ShortAlias() (alias string, forced bool) {
type ChannelUpdate struct {
}
type ChannelEdgeUpdate struct {
ChanPoints []string
}
type RoutingPolicy struct {
TimeLockDelta uint32
MinHtlc int64

@ -117,6 +117,64 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *PubSub) graphUpdates(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
graphUpdates := make(chan *models.ChannelEdgeUpdate)
ctx, cancel := context.WithCancel(ctx)
go func() {
for gu := range graphUpdates {
p.logger.Debug("receive graph update")
sub <- events.NewWithData(events.GraphUpdated, gu)
}
p.wg.Done()
}()
go func() {
err := p.network.SubscribeGraphEvents(ctx, graphUpdates)
if err != nil {
p.logger.Error("SubscribeGraphEvents returned an error", logging.Error(err))
}
p.wg.Done()
}()
go func() {
<-p.stop
cancel()
close(graphUpdates)
p.wg.Done()
}()
}
func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
channels := make(chan *models.ChannelUpdate)
ctx, cancel := context.WithCancel(ctx)
go func() {
for range channels {
p.logger.Debug("channels updated")
sub <- events.New(events.ChannelActive)
}
p.wg.Done()
}()
go func() {
err := p.network.SubscribeChannels(ctx, channels)
if err != nil {
p.logger.Error("SubscribeChannels returned an error", logging.Error(err))
}
p.wg.Done()
}()
go func() {
<-p.stop
cancel()
close(channels)
p.wg.Done()
}()
}
func (p *PubSub) Stop() {
p.stop <- true
close(p.stop)
@ -129,6 +187,8 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
p.invoices(ctx, sub)
p.transactions(ctx, sub)
p.routingUpdates(ctx, sub)
p.channels(ctx, sub)
p.graphUpdates(ctx, sub)
p.ticker(ctx, sub,
withTickerInfo(),
withTickerChannelsBalance(),

@ -179,6 +179,8 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events.
refresh(c.models.RefreshInfo)
case events.RoutingEventUpdated:
refresh(c.models.RefreshRouting(event.Data))
case events.GraphUpdated:
refresh(c.models.RefreshPolicies(event.Data))
}
}
}

@ -109,12 +109,12 @@ func (c *Channels) Update(newChannel *models.Channel) {
oldChannel.LastUpdate = newChannel.LastUpdate
}
if newChannel.Policy1 != nil {
oldChannel.Policy1 = newChannel.Policy1
if newChannel.LocalPolicy != nil {
oldChannel.LocalPolicy = newChannel.LocalPolicy
}
if newChannel.Policy2 != nil {
oldChannel.Policy2 = newChannel.Policy2
if newChannel.RemotePolicy != nil {
oldChannel.RemotePolicy = newChannel.RemotePolicy
}
}

@ -52,14 +52,16 @@ func (m *Models) RefreshChannels(ctx context.Context) error {
if err != nil {
return err
}
index := map[string]*models.Channel{}
for i := range channels {
index[channels[i].ChannelPoint] = channels[i]
if !m.Channels.Contains(channels[i]) {
m.Channels.Add(channels[i])
}
channel := m.Channels.GetByChanPoint(channels[i].ChannelPoint)
if channel != nil &&
(channel.UpdatesCount < channels[i].UpdatesCount ||
channel.LastUpdate == nil) {
channel.LastUpdate == nil || channel.LocalPolicy == nil || channel.RemotePolicy == nil) {
err := m.network.GetChannelInfo(ctx, channels[i])
if err != nil {
return err
@ -77,6 +79,11 @@ func (m *Models) RefreshChannels(ctx context.Context) error {
m.Channels.Update(channels[i])
}
for _, c := range m.Channels.List() {
if _, ok := index[c.ChannelPoint]; !ok {
c.Status = models.ChannelClosed
}
}
return nil
}
@ -137,6 +144,22 @@ func (m *Models) RefreshRouting(update interface{}) func(context.Context) error
})
}
func (m *Models) RefreshPolicies(update interface{}) func(context.Context) error {
return func(ctx context.Context) error {
for _, chanpoint := range update.(*models.ChannelEdgeUpdate).ChanPoints {
if m.Channels.Contains(&models.Channel{ChannelPoint: chanpoint}) {
m.logger.Debug("updating channel", logging.String("chanpoint", chanpoint))
channel := m.Channels.GetByChanPoint(chanpoint)
err := m.network.GetChannelInfo(ctx, channel)
if err != nil {
m.logger.Error("error updating channel info", logging.Error(err))
}
}
}
return nil
}
}
func (m *Models) RefreshCurrentNode(ctx context.Context) (err error) {
cur := m.Channels.Current()
if cur != nil {

@ -218,10 +218,10 @@ func (c *Channel) display() {
disabledOut := 0
disabledIn := 0
for _, ch := range c.channels.CurrentNode.Channels {
if ch.Policy1 != nil && ch.Policy1.Disabled {
if ch.LocalPolicy != nil && ch.LocalPolicy.Disabled {
disabledOut++
}
if ch.Policy2 != nil && ch.Policy2.Disabled {
if ch.RemotePolicy != nil && ch.RemotePolicy.Disabled {
disabledIn++
}
}
@ -230,17 +230,14 @@ func (c *Channel) display() {
}
}
if channel.Policy1 != nil && channel.WeFirst {
printPolicy(v, p, channel.Policy1, true)
if channel.LocalPolicy != nil {
printPolicy(v, p, channel.LocalPolicy, true)
}
if channel.Policy2 != nil {
printPolicy(v, p, channel.Policy2, !channel.WeFirst)
if channel.RemotePolicy != nil {
printPolicy(v, p, channel.RemotePolicy, false)
}
if channel.Policy1 != nil && !channel.WeFirst {
printPolicy(v, p, channel.Policy1, false)
}
if len(channel.PendingHTLC) > 0 {
fmt.Fprintln(v)
fmt.Fprintln(v, green(" [ Pending HTLCs ]"))

@ -546,19 +546,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels {
return func(c1, c2 *netmodels.Channel) bool {
var c1f uint64
var c2f uint64
if c1.Policy1 != nil {
c1f = uint64(c1.Policy1.FeeBaseMsat)
if c1.LocalPolicy != nil {
c1f = uint64(c1.LocalPolicy.FeeBaseMsat)
}
if c2.Policy1 != nil {
c2f = uint64(c2.Policy1.FeeBaseMsat)
if c2.LocalPolicy != nil {
c2f = uint64(c2.LocalPolicy.FeeBaseMsat)
}
return models.UInt64Sort(c1f, c2f, order)
}
},
display: func(c *netmodels.Channel, opts ...color.Option) string {
var val int64
if c.Policy1 != nil {
val = c.Policy1.FeeBaseMsat
if c.LocalPolicy != nil {
val = c.LocalPolicy.FeeBaseMsat
}
return color.White(opts...)(printer.Sprintf("%8d", val))
},
@ -571,19 +571,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels {
return func(c1, c2 *netmodels.Channel) bool {
var c1f uint64
var c2f uint64
if c1.Policy1 != nil {
c1f = uint64(c1.Policy1.FeeRateMilliMsat)
if c1.LocalPolicy != nil {
c1f = uint64(c1.LocalPolicy.FeeRateMilliMsat)
}
if c2.Policy1 != nil {
c2f = uint64(c2.Policy1.FeeRateMilliMsat)
if c2.LocalPolicy != nil {
c2f = uint64(c2.LocalPolicy.FeeRateMilliMsat)
}
return models.UInt64Sort(c1f, c2f, order)
}
},
display: func(c *netmodels.Channel, opts ...color.Option) string {
var val int64
if c.Policy1 != nil {
val = c.Policy1.FeeRateMilliMsat
if c.LocalPolicy != nil {
val = c.LocalPolicy.FeeRateMilliMsat
}
return color.White(opts...)(printer.Sprintf("%8d", val))
},
@ -596,19 +596,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels {
return func(c1, c2 *netmodels.Channel) bool {
var c1f uint64
var c2f uint64
if c1.Policy2 != nil {
c1f = uint64(c1.Policy2.FeeBaseMsat)
if c1.RemotePolicy != nil {
c1f = uint64(c1.RemotePolicy.FeeBaseMsat)
}
if c2.Policy2 != nil {
c2f = uint64(c2.Policy2.FeeBaseMsat)
if c2.RemotePolicy != nil {
c2f = uint64(c2.RemotePolicy.FeeBaseMsat)
}
return models.UInt64Sort(c1f, c2f, order)
}
},
display: func(c *netmodels.Channel, opts ...color.Option) string {
var val int64
if c.Policy2 != nil {
val = c.Policy2.FeeBaseMsat
if c.RemotePolicy != nil {
val = c.RemotePolicy.FeeBaseMsat
}
return color.White(opts...)(printer.Sprintf("%7d", val))
},
@ -621,19 +621,19 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels {
return func(c1, c2 *netmodels.Channel) bool {
var c1f uint64
var c2f uint64
if c1.Policy2 != nil {
c1f = uint64(c1.Policy2.FeeRateMilliMsat)
if c1.RemotePolicy != nil {
c1f = uint64(c1.RemotePolicy.FeeRateMilliMsat)
}
if c2.Policy2 != nil {
c2f = uint64(c2.Policy2.FeeRateMilliMsat)
if c2.RemotePolicy != nil {
c2f = uint64(c2.RemotePolicy.FeeRateMilliMsat)
}
return models.UInt64Sort(c1f, c2f, order)
}
},
display: func(c *netmodels.Channel, opts ...color.Option) string {
var val int64
if c.Policy2 != nil {
val = c.Policy2.FeeRateMilliMsat
if c.RemotePolicy != nil {
val = c.RemotePolicy.FeeRateMilliMsat
}
return color.White(opts...)(printer.Sprintf("%7d", val))
},
@ -652,12 +652,40 @@ func NewChannels(cfg *config.View, chans *models.Channels) *Channels {
return channels
}
func channelDisabled(c *netmodels.Channel, opts ...color.Option) string {
outgoing := false
incoming := false
if c.LocalPolicy != nil && c.LocalPolicy.Disabled {
outgoing = true
}
if c.RemotePolicy != nil && c.RemotePolicy.Disabled {
incoming = true
}
result := ""
if incoming && outgoing {
result = "⇅"
} else if incoming {
result = "⇊"
} else if outgoing {
result = "⇈"
}
if result == "" {
return result
}
return color.Red(opts...)(fmt.Sprintf("%-4s", result))
}
func status(c *netmodels.Channel, opts ...color.Option) string {
disabled := channelDisabled(c, opts...)
format := "%-13s"
if disabled != "" {
format = "%-9s"
}
switch c.Status {
case netmodels.ChannelActive:
return color.Green(opts...)(fmt.Sprintf("%-13s", "active"))
return color.Green(opts...)(fmt.Sprintf(format, "active ")) + disabled
case netmodels.ChannelInactive:
return color.Red(opts...)(fmt.Sprintf("%-13s", "inactive"))
return color.Red(opts...)(fmt.Sprintf(format, "inactive ")) + disabled
case netmodels.ChannelOpening:
return color.Yellow(opts...)(fmt.Sprintf("%-13s", "opening"))
case netmodels.ChannelClosing:
@ -666,6 +694,8 @@ func status(c *netmodels.Channel, opts ...color.Option) string {
return color.Yellow(opts...)(fmt.Sprintf("%-13s", "force closing"))
case netmodels.ChannelWaitingClose:
return color.Yellow(opts...)(fmt.Sprintf("%-13s", "waiting close"))
case netmodels.ChannelClosed:
return color.Red(opts...)(fmt.Sprintf("%-13s", "closed"))
}
return ""
}

Loading…
Cancel
Save