From 5e91c446b8d71cfce4830756b9271bfe9bb34ace Mon Sep 17 00:00:00 2001 From: Slyghtning Date: Mon, 13 Nov 2023 14:50:10 +0100 Subject: [PATCH] loopd: abandon loop-ins --- client.go | 38 +++++++++++++++++++++++++-- executor.go | 15 ++++++++++- interface.go | 6 +++++ loopd/swapclient_server.go | 46 +++++++++++++++++++++++++++++++++ loopin.go | 53 ++++++++++++++++++++++++++++++++++++++ swap/type.go | 6 +++++ 6 files changed, 161 insertions(+), 3 deletions(-) diff --git a/client.go b/client.go index 6e78b6b..ed73349 100644 --- a/client.go +++ b/client.go @@ -17,6 +17,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/sweep" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/routing/route" "google.golang.org/grpc/status" ) @@ -68,6 +69,11 @@ type Client struct { started uint32 // To be used atomically. errChan chan error + // abandonChans allows for accessing a swap's abandon channel by + // providing its swap hash. This map is used to look up the abandon + // channel of a swap if the client requests to abandon it. + abandonChans map[lntypes.Hash]chan struct{} + lndServices *lndclient.LndServices sweeper *sweep.Sweeper executor *executor @@ -179,6 +185,7 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore, sweeper: sweeper, executor: executor, resumeReady: make(chan struct{}), + abandonChans: make(map[lntypes.Hash]chan struct{}), } cleanup := func() { @@ -317,10 +324,10 @@ func (s *Client) Run(ctx context.Context, statusChan chan<- SwapInfo) error { }() // Main event loop. - err = s.executor.run(mainCtx, statusChan) + err = s.executor.run(mainCtx, statusChan, s.abandonChans) // Consider canceled as happy flow. - if err == context.Canceled { + if errors.Is(err, context.Canceled) { err = nil } @@ -578,6 +585,10 @@ func (s *Client) LoopIn(globalCtx context.Context, } swap := initResult.swap + s.executor.Lock() + s.abandonChans[swap.hash] = swap.abandonChan + s.executor.Unlock() + // Post swap to the main loop. s.executor.initiateSwap(globalCtx, swap) @@ -753,3 +764,26 @@ func (s *Client) Probe(ctx context.Context, req *ProbeRequest) error { req.RouteHints, ) } + +// AbandonSwap sends a signal on the abandon channel of the swap identified by +// the passed swap hash. This will cause the swap to abandon itself. +func (s *Client) AbandonSwap(ctx context.Context, + req *AbandonSwapRequest) error { + + if req == nil { + return errors.New("no request provided") + } + + s.executor.Lock() + defer s.executor.Unlock() + + select { + case s.abandonChans[req.SwapHash] <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + default: + // This is to avoid writing to a full channel. + } + + return nil +} diff --git a/executor.go b/executor.go index 2775ea0..1092183 100644 --- a/executor.go +++ b/executor.go @@ -13,6 +13,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/sweep" + "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/queue" ) @@ -46,6 +47,8 @@ type executor struct { currentHeight uint32 ready chan struct{} + sync.Mutex + executorConfig } @@ -61,7 +64,8 @@ func newExecutor(cfg *executorConfig) *executor { // run starts the executor event loop. It accepts and executes new swaps, // providing them with required config data. func (s *executor) run(mainCtx context.Context, - statusChan chan<- SwapInfo) error { + statusChan chan<- SwapInfo, + abandonChans map[lntypes.Hash]chan struct{}) error { var ( err error @@ -167,6 +171,15 @@ func (s *executor) run(mainCtx context.Context, log.Errorf("Execute error: %v", err) } + // If a loop-in ended we have to remove its + // abandon channel from our abandonChans map + // since the swap finalized. + if swap, ok := newSwap.(*loopInSwap); ok { + s.Lock() + delete(abandonChans, swap.hash) + s.Unlock() + } + select { case swapDoneChan <- swapID: case <-mainCtx.Done(): diff --git a/interface.go b/interface.go index 3476fd7..2d1d191 100644 --- a/interface.go +++ b/interface.go @@ -394,3 +394,9 @@ type ProbeRequest struct { // Optional hop hints. RouteHints [][]zpay32.HopHint } + +// AbandonSwapRequest specifies the swap to abandon. It is identified by its +// swap hash. +type AbandonSwapRequest struct { + SwapHash lntypes.Hash +} diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index c42249f..be135a7 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -288,6 +288,9 @@ func (s *swapClientServer) marshallSwap(loopSwap *loop.SwapInfo) ( case loopdb.StateFailIncorrectHtlcAmt: failureReason = clientrpc.FailureReason_FAILURE_REASON_INCORRECT_AMOUNT + case loopdb.StateFailAbandoned: + failureReason = clientrpc.FailureReason_FAILURE_REASON_ABANDONED + default: return nil, fmt.Errorf("unknown swap state: %v", loopSwap.State) } @@ -508,6 +511,49 @@ func (s *swapClientServer) SwapInfo(_ context.Context, return s.marshallSwap(&swp) } +// AbandonSwap requests the server to abandon a swap with the given hash. +func (s *swapClientServer) AbandonSwap(ctx context.Context, + req *clientrpc.AbandonSwapRequest) (*clientrpc.AbandonSwapResponse, + error) { + + if !req.IKnowWhatIAmDoing { + return nil, fmt.Errorf("please read the AbandonSwap API " + + "documentation") + } + + swapHash, err := lntypes.MakeHash(req.Id) + if err != nil { + return nil, fmt.Errorf("error parsing swap hash: %v", err) + } + + s.swapsLock.Lock() + swap, ok := s.swaps[swapHash] + s.swapsLock.Unlock() + if !ok { + return nil, fmt.Errorf("swap with hash %s not found", req.Id) + } + + if swap.SwapType.IsOut() { + return nil, fmt.Errorf("abandoning loop out swaps is not " + + "supported yet") + } + + // If the swap is in a final state, we cannot abandon it. + if swap.State.IsFinal() { + return nil, fmt.Errorf("cannot abandon swap in final state, "+ + "state = %s, hash = %s", swap.State.String(), swapHash) + } + + err = s.impl.AbandonSwap(ctx, &loop.AbandonSwapRequest{ + SwapHash: swapHash, + }) + if err != nil { + return nil, fmt.Errorf("error abandoning swap: %v", err) + } + + return &clientrpc.AbandonSwapResponse{}, nil +} + // LoopOutTerms returns the terms that the server enforces for loop out swaps. func (s *swapClientServer) LoopOutTerms(ctx context.Context, _ *clientrpc.TermsRequest) (*clientrpc.OutTermsResponse, error) { diff --git a/loopin.go b/loopin.go index 2bd2c32..275565b 100644 --- a/loopin.go +++ b/loopin.go @@ -48,6 +48,10 @@ var ( // TimeoutTxConfTarget defines the confirmation target for the loop in // timeout tx. TimeoutTxConfTarget = int32(2) + + // ErrSwapFinalized is returned when a to be executed swap is already in + // a final state. + ErrSwapFinalized = errors.New("swap is in a final state") ) // loopInSwap contains all the in-memory state related to a pending loop in @@ -70,6 +74,8 @@ type loopInSwap struct { timeoutAddr btcutil.Address + abandonChan chan struct{} + wg sync.WaitGroup } @@ -308,6 +314,8 @@ func newLoopInSwap(globalCtx context.Context, cfg *swapConfig, swap.log.Infof("Server message: %v", swapResp.serverMessage) } + swap.abandonChan = make(chan struct{}, 1) + return &loopInInitResult{ swap: swap, serverMessage: swapResp.serverMessage, @@ -518,6 +526,11 @@ func (s *loopInSwap) execute(mainCtx context.Context, // error occurs. err = s.executeSwap(mainCtx) + // Stop the execution if the swap has been abandoned. + if err != nil && s.state == loopdb.StateFailAbandoned { + return err + } + // Sanity check. If there is no error, the swap must be in a final // state. if err == nil && s.state.Type() == loopdb.StateTypePending { @@ -553,6 +566,11 @@ func (s *loopInSwap) execute(mainCtx context.Context, func (s *loopInSwap) executeSwap(globalCtx context.Context) error { var err error + // If the swap is already in a final state, we can return immediately. + if s.state.IsFinal() { + return ErrSwapFinalized + } + // For loop in, the client takes the first step by publishing the // on-chain htlc. Only do this if we haven't already done so in a // previous run. @@ -688,6 +706,11 @@ func (s *loopInSwap) waitForHtlcConf(globalCtx context.Context) ( case notification := <-s.blockEpochChan: s.height = notification.(int32) + // If the client requested the swap to be abandoned, we override + // the status in the database. + case <-s.abandonChan: + return nil, s.setStateAbandoned(ctx) + // Cancel. case <-globalCtx.Done(): return nil, globalCtx.Err() @@ -840,6 +863,11 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, htlcKeyRevealed := false for !htlcSpend || !invoiceFinalized { select { + // If the client requested the swap to be abandoned, we override + // the status in the database. + case <-s.abandonChan: + return s.setStateAbandoned(ctx) + // Spend notification error. case err := <-spendErr: return err @@ -1062,6 +1090,31 @@ func (s *loopInSwap) publishTimeoutTx(ctx context.Context, return fee, nil } +// setStateAbandoned stores the abandoned state and announces it. It also +// cancels the swap invoice so the server can't settle it. +func (s *loopInSwap) setStateAbandoned(ctx context.Context) error { + s.log.Infof("Abandoning swap %v...", s.hash) + + if !s.state.IsPending() { + return fmt.Errorf("cannot abandon swap in state %v", s.state) + } + + s.setState(loopdb.StateFailAbandoned) + + err := s.persistAndAnnounceState(ctx) + if err != nil { + return err + } + + // If the invoice is already settled or canceled, this is a nop. + _ = s.lnd.Invoices.CancelInvoice(ctx, s.hash) + + return fmt.Errorf("swap hash "+ + "abandoned by client, "+ + "swap ID: %v, %v", + s.hash, err) +} + // persistAndAnnounceState updates the swap state on disk and sends out an // update notification. func (s *loopInSwap) persistAndAnnounceState(ctx context.Context) error { diff --git a/swap/type.go b/swap/type.go index 02afdc3..20942c7 100644 --- a/swap/type.go +++ b/swap/type.go @@ -11,6 +11,12 @@ const ( TypeOut ) +// IsOut returns true if the swap is a loop out swap, false if it is a loop in +// swap. +func (t Type) IsOut() bool { + return t == TypeOut +} + func (t Type) String() string { switch t { case TypeIn: