Show closed channels, fix policy order

pull/76/head
rkfg 2 years ago
parent eb662744e7
commit a2590193d0

@ -21,7 +21,7 @@ import (
const (
lndDefaultInvoiceExpiry = 3600
lndMinPoolCapacity = 4
lndMinPoolCapacity = 5
)
type Client struct {
@ -91,7 +91,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode
for {
select {
case <-ctx.Done():
break
return nil
default:
invoice, err := cltInvoices.Recv()
if err != nil {
@ -123,7 +123,7 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
for {
select {
case <-ctx.Done():
break
return nil
default:
transaction, err := cltTransactions.Recv()
if err != nil {
@ -141,24 +141,37 @@ func (l Backend) SubscribeTransactions(ctx context.Context, channel chan *models
}
func (l Backend) SubscribeChannels(ctx context.Context, events chan *models.ChannelUpdate) error {
_, err := l.Client(ctx)
clt, err := l.Client(ctx)
if err != nil {
return err
}
defer clt.Close()
// events, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
// if err != nil {
// return err
// }
channelEvents, err := clt.SubscribeChannelEvents(ctx, &lnrpc.ChannelEventSubscription{})
if err != nil {
return err
}
// for {
// event, err := events.Recv()
// if err != nil {
// return err
// }
// events <-
//}
return nil
for {
select {
case <-ctx.Done():
return nil
default:
event, err := channelEvents.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe channels: context canceled")
return nil
}
return err
}
if event.Type == lnrpc.ChannelEventUpdate_FULLY_RESOLVED_CHANNEL {
events <- &models.ChannelUpdate{}
}
}
}
}
func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan *models.RoutingEvent) error {
@ -176,7 +189,7 @@ func (l Backend) SubscribeRoutingEvents(ctx context.Context, channelEvents chan
for {
select {
case <-ctx.Done():
break
return nil
default:
event, err := cltRoutingEvents.Recv()
if err != nil {

@ -1,6 +1,7 @@
package models
import (
"strings"
"time"
"github.com/edouardparis/lntop/logging"
@ -13,6 +14,7 @@ const (
ChannelClosing
ChannelForceClosing
ChannelWaitingClose
ChannelClosed
)
type ChannelsBalance struct {
@ -78,7 +80,7 @@ func (m Channel) ShortAlias() (alias string, forced bool) {
} else if m.Node == nil || m.Node.Alias == "" {
alias = m.RemotePubKey[:24]
} else {
alias = m.Node.Alias
alias = strings.ReplaceAll(m.Node.Alias, "\ufe0f", "")
}
if len(alias) > 25 {
alias = alias[:24]

@ -117,6 +117,35 @@ func (p *PubSub) routingUpdates(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *PubSub) channels(ctx context.Context, sub chan *events.Event) {
p.wg.Add(3)
channels := make(chan *models.ChannelUpdate)
ctx, cancel := context.WithCancel(ctx)
go func() {
for range channels {
p.logger.Debug("channels updated")
sub <- events.New(events.ChannelActive)
}
p.wg.Done()
}()
go func() {
err := p.network.SubscribeChannels(ctx, channels)
if err != nil {
p.logger.Error("SubscribeChannels returned an error", logging.Error(err))
}
p.wg.Done()
}()
go func() {
<-p.stop
cancel()
close(channels)
p.wg.Done()
}()
}
func (p *PubSub) Stop() {
p.stop <- true
close(p.stop)
@ -129,6 +158,7 @@ func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
p.invoices(ctx, sub)
p.transactions(ctx, sub)
p.routingUpdates(ctx, sub)
p.channels(ctx, sub)
p.ticker(ctx, sub,
withTickerInfo(),
withTickerChannelsBalance(),

@ -52,14 +52,16 @@ func (m *Models) RefreshChannels(ctx context.Context) error {
if err != nil {
return err
}
index := map[string]*models.Channel{}
for i := range channels {
index[channels[i].ChannelPoint] = channels[i]
if !m.Channels.Contains(channels[i]) {
m.Channels.Add(channels[i])
}
channel := m.Channels.GetByChanPoint(channels[i].ChannelPoint)
if channel != nil &&
(channel.UpdatesCount < channels[i].UpdatesCount ||
channel.LastUpdate == nil) {
channel.LastUpdate == nil || channel.Policy1 == nil || channel.Policy2 == nil) {
err := m.network.GetChannelInfo(ctx, channels[i])
if err != nil {
return err
@ -77,6 +79,11 @@ func (m *Models) RefreshChannels(ctx context.Context) error {
m.Channels.Update(channels[i])
}
for _, c := range m.Channels.List() {
if _, ok := index[c.ChannelPoint]; !ok {
c.Status = models.ChannelClosed
}
}
return nil
}

@ -666,6 +666,8 @@ func status(c *netmodels.Channel, opts ...color.Option) string {
return color.Yellow(opts...)(fmt.Sprintf("%-13s", "force closing"))
case netmodels.ChannelWaitingClose:
return color.Yellow(opts...)(fmt.Sprintf("%-13s", "waiting close"))
case netmodels.ChannelClosed:
return color.Red(opts...)(fmt.Sprintf("%-13s", "closed"))
}
return ""
}

Loading…
Cancel
Save