From 7740231bac0796bde6507f9bc6c3e08cbea2c569 Mon Sep 17 00:00:00 2001 From: carla Date: Wed, 30 Sep 2020 12:34:07 +0200 Subject: [PATCH] liquidity: make swap suggestions aware of ongoing swaps --- liquidity/liquidity.go | 122 ++++++++++++++++++++++++++++- liquidity/liquidity_test.go | 151 ++++++++++++++++++++++++++++++++++++ liquidity/log.go | 26 +++++++ loopd/log.go | 2 + loopd/utils.go | 6 +- 5 files changed, 301 insertions(+), 6 deletions(-) create mode 100644 liquidity/log.go diff --git a/liquidity/liquidity.go b/liquidity/liquidity.go index 0829363..1fd8f68 100644 --- a/liquidity/liquidity.go +++ b/liquidity/liquidity.go @@ -1,6 +1,12 @@ // Package liquidity is responsible for monitoring our node's liquidity. It // allows setting of a liquidity rule which describes the desired liquidity // balance on a per-channel basis. +// +// Swap suggestions are limited to channels that are not currently being used +// for a pending swap. If we are currently processing an unrestricted swap (ie, +// a loop out with no outgoing channel targets set or a loop in with no last +// hop set), we will not suggest any swaps because these swaps will shift the +// balances of our channels in ways we can't predict. package liquidity import ( @@ -15,6 +21,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" ) const ( @@ -65,6 +72,12 @@ type Config struct { // Lnd provides us with access to lnd's rpc servers. Lnd *lndclient.LndServices + // ListLoopOut returns all of the loop our swaps stored on disk. + ListLoopOut func() ([]*loopdb.LoopOut, error) + + // ListLoopIn returns all of the loop in swaps stored on disk. + ListLoopIn func() ([]*loopdb.LoopIn, error) + // Clock allows easy mocking of time in unit tests. Clock clock.Clock } @@ -184,19 +197,32 @@ func (m *Manager) SuggestSwaps(ctx context.Context) ( return nil, nil } - channels, err := m.cfg.Lnd.Client.ListChannels(ctx) + // Get the current server side restrictions. + outRestrictions, err := m.cfg.LoopOutRestrictions(ctx) if err != nil { return nil, err } - // Get the current server side restrictions. - outRestrictions, err := m.cfg.LoopOutRestrictions(ctx) + // List our current set of swaps so that we can determine which channels + // are already being utilized by swaps. Note that these calls may race + // with manual initiation of swaps. + loopOut, err := m.cfg.ListLoopOut() + if err != nil { + return nil, err + } + + loopIn, err := m.cfg.ListLoopIn() + if err != nil { + return nil, err + } + + eligible, err := m.getEligibleChannels(ctx, loopOut, loopIn) if err != nil { return nil, err } var suggestions []loop.OutRequest - for _, channel := range channels { + for _, channel := range eligible { channelID := lnwire.NewShortChanIDFromInt(channel.ChannelID) rule, ok := m.params.ChannelRules[channelID] if !ok { @@ -242,6 +268,94 @@ func makeLoopOutRequest(suggestion *LoopOutRecommendation) loop.OutRequest { } } +// getEligibleChannels takes lists of our existing loop out and in swaps, and +// gets a list of channels that are not currently being utilized for a swap. +// If an unrestricted swap is ongoing, we return an empty set of channels +// because we don't know which channels balances it will affect. +func (m *Manager) getEligibleChannels(ctx context.Context, + loopOut []*loopdb.LoopOut, loopIn []*loopdb.LoopIn) ( + []lndclient.ChannelInfo, error) { + + var ( + existingOut = make(map[lnwire.ShortChannelID]bool) + existingIn = make(map[route.Vertex]bool) + ) + + for _, out := range loopOut { + var ( + state = out.State().State + chanSet = out.Contract.OutgoingChanSet + ) + + // Skip completed swaps, they can't affect our channel balances. + if state.Type() != loopdb.StateTypePending { + continue + } + + if len(chanSet) == 0 { + log.Debugf("Ongoing unrestricted loop out: "+ + "%v, no suggestions at present", out.Hash) + + return nil, nil + } + + for _, id := range chanSet { + chanID := lnwire.NewShortChanIDFromInt(id) + existingOut[chanID] = true + } + } + + for _, in := range loopIn { + // Skip completed swaps, they can't affect our channel balances. + if in.State().State.Type() != loopdb.StateTypePending { + continue + } + + if in.Contract.LastHop == nil { + log.Debugf("Ongoing unrestricted loop in: "+ + "%v, no suggestions at present", in.Hash) + + return nil, nil + } + + existingIn[*in.Contract.LastHop] = true + } + + channels, err := m.cfg.Lnd.Client.ListChannels(ctx) + if err != nil { + return nil, err + } + + // Run through our set of channels and skip over any channels that + // are currently being utilized by a restricted swap (where restricted + // means that a loop out limited channels, or a loop in limited last + // hop). + var eligible []lndclient.ChannelInfo + for _, channel := range channels { + shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID) + + if existingOut[shortID] { + log.Debugf("Channel: %v not eligible for "+ + "suggestions, ongoing loop out utilizing "+ + "channel", channel.ChannelID) + + continue + } + + if existingIn[channel.PubKeyBytes] { + log.Debugf("Channel: %v not eligible for "+ + "suggestions, ongoing loop in utilizing "+ + "peer", channel.ChannelID) + + continue + } + + eligible = append(eligible, channel) + } + + return eligible, nil +} + // ppmToSat takes an amount and a measure of parts per million for the amount // and returns the amount that the ppm represents. func ppmToSat(amount btcutil.Amount, ppm int) btcutil.Amount { diff --git a/liquidity/liquidity_test.go b/liquidity/liquidity_test.go index cba9ca3..b8413ec 100644 --- a/liquidity/liquidity_test.go +++ b/liquidity/liquidity_test.go @@ -11,6 +11,7 @@ import ( "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" "github.com/stretchr/testify/require" ) @@ -20,8 +21,20 @@ var ( chanID1 = lnwire.NewShortChanIDFromInt(1) chanID2 = lnwire.NewShortChanIDFromInt(2) + peer1 = route.Vertex{1} + peer2 = route.Vertex{2} + channel1 = lndclient.ChannelInfo{ ChannelID: chanID1.ToUint64(), + PubKeyBytes: peer1, + LocalBalance: 10000, + RemoteBalance: 0, + Capacity: 10000, + } + + channel2 = lndclient.ChannelInfo{ + ChannelID: chanID2.ToUint64(), + PubKeyBytes: peer2, LocalBalance: 10000, RemoteBalance: 0, Capacity: 10000, @@ -47,6 +60,28 @@ var ( MaxPrepayAmount: defaultMaximumPrepay, SweepConfTarget: loop.DefaultSweepConfTarget, } + + // chan2Rec is the suggested swap for channel 2 when we use chanRule. + chan2Rec = loop.OutRequest{ + Amount: 7500, + OutgoingChanSet: loopdb.ChannelSet{chanID2.ToUint64()}, + MaxPrepayRoutingFee: prepayFee, + MaxSwapRoutingFee: routingFee, + MaxMinerFee: defaultMaximumMinerFee, + MaxSwapFee: swapFee, + MaxPrepayAmount: defaultMaximumPrepay, + SweepConfTarget: loop.DefaultSweepConfTarget, + } + + // chan1Out is a contract that uses channel 1, used to represent on + // disk swap using chan 1. + chan1Out = &loopdb.LoopOutContract{ + OutgoingChanSet: loopdb.ChannelSet( + []uint64{ + chanID1.ToUint64(), + }, + ), + } ) // newTestConfig creates a default test config. @@ -61,6 +96,12 @@ func newTestConfig() (*Config, *test.LndMockServices) { }, Lnd: &lnd.LndServices, Clock: clock.NewTestClock(testTime), + ListLoopOut: func() ([]*loopdb.LoopOut, error) { + return nil, nil + }, + ListLoopIn: func() ([]*loopdb.LoopIn, error) { + return nil, nil + }, }, lnd } @@ -110,6 +151,116 @@ func TestParameters(t *testing.T) { require.Equal(t, ErrZeroChannelID, err) } +// TestRestrictedSuggestions tests getting of swap suggestions when we have +// other in-flight swaps. We setup our manager with a set of channels and rules +// that require a loop out swap, focusing on the filtering our of channels that +// are in use for in-flight swaps. +func TestRestrictedSuggestions(t *testing.T) { + tests := []struct { + name string + channels []lndclient.ChannelInfo + loopOut []*loopdb.LoopOut + loopIn []*loopdb.LoopIn + expected []loop.OutRequest + }{ + { + name: "no existing swaps", + channels: []lndclient.ChannelInfo{ + channel1, + }, + loopOut: nil, + loopIn: nil, + expected: []loop.OutRequest{ + chan1Rec, + }, + }, + { + name: "unrestricted loop out", + channels: []lndclient.ChannelInfo{ + channel1, channel2, + }, + loopOut: []*loopdb.LoopOut{ + { + Contract: &loopdb.LoopOutContract{ + OutgoingChanSet: nil, + }, + }, + }, + expected: nil, + }, + { + name: "unrestricted loop in", + channels: []lndclient.ChannelInfo{ + channel1, channel2, + }, + loopIn: []*loopdb.LoopIn{ + { + Contract: &loopdb.LoopInContract{ + LastHop: nil, + }, + }, + }, + expected: nil, + }, + { + name: "restricted loop out", + channels: []lndclient.ChannelInfo{ + channel1, channel2, + }, + loopOut: []*loopdb.LoopOut{ + { + Contract: chan1Out, + }, + }, + expected: []loop.OutRequest{ + chan2Rec, + }, + }, + { + name: "restricted loop in", + channels: []lndclient.ChannelInfo{ + channel1, channel2, + }, + loopIn: []*loopdb.LoopIn{ + { + Contract: &loopdb.LoopInContract{ + LastHop: &peer2, + }, + }, + }, + expected: []loop.OutRequest{ + chan1Rec, + }, + }, + } + + for _, testCase := range tests { + testCase := testCase + + t.Run(testCase.name, func(t *testing.T) { + // Create a manager config which will return the test + // case's set of existing swaps. + cfg, lnd := newTestConfig() + cfg.ListLoopOut = func() ([]*loopdb.LoopOut, error) { + return testCase.loopOut, nil + } + cfg.ListLoopIn = func() ([]*loopdb.LoopIn, error) { + return testCase.loopIn, nil + } + + rules := map[lnwire.ShortChannelID]*ThresholdRule{ + chanID1: chanRule, + chanID2: chanRule, + } + + testSuggestSwaps( + t, cfg, lnd, testCase.channels, rules, + testCase.expected, + ) + }) + } +} + // TestSuggestSwaps tests getting of swap suggestions based on the rules set for // the liquidity manager and the current set of channel balances. func TestSuggestSwaps(t *testing.T) { diff --git a/liquidity/log.go b/liquidity/log.go new file mode 100644 index 0000000..39a3ebe --- /dev/null +++ b/liquidity/log.go @@ -0,0 +1,26 @@ +package liquidity + +import ( + "github.com/btcsuite/btclog" + "github.com/lightningnetwork/lnd/build" +) + +// Subsystem defines the sub system name of this package. +const Subsystem = "LQDY" + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/loopd/log.go b/loopd/log.go index 2abc58e..5dcf70e 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -4,6 +4,7 @@ import ( "github.com/btcsuite/btclog" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/lsat" "github.com/lightningnetwork/lnd/build" @@ -21,6 +22,7 @@ func init() { addSubLogger("LNDC", lndclient.UseLogger) addSubLogger("STORE", loopdb.UseLogger) addSubLogger(lsat.Subsystem, lsat.UseLogger) + addSubLogger(liquidity.Subsystem, liquidity.UseLogger) } // addSubLogger is a helper method to conveniently create and register the diff --git a/loopd/utils.go b/loopd/utils.go index 8b74a66..f8512cb 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -47,8 +47,10 @@ func getLiquidityManager(client *loop.Client) *liquidity.Manager { outTerms.MinSwapAmount, outTerms.MaxSwapAmount, ), nil }, - Lnd: client.LndServices, - Clock: clock.NewDefaultClock(), + Lnd: client.LndServices, + Clock: clock.NewDefaultClock(), + ListLoopOut: client.Store.FetchLoopOutSwaps, + ListLoopIn: client.Store.FetchLoopInSwaps, } return liquidity.NewManager(mngrCfg)