loopd: abandon loop-ins

pull/661/head
Slyghtning 6 months ago
parent 0fbf253391
commit 5e91c446b8
No known key found for this signature in database
GPG Key ID: F82D456EA023C9BF

@ -17,6 +17,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/routing/route"
"google.golang.org/grpc/status"
)
@ -68,6 +69,11 @@ type Client struct {
started uint32 // To be used atomically.
errChan chan error
// abandonChans allows for accessing a swap's abandon channel by
// providing its swap hash. This map is used to look up the abandon
// channel of a swap if the client requests to abandon it.
abandonChans map[lntypes.Hash]chan struct{}
lndServices *lndclient.LndServices
sweeper *sweep.Sweeper
executor *executor
@ -179,6 +185,7 @@ func NewClient(dbDir string, loopDB loopdb.SwapStore,
sweeper: sweeper,
executor: executor,
resumeReady: make(chan struct{}),
abandonChans: make(map[lntypes.Hash]chan struct{}),
}
cleanup := func() {
@ -317,10 +324,10 @@ func (s *Client) Run(ctx context.Context, statusChan chan<- SwapInfo) error {
}()
// Main event loop.
err = s.executor.run(mainCtx, statusChan)
err = s.executor.run(mainCtx, statusChan, s.abandonChans)
// Consider canceled as happy flow.
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
err = nil
}
@ -578,6 +585,10 @@ func (s *Client) LoopIn(globalCtx context.Context,
}
swap := initResult.swap
s.executor.Lock()
s.abandonChans[swap.hash] = swap.abandonChan
s.executor.Unlock()
// Post swap to the main loop.
s.executor.initiateSwap(globalCtx, swap)
@ -753,3 +764,26 @@ func (s *Client) Probe(ctx context.Context, req *ProbeRequest) error {
req.RouteHints,
)
}
// AbandonSwap sends a signal on the abandon channel of the swap identified by
// the passed swap hash. This will cause the swap to abandon itself.
func (s *Client) AbandonSwap(ctx context.Context,
req *AbandonSwapRequest) error {
if req == nil {
return errors.New("no request provided")
}
s.executor.Lock()
defer s.executor.Unlock()
select {
case s.abandonChans[req.SwapHash] <- struct{}{}:
case <-ctx.Done():
return ctx.Err()
default:
// This is to avoid writing to a full channel.
}
return nil
}

@ -13,6 +13,7 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue"
)
@ -46,6 +47,8 @@ type executor struct {
currentHeight uint32
ready chan struct{}
sync.Mutex
executorConfig
}
@ -61,7 +64,8 @@ func newExecutor(cfg *executorConfig) *executor {
// run starts the executor event loop. It accepts and executes new swaps,
// providing them with required config data.
func (s *executor) run(mainCtx context.Context,
statusChan chan<- SwapInfo) error {
statusChan chan<- SwapInfo,
abandonChans map[lntypes.Hash]chan struct{}) error {
var (
err error
@ -167,6 +171,15 @@ func (s *executor) run(mainCtx context.Context,
log.Errorf("Execute error: %v", err)
}
// If a loop-in ended we have to remove its
// abandon channel from our abandonChans map
// since the swap finalized.
if swap, ok := newSwap.(*loopInSwap); ok {
s.Lock()
delete(abandonChans, swap.hash)
s.Unlock()
}
select {
case swapDoneChan <- swapID:
case <-mainCtx.Done():

@ -394,3 +394,9 @@ type ProbeRequest struct {
// Optional hop hints.
RouteHints [][]zpay32.HopHint
}
// AbandonSwapRequest specifies the swap to abandon. It is identified by its
// swap hash.
type AbandonSwapRequest struct {
SwapHash lntypes.Hash
}

@ -288,6 +288,9 @@ func (s *swapClientServer) marshallSwap(loopSwap *loop.SwapInfo) (
case loopdb.StateFailIncorrectHtlcAmt:
failureReason = clientrpc.FailureReason_FAILURE_REASON_INCORRECT_AMOUNT
case loopdb.StateFailAbandoned:
failureReason = clientrpc.FailureReason_FAILURE_REASON_ABANDONED
default:
return nil, fmt.Errorf("unknown swap state: %v", loopSwap.State)
}
@ -508,6 +511,49 @@ func (s *swapClientServer) SwapInfo(_ context.Context,
return s.marshallSwap(&swp)
}
// AbandonSwap requests the server to abandon a swap with the given hash.
func (s *swapClientServer) AbandonSwap(ctx context.Context,
req *clientrpc.AbandonSwapRequest) (*clientrpc.AbandonSwapResponse,
error) {
if !req.IKnowWhatIAmDoing {
return nil, fmt.Errorf("please read the AbandonSwap API " +
"documentation")
}
swapHash, err := lntypes.MakeHash(req.Id)
if err != nil {
return nil, fmt.Errorf("error parsing swap hash: %v", err)
}
s.swapsLock.Lock()
swap, ok := s.swaps[swapHash]
s.swapsLock.Unlock()
if !ok {
return nil, fmt.Errorf("swap with hash %s not found", req.Id)
}
if swap.SwapType.IsOut() {
return nil, fmt.Errorf("abandoning loop out swaps is not " +
"supported yet")
}
// If the swap is in a final state, we cannot abandon it.
if swap.State.IsFinal() {
return nil, fmt.Errorf("cannot abandon swap in final state, "+
"state = %s, hash = %s", swap.State.String(), swapHash)
}
err = s.impl.AbandonSwap(ctx, &loop.AbandonSwapRequest{
SwapHash: swapHash,
})
if err != nil {
return nil, fmt.Errorf("error abandoning swap: %v", err)
}
return &clientrpc.AbandonSwapResponse{}, nil
}
// LoopOutTerms returns the terms that the server enforces for loop out swaps.
func (s *swapClientServer) LoopOutTerms(ctx context.Context,
_ *clientrpc.TermsRequest) (*clientrpc.OutTermsResponse, error) {

@ -48,6 +48,10 @@ var (
// TimeoutTxConfTarget defines the confirmation target for the loop in
// timeout tx.
TimeoutTxConfTarget = int32(2)
// ErrSwapFinalized is returned when a to be executed swap is already in
// a final state.
ErrSwapFinalized = errors.New("swap is in a final state")
)
// loopInSwap contains all the in-memory state related to a pending loop in
@ -70,6 +74,8 @@ type loopInSwap struct {
timeoutAddr btcutil.Address
abandonChan chan struct{}
wg sync.WaitGroup
}
@ -308,6 +314,8 @@ func newLoopInSwap(globalCtx context.Context, cfg *swapConfig,
swap.log.Infof("Server message: %v", swapResp.serverMessage)
}
swap.abandonChan = make(chan struct{}, 1)
return &loopInInitResult{
swap: swap,
serverMessage: swapResp.serverMessage,
@ -518,6 +526,11 @@ func (s *loopInSwap) execute(mainCtx context.Context,
// error occurs.
err = s.executeSwap(mainCtx)
// Stop the execution if the swap has been abandoned.
if err != nil && s.state == loopdb.StateFailAbandoned {
return err
}
// Sanity check. If there is no error, the swap must be in a final
// state.
if err == nil && s.state.Type() == loopdb.StateTypePending {
@ -553,6 +566,11 @@ func (s *loopInSwap) execute(mainCtx context.Context,
func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
var err error
// If the swap is already in a final state, we can return immediately.
if s.state.IsFinal() {
return ErrSwapFinalized
}
// For loop in, the client takes the first step by publishing the
// on-chain htlc. Only do this if we haven't already done so in a
// previous run.
@ -688,6 +706,11 @@ func (s *loopInSwap) waitForHtlcConf(globalCtx context.Context) (
case notification := <-s.blockEpochChan:
s.height = notification.(int32)
// If the client requested the swap to be abandoned, we override
// the status in the database.
case <-s.abandonChan:
return nil, s.setStateAbandoned(ctx)
// Cancel.
case <-globalCtx.Done():
return nil, globalCtx.Err()
@ -840,6 +863,11 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
htlcKeyRevealed := false
for !htlcSpend || !invoiceFinalized {
select {
// If the client requested the swap to be abandoned, we override
// the status in the database.
case <-s.abandonChan:
return s.setStateAbandoned(ctx)
// Spend notification error.
case err := <-spendErr:
return err
@ -1062,6 +1090,31 @@ func (s *loopInSwap) publishTimeoutTx(ctx context.Context,
return fee, nil
}
// setStateAbandoned stores the abandoned state and announces it. It also
// cancels the swap invoice so the server can't settle it.
func (s *loopInSwap) setStateAbandoned(ctx context.Context) error {
s.log.Infof("Abandoning swap %v...", s.hash)
if !s.state.IsPending() {
return fmt.Errorf("cannot abandon swap in state %v", s.state)
}
s.setState(loopdb.StateFailAbandoned)
err := s.persistAndAnnounceState(ctx)
if err != nil {
return err
}
// If the invoice is already settled or canceled, this is a nop.
_ = s.lnd.Invoices.CancelInvoice(ctx, s.hash)
return fmt.Errorf("swap hash "+
"abandoned by client, "+
"swap ID: %v, %v",
s.hash, err)
}
// persistAndAnnounceState updates the swap state on disk and sends out an
// update notification.
func (s *loopInSwap) persistAndAnnounceState(ctx context.Context) error {

@ -11,6 +11,12 @@ const (
TypeOut
)
// IsOut returns true if the swap is a loop out swap, false if it is a loop in
// swap.
func (t Type) IsOut() bool {
return t == TypeOut
}
func (t Type) String() string {
switch t {
case TypeIn:

Loading…
Cancel
Save