refac lnd: subscribe invoices

pull/1/head
Edouard Paris 5 years ago
parent e2c89f3143
commit cdfcb5ecd6

@ -8,6 +8,8 @@ import (
"github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/edouardparis/lntop/config" "github.com/edouardparis/lntop/config"
"github.com/edouardparis/lntop/logging" "github.com/edouardparis/lntop/logging"
@ -44,6 +46,7 @@ func (l Backend) Info(ctx context.Context) (*models.Info, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer clt.Close()
resp, err := clt.GetInfo(ctx, &lnrpc.GetInfoRequest{}) resp, err := clt.GetInfo(ctx, &lnrpc.GetInfoRequest{})
if err != nil { if err != nil {
@ -58,6 +61,7 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode
if err != nil { if err != nil {
return err return err
} }
defer clt.Close()
cltInvoices, err := clt.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{}) cltInvoices, err := clt.SubscribeInvoices(ctx, &lnrpc.InvoiceSubscription{})
if err != nil { if err != nil {
@ -65,12 +69,22 @@ func (l Backend) SubscribeInvoice(ctx context.Context, channelInvoice chan *mode
} }
for { for {
invoice, err := cltInvoices.Recv() select {
if err != nil { case <-ctx.Done():
return err break
default:
invoice, err := cltInvoices.Recv()
if err != nil {
st, ok := status.FromError(err)
if ok && st.Code() == codes.Canceled {
l.logger.Debug("stopping subscribe invoice: context canceled")
return nil
}
return err
}
channelInvoice <- lookupInvoiceProtoToInvoice(invoice)
} }
channelInvoice <- lookupInvoiceProtoToInvoice(invoice)
} }
} }

@ -30,7 +30,7 @@ func newPubSub(logger logging.Logger, network *network.Network) *pubSub {
} }
func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) { func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) {
p.wg.Add(2) p.wg.Add(3)
invoices := make(chan *models.Invoice) invoices := make(chan *models.Invoice)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
@ -51,11 +51,15 @@ func (p *pubSub) invoices(ctx context.Context, sub chan *events.Event) {
if err != nil { if err != nil {
p.logger.Error("SubscribeInvoice returned an error", logging.Error(err)) p.logger.Error("SubscribeInvoice returned an error", logging.Error(err))
} }
close(invoices) p.wg.Done()
}() }()
<-p.stop go func() {
cancel() <-p.stop
cancel()
close(invoices)
p.wg.Done()
}()
} }
func (p *pubSub) wait() { func (p *pubSub) wait() {

Loading…
Cancel
Save