diff --git a/staticaddr/manager.go b/staticaddr/address/manager.go similarity index 75% rename from staticaddr/manager.go rename to staticaddr/address/manager.go index a963c86..7100977 100644 --- a/staticaddr/manager.go +++ b/staticaddr/address/manager.go @@ -1,4 +1,4 @@ -package staticaddr +package address import ( "bytes" @@ -10,8 +10,9 @@ import ( "github.com/btcsuite/btcd/btcec/v2/schnorr" "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btclog" "github.com/lightninglabs/lndclient" - "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/staticaddr" "github.com/lightninglabs/loop/staticaddr/script" "github.com/lightninglabs/loop/swap" staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" @@ -20,18 +21,22 @@ import ( "github.com/lightningnetwork/lnd/lnwallet" ) +var ( + log btclog.Logger +) + // ManagerConfig holds the configuration for the address manager. type ManagerConfig struct { // AddressClient is the client that communicates with the loop server // to manage static addresses. AddressClient staticaddressrpc.StaticAddressServerClient - // SwapClient provides loop rpc functionality. - SwapClient *loop.Client + // FetchL402 is the function used to fetch the l402 token. + FetchL402 func(context.Context) error // Store is the database store that is used to store static address // related records. - Store AddressStore + Store Store // WalletKit is the wallet client that is used to derive new keys from // lnd's wallet. @@ -46,30 +51,20 @@ type ManagerConfig struct { type Manager struct { cfg *ManagerConfig - initChan chan struct{} - sync.Mutex } -// NewAddressManager creates a new address manager. -func NewAddressManager(cfg *ManagerConfig) *Manager { +// NewManager creates a new address manager. +func NewManager(cfg *ManagerConfig) *Manager { + log = staticaddr.GetLogger() return &Manager{ - cfg: cfg, - initChan: make(chan struct{}), + cfg: cfg, } } // Run runs the address manager. func (m *Manager) Run(ctx context.Context) error { - log.Debugf("Starting address manager.") - defer log.Debugf("Address manager stopped.") - - // Communicate to the caller that the address manager has completed its - // initialization. - close(m.initChan) - <-ctx.Done() - return nil } @@ -99,7 +94,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, // We are fetching a new L402 token from the server. There is one static // address per L402 token allowed. - err = m.cfg.SwapClient.Server.FetchL402(ctx) + err = m.cfg.FetchL402(ctx) if err != nil { return nil, err } @@ -113,7 +108,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, // Send our clientPubKey to the server and wait for the server to // respond with he serverPubKey and the static address CSV expiry. - protocolVersion := CurrentRPCProtocolVersion() + protocolVersion := staticaddr.CurrentRPCProtocolVersion() resp, err := m.cfg.AddressClient.ServerNewAddress( ctx, &staticaddressrpc.ServerNewAddressRequest{ ProtocolVersion: protocolVersion, @@ -146,7 +141,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, // Create the static address from the parameters the server provided and // store all parameters in the database. - addrParams := &AddressParameters{ + addrParams := &Parameters{ ClientPubkey: clientPubKey.PubKey, ServerPubkey: serverPubKey, PkScript: pkScript, @@ -155,7 +150,9 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, Family: clientPubKey.Family, Index: clientPubKey.Index, }, - ProtocolVersion: AddressProtocolVersion(protocolVersion), + ProtocolVersion: staticaddr.AddressProtocolVersion( + protocolVersion, + ), } err = m.cfg.Store.CreateStaticAddress(ctx, addrParams) if err != nil { @@ -172,7 +169,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot, return nil, err } - log.Infof("imported static address taproot script to lnd wallet: %v", + log.Infof("Imported static address taproot script to lnd wallet: %v", addr) return m.getTaprootAddress( @@ -197,12 +194,6 @@ func (m *Manager) getTaprootAddress(clientPubkey, ) } -// WaitInitComplete waits until the address manager has completed its setup. -func (m *Manager) WaitInitComplete() { - defer log.Debugf("Address manager initiation complete.") - <-m.initChan -} - // ListUnspentRaw returns a list of utxos at the static address. func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs, maxConfs int32) (*btcutil.AddressTaproot, []*lnwallet.Utxo, error) { @@ -213,7 +204,7 @@ func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs, return nil, nil, err case len(addresses) == 0: - return nil, nil, fmt.Errorf("no address found") + return nil, nil, nil case len(addresses) > 1: return nil, nil, fmt.Errorf("more than one address found") @@ -249,3 +240,52 @@ func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs, return taprootAddress, filteredUtxos, nil } + +// GetStaticAddressParameters returns the parameters of the static address. +func (m *Manager) GetStaticAddressParameters(ctx context.Context) (*Parameters, + error) { + + params, err := m.cfg.Store.GetAllStaticAddresses(ctx) + if err != nil { + return nil, err + } + + if len(params) == 0 { + return nil, fmt.Errorf("no static address parameters found") + } + + return params[0], nil +} + +// GetStaticAddress returns a taproot address for the given client and server +// public keys and expiry. +func (m *Manager) GetStaticAddress(ctx context.Context) (*script.StaticAddress, + error) { + + params, err := m.GetStaticAddressParameters(ctx) + if err != nil { + return nil, err + } + + address, err := script.NewStaticAddress( + input.MuSig2Version100RC2, int64(params.Expiry), + params.ClientPubkey, params.ServerPubkey, + ) + if err != nil { + return nil, err + } + + return address, nil +} + +// ListUnspent returns a list of utxos at the static address. +func (m *Manager) ListUnspent(ctx context.Context, minConfs, + maxConfs int32) ([]*lnwallet.Utxo, error) { + + _, utxos, err := m.ListUnspentRaw(ctx, minConfs, maxConfs) + if err != nil { + return nil, err + } + + return utxos, nil +} diff --git a/staticaddr/deposit/actions.go b/staticaddr/deposit/actions.go new file mode 100644 index 0000000..d40eefa --- /dev/null +++ b/staticaddr/deposit/actions.go @@ -0,0 +1,148 @@ +package deposit + +import ( + "errors" + "fmt" + "strings" + + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/loop/fsm" + "github.com/lightninglabs/loop/staticaddr/script" +) + +const ( + defaultConfTarget = 3 +) + +// PublishDepositExpirySweepAction creates and publishes the timeout transaction +// that spends the deposit from the static address timeout leaf to the +// predefined timeout sweep pkscript. +func (f *FSM) PublishDepositExpirySweepAction(_ fsm.EventContext) fsm.EventType { + msgTx := wire.NewMsgTx(2) + + params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx) + if err != nil { + return fsm.OnError + } + + // Add the deposit outpoint as input to the transaction. + msgTx.AddTxIn(&wire.TxIn{ + PreviousOutPoint: f.deposit.OutPoint, + Sequence: params.Expiry, + SignatureScript: nil, + }) + + // Estimate the fee rate of an expiry spend transaction. + feeRateEstimator, err := f.cfg.WalletKit.EstimateFeeRate( + f.ctx, defaultConfTarget, + ) + if err != nil { + return f.HandleError(fmt.Errorf("timeout sweep fee "+ + "estimation failed: %v", err)) + } + + weight := script.ExpirySpendWeight() + + fee := feeRateEstimator.FeeForWeight(weight) + + // We cap the fee at 20% of the deposit value. + if fee > f.deposit.Value/5 { + return f.HandleError(errors.New("fee is greater than 20% of " + + "the deposit value")) + } + + output := &wire.TxOut{ + Value: int64(f.deposit.Value - fee), + PkScript: f.deposit.TimeOutSweepPkScript, + } + msgTx.AddTxOut(output) + + txOut := &wire.TxOut{ + Value: int64(f.deposit.Value), + PkScript: params.PkScript, + } + + prevOut := []*wire.TxOut{txOut} + + signDesc, err := f.SignDescriptor() + if err != nil { + return f.HandleError(err) + } + + rawSigs, err := f.cfg.Signer.SignOutputRaw( + f.ctx, msgTx, []*lndclient.SignDescriptor{signDesc}, prevOut, + ) + if err != nil { + return f.HandleError(err) + } + + address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx) + if err != nil { + return f.HandleError(err) + } + + sig := rawSigs[0] + msgTx.TxIn[0].Witness, err = address.GenTimeoutWitness(sig) + if err != nil { + return f.HandleError(err) + } + + txLabel := fmt.Sprintf("timeout sweep for deposit %v", + f.deposit.OutPoint) + + err = f.cfg.WalletKit.PublishTransaction(f.ctx, msgTx, txLabel) + if err != nil { + if !strings.Contains(err.Error(), "output already spent") { + log.Errorf("%v: %v", txLabel, err) + f.LastActionError = err + return fsm.OnError + } + } else { + f.Debugf("published timeout sweep with txid: %v", + msgTx.TxHash()) + } + + return OnExpiryPublished +} + +// WaitForExpirySweepAction waits for a sufficient number of confirmations +// before a timeout sweep is considered successful. +func (f *FSM) WaitForExpirySweepAction(_ fsm.EventContext) fsm.EventType { + spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll + f.ctx, nil, f.deposit.TimeOutSweepPkScript, defaultConfTarget, + int32(f.deposit.ConfirmationHeight), + ) + if err != nil { + return f.HandleError(err) + } + + select { + case err := <-errSpendChan: + log.Debugf("error while sweeping expired deposit: %v", err) + return fsm.OnError + + case confirmedTx := <-spendChan: + f.deposit.ExpirySweepTxid = confirmedTx.Tx.TxHash() + return OnExpirySwept + + case <-f.ctx.Done(): + return fsm.OnError + } +} + +// SweptExpiredDepositAction is the final action of the FSM. It signals to the +// manager that the deposit has been swept and the FSM can be removed. It also +// ends the state machine main loop by cancelling its context. +func (f *FSM) SweptExpiredDepositAction(_ fsm.EventContext) fsm.EventType { + select { + case <-f.ctx.Done(): + return fsm.OnError + + default: + f.finalizedDepositChan <- f.deposit.OutPoint + f.ctx.Done() + } + + return fsm.NoOp +} diff --git a/staticaddr/deposit/deposit.go b/staticaddr/deposit/deposit.go new file mode 100644 index 0000000..67ec9e7 --- /dev/null +++ b/staticaddr/deposit/deposit.go @@ -0,0 +1,107 @@ +package deposit + +import ( + "crypto/rand" + "fmt" + "sync" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/loop/fsm" +) + +// ID is a unique identifier for a deposit. +type ID [IdLength]byte + +// FromByteSlice creates a deposit id from a byte slice. +func (r *ID) FromByteSlice(b []byte) error { + if len(b) != IdLength { + return fmt.Errorf("deposit id must be 32 bytes, got %d, %x", + len(b), b) + } + + copy(r[:], b) + + return nil +} + +// Deposit bundles an utxo at a static address together with manager-relevant +// data. +type Deposit struct { + // ID is the unique identifier of the deposit. + ID ID + + // State is the current state of the deposit. + State fsm.StateType + + // The outpoint of the deposit. + wire.OutPoint + + // Value is the amount of the deposit. + Value btcutil.Amount + + // ConfirmationHeight is the absolute height at which the deposit was + // first confirmed. + ConfirmationHeight int64 + + // TimeOutSweepPkScript is the pk script that is used to sweep the + // deposit to after it is expired. + TimeOutSweepPkScript []byte + + // ExpirySweepTxid is the transaction id of the expiry sweep. + ExpirySweepTxid chainhash.Hash + + sync.Mutex +} + +// IsInPendingState returns true if the deposit is pending. +func (d *Deposit) IsInPendingState() bool { + d.Lock() + defer d.Unlock() + + return !d.IsInFinalState() +} + +// IsInFinalState returns true if the deposit is final. +func (d *Deposit) IsInFinalState() bool { + d.Lock() + defer d.Unlock() + + return d.State == Expired || d.State == Failed +} + +func (d *Deposit) isExpired(currentHeight, expiry uint32) bool { + d.Lock() + defer d.Unlock() + + return currentHeight >= uint32(d.ConfirmationHeight)+expiry +} + +func (d *Deposit) getState() fsm.StateType { + d.Lock() + defer d.Unlock() + + return d.State +} + +func (d *Deposit) setState(state fsm.StateType) { + d.Lock() + defer d.Unlock() + + d.State = state +} + +func (d *Deposit) isInState(state fsm.StateType) bool { + d.Lock() + defer d.Unlock() + + return d.State == state +} + +// GetRandomDepositID generates a random deposit ID. +func GetRandomDepositID() (ID, error) { + var id ID + _, err := rand.Read(id[:]) + return id, err +} diff --git a/staticaddr/deposit/fsm.go b/staticaddr/deposit/fsm.go new file mode 100644 index 0000000..cf0e414 --- /dev/null +++ b/staticaddr/deposit/fsm.go @@ -0,0 +1,305 @@ +package deposit + +import ( + "context" + "errors" + "fmt" + + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/loop/fsm" + "github.com/lightninglabs/loop/staticaddr" + "github.com/lightninglabs/loop/staticaddr/address" + "github.com/lightninglabs/loop/staticaddr/script" + "github.com/lightningnetwork/lnd/input" + "github.com/lightningnetwork/lnd/keychain" +) + +const ( + DefaultObserverSize = 20 +) + +var ( + ErrProtocolVersionNotSupported = errors.New("protocol version not " + + "supported") +) + +// States. +var ( + Deposited = fsm.StateType("Deposited") + + PublishExpiredDeposit = fsm.StateType("PublishExpiredDeposit") + + WaitForExpirySweep = fsm.StateType("WaitForExpirySweep") + + Expired = fsm.StateType("Expired") + + Failed = fsm.StateType("Failed") +) + +// Events. +var ( + OnStart = fsm.EventType("OnStart") + OnExpiry = fsm.EventType("OnExpiry") + OnExpiryPublished = fsm.EventType("OnExpiryPublished") + OnExpirySwept = fsm.EventType("OnExpirySwept") + OnRecover = fsm.EventType("OnRecover") +) + +// FSM is the state machine that handles the instant out. +type FSM struct { + *fsm.StateMachine + + cfg *ManagerConfig + + deposit *Deposit + + params *address.Parameters + + address *script.StaticAddress + + ctx context.Context + + blockNtfnChan chan uint32 + + finalizedDepositChan chan wire.OutPoint +} + +// NewFSM creates a new state machine that can action on all static address +// feature requests. +func NewFSM(ctx context.Context, deposit *Deposit, cfg *ManagerConfig, + finalizedDepositChan chan wire.OutPoint, + recoverStateMachine bool) (*FSM, error) { + + params, err := cfg.AddressManager.GetStaticAddressParameters(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get static address "+ + "parameters: %v", err) + } + + address, err := cfg.AddressManager.GetStaticAddress(ctx) + if err != nil { + return nil, fmt.Errorf("unable to get static address: %v", err) + } + + depoFsm := &FSM{ + cfg: cfg, + deposit: deposit, + params: params, + address: address, + ctx: ctx, + blockNtfnChan: make(chan uint32), + finalizedDepositChan: finalizedDepositChan, + } + + depositStates := depoFsm.DepositStatesV0() + switch params.ProtocolVersion { + case staticaddr.ProtocolVersion_V0: + + default: + return nil, ErrProtocolVersionNotSupported + } + + if recoverStateMachine { + depoFsm.StateMachine = fsm.NewStateMachineWithState( + depositStates, deposit.State, + DefaultObserverSize, + ) + } else { + depoFsm.StateMachine = fsm.NewStateMachine( + depositStates, DefaultObserverSize, + ) + } + + depoFsm.ActionEntryFunc = depoFsm.updateDeposit + + go func() { + for { + select { + case currentHeight := <-depoFsm.blockNtfnChan: + err := depoFsm.handleBlockNotification( + currentHeight, + ) + if err != nil { + log.Errorf("error handling block "+ + "notification: %v", err) + } + + case <-ctx.Done(): + return + } + } + }() + + return depoFsm, nil +} + +// handleBlockNotification inspects the current block height and sends the +// OnExpiry event to publish the expiry sweep transaction if the deposit timed +// out, or it republishes the expiry sweep transaction if it was not yet swept. +func (f *FSM) handleBlockNotification(currentHeight uint32) error { + params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx) + if err != nil { + return err + } + + // If the deposit is expired but not yet sufficiently confirmed, we + // republish the expiry sweep transaction. + if f.deposit.isExpired(currentHeight, params.Expiry) { + if f.deposit.isInState(WaitForExpirySweep) { + f.PublishDepositExpirySweepAction(nil) + } else { + go func() { + err := f.SendEvent(OnExpiry, nil) + if err != nil { + log.Debugf("error sending OnExpiry "+ + "event: %v", err) + } + }() + } + } + + return nil +} + +// DepositStatesV0 returns the states a deposit can be in. +func (f *FSM) DepositStatesV0() fsm.States { + return fsm.States{ + fsm.EmptyState: fsm.State{ + Transitions: fsm.Transitions{ + OnStart: Deposited, + }, + Action: fsm.NoOpAction, + }, + Deposited: fsm.State{ + Transitions: fsm.Transitions{ + OnExpiry: PublishExpiredDeposit, + OnRecover: Deposited, + }, + Action: fsm.NoOpAction, + }, + PublishExpiredDeposit: fsm.State{ + Transitions: fsm.Transitions{ + OnRecover: PublishExpiredDeposit, + OnExpiryPublished: WaitForExpirySweep, + // If the timeout sweep failed we go back to + // Deposited, hoping that another timeout sweep + // attempt will be successful. Alternatively, + // the client can try to coop-spend the deposit. + fsm.OnError: Deposited, + }, + Action: f.PublishDepositExpirySweepAction, + }, + WaitForExpirySweep: fsm.State{ + Transitions: fsm.Transitions{ + OnExpirySwept: Expired, + // Upon recovery, we republish the sweep tx. + OnRecover: PublishExpiredDeposit, + // If the timeout sweep failed we go back to + // Deposited, hoping that another timeout sweep + // attempt will be successful. Alternatively, + // the client can try to coop-spend the deposit. + fsm.OnError: Deposited, + }, + Action: f.WaitForExpirySweepAction, + }, + Expired: fsm.State{ + Transitions: fsm.Transitions{ + OnExpiry: Expired, + }, + Action: f.SweptExpiredDepositAction, + }, + Failed: fsm.State{ + Transitions: fsm.Transitions{ + OnExpiry: Failed, + }, + Action: fsm.NoOpAction, + }, + } +} + +// DepositEntryFunction is called after every action and updates the deposit in +// the db. +func (f *FSM) updateDeposit(notification fsm.Notification) { + if f.deposit == nil { + return + } + + f.Debugf("NextState: %v, PreviousState: %v, Event: %v", + notification.NextState, notification.PreviousState, + notification.Event, + ) + + f.deposit.setState(notification.NextState) + + // Don't update the deposit if we are in an initial state or if we + // are transitioning from an initial state to a failed state. + d := f.deposit + if d.isInState(fsm.EmptyState) || d.isInState(Deposited) || + (notification.PreviousState == Deposited && d.isInState( + Failed, + )) { + + return + } + + err := f.cfg.Store.UpdateDeposit(f.ctx, f.deposit) + if err != nil { + f.Errorf("unable to update deposit: %v", err) + } +} + +// Infof logs an info message with the deposit outpoint. +func (f *FSM) Infof(format string, args ...interface{}) { + log.Infof( + "Deposit %v: "+format, + append( + []interface{}{f.deposit.OutPoint}, + args..., + )..., + ) +} + +// Debugf logs a debug message with the deposit outpoint. +func (f *FSM) Debugf(format string, args ...interface{}) { + log.Debugf( + "Deposit %v: "+format, + append( + []interface{}{f.deposit.OutPoint}, + args..., + )..., + ) +} + +// Errorf logs an error message with the deposit outpoint. +func (f *FSM) Errorf(format string, args ...interface{}) { + log.Errorf( + "Deposit %v: "+format, + append( + []interface{}{f.deposit.OutPoint}, + args..., + )..., + ) +} + +// SignDescriptor returns the sign descriptor for the static address output. +func (f *FSM) SignDescriptor() (*lndclient.SignDescriptor, error) { + address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx) + if err != nil { + return nil, err + } + + return &lndclient.SignDescriptor{ + WitnessScript: address.TimeoutLeaf.Script, + KeyDesc: keychain.KeyDescriptor{ + PubKey: f.params.ClientPubkey, + }, + Output: wire.NewTxOut( + int64(f.deposit.Value), f.params.PkScript, + ), + HashType: txscript.SigHashDefault, + InputIndex: 0, + SignMethod: input.TaprootScriptSpendSignMethod, + }, nil +} diff --git a/staticaddr/deposit/manager.go b/staticaddr/deposit/manager.go new file mode 100644 index 0000000..eaf27fe --- /dev/null +++ b/staticaddr/deposit/manager.go @@ -0,0 +1,429 @@ +package deposit + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/txscript" + "github.com/btcsuite/btcd/wire" + "github.com/btcsuite/btclog" + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/staticaddr" + staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/lnrpc/walletrpc" + "github.com/lightningnetwork/lnd/lnwallet" +) + +const ( + // PollInterval is the interval in which we poll for new deposits to our + // static address. + PollInterval = 10 * time.Second + + // MinConfs is the minimum number of confirmations we require for a + // deposit to be considered available for loop-ins, coop-spends and + // timeouts. + MinConfs = 3 + + // MaxConfs is unset since we don't require a max number of + // confirmations for deposits. + MaxConfs = 0 +) + +var ( + log btclog.Logger +) + +// ManagerConfig holds the configuration for the address manager. +type ManagerConfig struct { + // AddressClient is the client that communicates with the loop server + // to manage static addresses. + AddressClient staticaddressrpc.StaticAddressServerClient + + // AddressManager is the address manager that is used to fetch static + // address parameters. + AddressManager AddressManager + + // SwapClient provides loop rpc functionality. + SwapClient *loop.Client + + // Store is the database store that is used to store static address + // related records. + Store Store + + // WalletKit is the wallet client that is used to derive new keys from + // lnd's wallet. + WalletKit lndclient.WalletKitClient + + // ChainParams is the chain configuration(mainnet, testnet...) this + // manager uses. + ChainParams *chaincfg.Params + + // ChainNotifier is the chain notifier that is used to listen for new + // blocks. + ChainNotifier lndclient.ChainNotifierClient + + // Signer is the signer client that is used to sign transactions. + Signer lndclient.SignerClient +} + +// Manager manages the address state machines. +type Manager struct { + cfg *ManagerConfig + + runCtx context.Context + + sync.Mutex + + // initChan signals the daemon that the address manager has completed + // its initialization. + initChan chan struct{} + + // activeDeposits contains all the active static address outputs. + activeDeposits map[wire.OutPoint]*FSM + + // initiationHeight stores the currently best known block height. + initiationHeight uint32 + + // currentHeight stores the currently best known block height. + currentHeight uint32 + + // deposits contains all the deposits that have ever been made to the + // static address. This field is used to store and recover deposits. It + // also serves as basis for reconciliation of newly detected deposits by + // matching them against deposits in this map that were already seen. + deposits map[wire.OutPoint]*Deposit + + // finalizedDepositChan is a channel that receives deposits that have + // been finalized. The manager will adjust its internal state and flush + // finalized deposits from its memory. + finalizedDepositChan chan wire.OutPoint +} + +// NewManager creates a new deposit manager. +func NewManager(cfg *ManagerConfig) *Manager { + log = staticaddr.GetLogger() + return &Manager{ + cfg: cfg, + initChan: make(chan struct{}), + activeDeposits: make(map[wire.OutPoint]*FSM), + deposits: make(map[wire.OutPoint]*Deposit), + finalizedDepositChan: make(chan wire.OutPoint), + } +} + +// Run runs the address manager. +func (m *Manager) Run(ctx context.Context, currentHeight uint32) error { + m.runCtx = ctx + + m.Lock() + m.currentHeight, m.initiationHeight = currentHeight, currentHeight + m.Unlock() + + newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(m.runCtx) //nolint:lll + if err != nil { + return err + } + + // Recover previous deposits and static address parameters from the DB. + err = m.recover(m.runCtx) + if err != nil { + return err + } + + // Start the deposit notifier. + m.pollDeposits(ctx) + + // Communicate to the caller that the address manager has completed its + // initialization. + close(m.initChan) + + for { + select { + case height := <-newBlockChan: + m.Lock() + m.currentHeight = uint32(height) + m.Unlock() + + // Inform all active deposits about a new block arrival. + for _, fsm := range m.activeDeposits { + select { + case fsm.blockNtfnChan <- uint32(height): + + case <-m.runCtx.Done(): + return m.runCtx.Err() + } + } + case outpoint := <-m.finalizedDepositChan: + // If deposits notify us about their finalization, we + // update the manager's internal state and flush the + // finalized deposit from memory. + m.finalizeDeposit(outpoint) + + case err := <-newBlockErrChan: + return err + + case <-m.runCtx.Done(): + return m.runCtx.Err() + } + } +} + +// recover recovers static address parameters, previous deposits and state +// machines from the database and starts the deposit notifier. +func (m *Manager) recover(ctx context.Context) error { + log.Infof("Recovering static address parameters and deposits...") + + // Recover deposits. + deposits, err := m.cfg.Store.AllDeposits(ctx) + if err != nil { + return err + } + + for i, d := range deposits { + m.deposits[d.OutPoint] = deposits[i] + + // If the current deposit is final it wasn't active when we + // shut down the client last. So we don't need to start a fsm + // for it. + if d.IsInFinalState() { + continue + } + + log.Debugf("Recovering deposit %x", d.ID) + + // Create a state machine for a given deposit. + fsm, err := NewFSM( + m.runCtx, d, m.cfg, + m.finalizedDepositChan, true, + ) + if err != nil { + return err + } + + // Send the OnRecover event to the state machine. + go func() { + err = fsm.SendEvent(OnRecover, nil) + if err != nil { + log.Errorf("Error sending OnStart event: %v", + err) + } + }() + + m.activeDeposits[d.OutPoint] = fsm + } + + return nil +} + +// WaitInitComplete waits until the address manager has completed its setup. +func (m *Manager) WaitInitComplete() { + defer log.Debugf("Static address deposit manager initiation complete.") + <-m.initChan +} + +// pollDeposits polls new deposits to our static address and notifies the +// manager's event loop about them. +func (m *Manager) pollDeposits(ctx context.Context) { + log.Debugf("Waiting for new static address deposits...") + + go func() { + ticker := time.NewTicker(PollInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := m.reconcileDeposits(ctx) + if err != nil { + log.Errorf("unable to reconcile "+ + "deposits: %v", err) + } + + case <-ctx.Done(): + return + } + } + }() +} + +// reconcileDeposits fetches all spends to our static address from our lnd +// wallet and matches it against the deposits in our memory that we've seen so +// far. It picks the newly identified deposits and starts a state machine per +// deposit to track its progress. +func (m *Manager) reconcileDeposits(ctx context.Context) error { + log.Tracef("Reconciling new deposits...") + + utxos, err := m.cfg.AddressManager.ListUnspent( + ctx, MinConfs, MaxConfs, + ) + if err != nil { + return fmt.Errorf("unable to list new deposits: %v", err) + } + + newDeposits := m.filterNewDeposits(utxos) + if err != nil { + return fmt.Errorf("unable to filter new deposits: %v", err) + } + + if len(newDeposits) == 0 { + log.Tracef("No new deposits...") + return nil + } + + for _, utxo := range newDeposits { + deposit, err := m.createNewDeposit(ctx, utxo) + if err != nil { + return fmt.Errorf("unable to retain new deposit: %v", + err) + } + + log.Debugf("Received deposit: %v", deposit) + err = m.startDepositFsm(deposit) + if err != nil { + return fmt.Errorf("unable to start new deposit FSM: %v", + err) + } + } + + return nil +} + +// createNewDeposit transforms the wallet utxo into a deposit struct and stores +// it in our database and manager memory. +func (m *Manager) createNewDeposit(ctx context.Context, + utxo *lnwallet.Utxo) (*Deposit, error) { + + blockHeight, err := m.getBlockHeight(ctx, utxo) + if err != nil { + return nil, err + } + + // Get the sweep pk script. + addr, err := m.cfg.WalletKit.NextAddr( + ctx, lnwallet.DefaultAccountName, + walletrpc.AddressType_TAPROOT_PUBKEY, false, + ) + if err != nil { + return nil, err + } + + timeoutSweepPkScript, err := txscript.PayToAddrScript(addr) + if err != nil { + return nil, err + } + + id, err := GetRandomDepositID() + if err != nil { + return nil, err + } + deposit := &Deposit{ + ID: id, + State: Deposited, + OutPoint: utxo.OutPoint, + Value: utxo.Value, + ConfirmationHeight: int64(blockHeight), + TimeOutSweepPkScript: timeoutSweepPkScript, + } + + err = m.cfg.Store.CreateDeposit(ctx, deposit) + if err != nil { + return nil, err + } + + m.Lock() + m.deposits[deposit.OutPoint] = deposit + m.Unlock() + + return deposit, nil +} + +// getBlockHeight retrieves the block height of a given utxo. +func (m *Manager) getBlockHeight(ctx context.Context, + utxo *lnwallet.Utxo) (uint32, error) { + + addressParams, err := m.cfg.AddressManager.GetStaticAddressParameters( + ctx, + ) + if err != nil { + return 0, fmt.Errorf("couldn't get confirmation height for "+ + "deposit, %v", err) + } + + notifChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll + ctx, &utxo.OutPoint.Hash, addressParams.PkScript, MinConfs, + int32(m.initiationHeight), + ) + if err != nil { + return 0, err + } + + select { + case tx := <-notifChan: + return tx.BlockHeight, nil + + case err := <-errChan: + return 0, err + + case <-ctx.Done(): + return 0, ctx.Err() + } +} + +// filterNewDeposits filters the given utxos for new deposits that we haven't +// seen before. +func (m *Manager) filterNewDeposits(utxos []*lnwallet.Utxo) []*lnwallet.Utxo { + m.Lock() + defer m.Unlock() + + var newDeposits []*lnwallet.Utxo + for _, utxo := range utxos { + _, ok := m.deposits[utxo.OutPoint] + if !ok { + newDeposits = append(newDeposits, utxo) + } + } + + return newDeposits +} + +// startDepositFsm creates a new state machine flow from the latest deposit to +// our static address. +func (m *Manager) startDepositFsm(deposit *Deposit) error { + // Create a state machine for a given deposit. + fsm, err := NewFSM( + m.runCtx, deposit, m.cfg, m.finalizedDepositChan, false, + ) + if err != nil { + return err + } + + // Send the start event to the state machine. + go func() { + err = fsm.SendEvent(OnStart, nil) + if err != nil { + log.Errorf("Error sending OnStart event: %v", err) + } + }() + + err = fsm.DefaultObserver.WaitForState(m.runCtx, time.Minute, Deposited) + if err != nil { + return err + } + + // Add the FSM to the active FSMs map. + m.Lock() + m.activeDeposits[deposit.OutPoint] = fsm + m.Unlock() + + return nil +} + +func (m *Manager) finalizeDeposit(outpoint wire.OutPoint) { + m.Lock() + delete(m.activeDeposits, outpoint) + delete(m.deposits, outpoint) + m.Unlock() +} diff --git a/staticaddr/deposit/sql_store.go b/staticaddr/deposit/sql_store.go index 08c1d38..41df5ab 100644 --- a/staticaddr/deposit/sql_store.go +++ b/staticaddr/deposit/sql_store.go @@ -44,7 +44,7 @@ func (s *SqlStore) CreateDeposit(ctx context.Context, deposit *Deposit) error { updateArgs := sqlc.InsertDepositUpdateParams{ DepositID: deposit.ID[:], UpdateTimestamp: s.clock.Now().UTC(), - UpdateState: string(deposit.State), + UpdateState: string(deposit.getState()), } return s.baseDB.ExecTx(ctx, &loopdb.SqliteTxOptions{}, @@ -63,7 +63,7 @@ func (s *SqlStore) UpdateDeposit(ctx context.Context, deposit *Deposit) error { insertUpdateArgs := sqlc.InsertDepositUpdateParams{ DepositID: deposit.ID[:], UpdateTimestamp: s.clock.Now().UTC(), - UpdateState: string(deposit.State), + UpdateState: string(deposit.getState()), } var (