You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loop/staticaddr/deposit/manager.go

504 lines
12 KiB
Go

package deposit
import (
"context"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/fsm"
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnwallet"
)
const (
// PollInterval is the interval in which we poll for new deposits to our
// static address.
PollInterval = 10 * time.Second
// MinConfs is the minimum number of confirmations we require for a
// deposit to be considered available for loop-ins, coop-spends and
// timeouts.
MinConfs = 3
// MaxConfs is unset since we don't require a max number of
// confirmations for deposits.
MaxConfs = 0
)
// ManagerConfig holds the configuration for the address manager.
type ManagerConfig struct {
// AddressClient is the client that communicates with the loop server
// to manage static addresses.
AddressClient staticaddressrpc.StaticAddressServerClient
AddressManager AddressManager
// SwapClient provides loop rpc functionality.
SwapClient *loop.Client
// Store is the database store that is used to store static address
// related records.
Store Store
// WalletKit is the wallet client that is used to derive new keys from
// lnd's wallet.
WalletKit lndclient.WalletKitClient
// ChainParams is the chain configuration(mainnet, testnet...) this
// manager uses.
ChainParams *chaincfg.Params
// ChainNotifier is the chain notifier that is used to listen for new
// blocks.
ChainNotifier lndclient.ChainNotifierClient
// Signer is the signer client that is used to sign transactions.
Signer lndclient.SignerClient
}
// Manager manages the address state machines.
type Manager struct {
cfg *ManagerConfig
runCtx context.Context
sync.Mutex
// initChan signals the daemon that the address manager has completed
// its initialization.
initChan chan struct{}
// activeDeposits contains all the active static address outputs.
activeDeposits map[wire.OutPoint]*FSM
// initiationHeight stores the currently best known block height.
initiationHeight uint32
// currentHeight stores the currently best known block height.
currentHeight uint32
// deposits contains all the deposits that have ever been made to the
// static address. This field is used to store and recover deposits. It
// also serves as basis for reconciliation of newly detected deposits by
// matching them against deposits in this map that were already seen.
deposits map[wire.OutPoint]*Deposit
// finalizedDepositChan is a channel that receives deposits that have
// been finalized. The manager will adjust its internal state and flush
// finalized deposits from its memory.
finalizedDepositChan chan wire.OutPoint
}
// NewManager creates a new deposit manager.
func NewManager(cfg *ManagerConfig) *Manager {
return &Manager{
cfg: cfg,
initChan: make(chan struct{}),
activeDeposits: make(map[wire.OutPoint]*FSM),
deposits: make(map[wire.OutPoint]*Deposit),
finalizedDepositChan: make(chan wire.OutPoint),
}
}
// Run runs the address manager.
func (m *Manager) Run(ctx context.Context, currentHeight uint32) error {
m.runCtx = ctx
m.Lock()
m.currentHeight, m.initiationHeight = currentHeight, currentHeight
m.Unlock()
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(m.runCtx) //nolint:lll
if err != nil {
return err
}
// Recover previous deposits and static address parameters from the DB.
err = m.recover(m.runCtx)
if err != nil {
return err
}
// Start the deposit notifier.
m.pollDeposits(ctx)
// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
for {
select {
case height := <-newBlockChan:
m.Lock()
m.currentHeight = uint32(height)
m.Unlock()
// Inform all active deposits about a new block arrival.
for _, fsm := range m.activeDeposits {
select {
case fsm.blockNtfnChan <- uint32(height):
case <-m.runCtx.Done():
return m.runCtx.Err()
}
}
case outpoint := <-m.finalizedDepositChan:
// If deposits notify us about their finalization, we
// update the manager's internal state and flush the
// finalized deposit from memory.
m.finalizeDeposit(outpoint)
case err := <-newBlockErrChan:
return err
case <-m.runCtx.Done():
return m.runCtx.Err()
}
}
}
// recover recovers static address parameters, previous deposits and state
// machines from the database and starts the deposit notifier.
func (m *Manager) recover(ctx context.Context) error {
log.Infof("Recovering static address parameters and deposits...")
// Recover deposits.
deposits, err := m.cfg.Store.AllDeposits(ctx)
if err != nil {
return err
}
for i, d := range deposits {
m.deposits[d.OutPoint] = deposits[i]
// If the current deposit is final it wasn't active when we
// shut down the client last. So we don't need to start a fsm
// for it.
if d.IsInFinalState() {
continue
}
log.Debugf("Recovering deposit %x", d.ID)
// Create a state machine for a given deposit.
fsm, err := NewFSM(
m.runCtx, d, m.cfg,
m.finalizedDepositChan, true,
)
if err != nil {
return err
}
// Send the OnRecover event to the state machine.
go func() {
err = fsm.SendEvent(OnRecover, nil)
if err != nil {
log.Errorf("Error sending OnStart event: %v",
err)
}
}()
m.activeDeposits[d.OutPoint] = fsm
}
return nil
}
// WaitInitComplete waits until the address manager has completed its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Static address deposit manager initiation complete.")
<-m.initChan
}
// pollDeposits polls new deposits to our static address and notifies the
// manager's event loop about them.
func (m *Manager) pollDeposits(ctx context.Context) {
log.Debugf("waiting for new static address deposits...")
go func() {
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := m.reconcileDeposits(ctx)
if err != nil {
log.Errorf("unable to reconcile "+
"deposits: %v", err)
}
case <-ctx.Done():
return
}
}
}()
}
// reconcileDeposits fetches all spends to our static address from our lnd
// wallet and matches it against the deposits in our memory that we've seen so
// far. It picks the newly identified deposits and starts a state machine per
// deposit to track its progress.
func (m *Manager) reconcileDeposits(ctx context.Context) error {
log.Tracef("Reconciling new deposits...")
utxos, err := m.cfg.AddressManager.ListUnspent(
ctx, MinConfs, MaxConfs,
)
if err != nil {
return fmt.Errorf("unable to list new deposits: %v", err)
}
newDeposits := m.filterNewDeposits(utxos)
if err != nil {
return fmt.Errorf("unable to filter new deposits: %v", err)
}
if len(newDeposits) == 0 {
log.Tracef("No new deposits...")
return nil
}
for _, utxo := range newDeposits {
deposit, err := m.createNewDeposit(ctx, utxo)
if err != nil {
return fmt.Errorf("unable to retain new deposit: %v",
err)
}
log.Debugf("Received deposit: %v", deposit)
err = m.startDepositFsm(deposit)
if err != nil {
return fmt.Errorf("unable to start new deposit FSM: %v",
err)
}
}
return nil
}
// createNewDeposit transforms the wallet utxo into a deposit struct and stores
// it in our database and manager memory.
func (m *Manager) createNewDeposit(ctx context.Context,
utxo *lnwallet.Utxo) (*Deposit, error) {
blockHeight, err := m.getBlockHeight(ctx, utxo)
if err != nil {
return nil, err
}
// Get the sweep pk script.
addr, err := m.cfg.WalletKit.NextAddr(
ctx, lnwallet.DefaultAccountName,
walletrpc.AddressType_TAPROOT_PUBKEY, false,
)
if err != nil {
return nil, err
}
timeoutSweepPkScript, err := txscript.PayToAddrScript(addr)
if err != nil {
return nil, err
}
id, err := GetRandomDepositID()
if err != nil {
return nil, err
}
deposit := &Deposit{
ID: id,
State: Deposited,
OutPoint: utxo.OutPoint,
Value: utxo.Value,
ConfirmationHeight: int64(blockHeight),
TimeOutSweepPkScript: timeoutSweepPkScript,
}
err = m.cfg.Store.CreateDeposit(ctx, deposit)
if err != nil {
return nil, err
}
m.Lock()
m.deposits[deposit.OutPoint] = deposit
m.Unlock()
return deposit, nil
}
// getBlockHeight retrieves the block height of a given utxo.
func (m *Manager) getBlockHeight(ctx context.Context,
utxo *lnwallet.Utxo) (uint32, error) {
addressParams, err := m.cfg.AddressManager.GetStaticAddressParameters(
ctx,
)
if err != nil {
return 0, fmt.Errorf("couldn't get confirmation height for "+
"deposit, %v", err)
}
notifChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
ctx, &utxo.OutPoint.Hash, addressParams.PkScript, MinConfs,
int32(m.initiationHeight),
)
if err != nil {
return 0, err
}
select {
case tx := <-notifChan:
return tx.BlockHeight, nil
case err := <-errChan:
return 0, err
case <-ctx.Done():
return 0, ctx.Err()
}
}
// filterNewDeposits filters the given utxos for new deposits that we haven't
// seen before.
func (m *Manager) filterNewDeposits(utxos []*lnwallet.Utxo) []*lnwallet.Utxo {
m.Lock()
defer m.Unlock()
var newDeposits []*lnwallet.Utxo
for _, utxo := range utxos {
_, ok := m.deposits[utxo.OutPoint]
if !ok {
newDeposits = append(newDeposits, utxo)
}
}
return newDeposits
}
// startDepositFsm creates a new state machine flow from the latest deposit to
// our static address.
func (m *Manager) startDepositFsm(deposit *Deposit) error {
// Create a state machine for a given deposit.
fsm, err := NewFSM(
m.runCtx, deposit, m.cfg, m.finalizedDepositChan, false,
)
if err != nil {
return err
}
// Send the start event to the state machine.
go func() {
err = fsm.SendEvent(OnStart, nil)
if err != nil {
log.Errorf("Error sending OnStart event: %v", err)
}
}()
err = fsm.DefaultObserver.WaitForState(m.runCtx, time.Minute, Deposited)
if err != nil {
return err
}
// Add the FSM to the active FSMs map.
m.Lock()
m.activeDeposits[deposit.OutPoint] = fsm
m.Unlock()
return nil
}
func (m *Manager) finalizeDeposit(outpoint wire.OutPoint) {
m.Lock()
delete(m.activeDeposits, outpoint)
delete(m.deposits, outpoint)
m.Unlock()
}
// GetActiveDepositsInState returns all active deposits.
func (m *Manager) GetActiveDepositsInState(stateFilter fsm.StateType) (
[]*Deposit, error) {
m.Lock()
defer m.Unlock()
var deposits []*Deposit
for _, fsm := range m.activeDeposits {
if fsm.deposit.State != stateFilter {
continue
}
deposits = append(deposits, fsm.deposit)
}
return deposits, nil
}
// GetAllDeposits returns all active deposits.
func (m *Manager) GetAllDeposits() ([]*Deposit, error) {
return m.cfg.Store.AllDeposits(m.runCtx)
}
// AllOutpointsActiveDeposits checks if all deposits referenced by the outpoints
// are active and in the specified state.
func (m *Manager) AllOutpointsActiveDeposits(outpoints []wire.OutPoint,
stateFilter fsm.StateType) (
[]*Deposit, bool) {
m.Lock()
defer m.Unlock()
var deposits []*Deposit
for _, o := range outpoints {
if _, ok := m.activeDeposits[o]; !ok {
return nil, false
}
deposit := m.deposits[o]
if deposit.State != stateFilter {
return nil, false
}
deposits = append(deposits, m.deposits[o])
}
return deposits, true
}
// TransitionDeposits allows a caller to transition a set of deposits to a new
// state.
func (m *Manager) TransitionDeposits(deposits []*Deposit, event fsm.EventType,
expectedFinalState fsm.StateType) error {
for _, d := range deposits {
m.Lock()
sm, ok := m.activeDeposits[d.OutPoint]
m.Unlock()
if !ok {
return fmt.Errorf("deposit not found")
}
err := sm.SendEvent(event, nil)
if err != nil {
return err
}
err = sm.DefaultObserver.WaitForState(
m.runCtx, time.Minute, expectedFinalState,
)
if err != nil {
return err
}
}
return nil
}
func (m *Manager) UpdateDeposit(d *Deposit) error {
return m.cfg.Store.UpdateDeposit(m.runCtx, d)
}