staticaddr: deposit manager and fsm

Slyghtning 2 months ago
parent c56019ed34
commit c04b8b761f
No known key found for this signature in database
GPG Key ID: F82D456EA023C9BF

@ -1,4 +1,4 @@
package staticaddr
package address
import (
"bytes"
@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/staticaddr"
"github.com/lightninglabs/loop/staticaddr/script"
"github.com/lightninglabs/loop/swap"
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
@ -31,7 +32,7 @@ type ManagerConfig struct {
// Store is the database store that is used to store static address
// related records.
Store AddressStore
Store Store
// WalletKit is the wallet client that is used to derive new keys from
// lnd's wallet.
@ -46,31 +47,20 @@ type ManagerConfig struct {
type Manager struct {
cfg *ManagerConfig
initChan chan struct{}
sync.Mutex
}
// NewAddressManager creates a new address manager.
func NewAddressManager(cfg *ManagerConfig) *Manager {
// NewManager creates a new address manager.
func NewManager(cfg *ManagerConfig) *Manager {
return &Manager{
cfg: cfg,
initChan: make(chan struct{}),
cfg: cfg,
}
}
// Run runs the address manager.
func (m *Manager) Run(ctx context.Context) error {
log.Debugf("Starting address manager.")
defer log.Debugf("Address manager stopped.")
// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
<-ctx.Done()
return nil
return ctx.Err()
}
// NewAddress starts a new address creation flow.
@ -113,7 +103,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot,
// Send our clientPubKey to the server and wait for the server to
// respond with he serverPubKey and the static address CSV expiry.
protocolVersion := CurrentRPCProtocolVersion()
protocolVersion := staticaddr.CurrentRPCProtocolVersion()
resp, err := m.cfg.AddressClient.ServerNewAddress(
ctx, &staticaddressrpc.ServerNewAddressRequest{
ProtocolVersion: protocolVersion,
@ -146,7 +136,7 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot,
// Create the static address from the parameters the server provided and
// store all parameters in the database.
addrParams := &AddressParameters{
addrParams := &Parameters{
ClientPubkey: clientPubKey.PubKey,
ServerPubkey: serverPubKey,
PkScript: pkScript,
@ -155,7 +145,9 @@ func (m *Manager) NewAddress(ctx context.Context) (*btcutil.AddressTaproot,
Family: clientPubKey.Family,
Index: clientPubKey.Index,
},
ProtocolVersion: AddressProtocolVersion(protocolVersion),
ProtocolVersion: staticaddr.AddressProtocolVersion(
protocolVersion,
),
}
err = m.cfg.Store.CreateStaticAddress(ctx, addrParams)
if err != nil {
@ -197,12 +189,6 @@ func (m *Manager) getTaprootAddress(clientPubkey,
)
}
// WaitInitComplete waits until the address manager has completed its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Address manager initiation complete.")
<-m.initChan
}
// ListUnspentRaw returns a list of utxos at the static address.
func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs,
maxConfs int32) (*btcutil.AddressTaproot, []*lnwallet.Utxo, error) {
@ -213,7 +199,7 @@ func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs,
return nil, nil, err
case len(addresses) == 0:
return nil, nil, fmt.Errorf("no address found")
return nil, nil, nil
case len(addresses) > 1:
return nil, nil, fmt.Errorf("more than one address found")
@ -249,3 +235,52 @@ func (m *Manager) ListUnspentRaw(ctx context.Context, minConfs,
return taprootAddress, filteredUtxos, nil
}
// GetStaticAddressParameters returns the parameters of the static address.
func (m *Manager) GetStaticAddressParameters(ctx context.Context) (*Parameters,
error) {
params, err := m.cfg.Store.GetAllStaticAddresses(ctx)
if err != nil {
return nil, err
}
if len(params) == 0 {
return nil, fmt.Errorf("no static address parameters found")
}
return params[0], nil
}
// GetStaticAddress returns a taproot address for the given client and server
// public keys and expiry.
func (m *Manager) GetStaticAddress(ctx context.Context) (*script.StaticAddress,
error) {
params, err := m.GetStaticAddressParameters(ctx)
if err != nil {
return nil, err
}
address, err := script.NewStaticAddress(
input.MuSig2Version100RC2, int64(params.Expiry),
params.ClientPubkey, params.ServerPubkey,
)
if err != nil {
return nil, err
}
return address, nil
}
// ListUnspent returns a list of utxos at the static address.
func (m *Manager) ListUnspent(ctx context.Context, minConfs,
maxConfs int32) ([]*lnwallet.Utxo, error) {
_, utxos, err := m.ListUnspentRaw(ctx, minConfs, maxConfs)
if err != nil {
return nil, err
}
return utxos, nil
}

@ -0,0 +1,148 @@
package deposit
import (
"errors"
"fmt"
"strings"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/staticaddr/script"
)
const (
defaultConfTarget = 3
)
// PublishDepositExpirySweepAction creates and publishes the timeout transaction
// that spends the deposit from the static address timeout leaf to the
// predefined timeout sweep pkscript.
func (f *FSM) PublishDepositExpirySweepAction(_ fsm.EventContext) fsm.EventType {
msgTx := wire.NewMsgTx(2)
params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx)
if err != nil {
return fsm.OnError
}
// Add the deposit outpoint as input to the transaction.
msgTx.AddTxIn(&wire.TxIn{
PreviousOutPoint: f.deposit.OutPoint,
Sequence: params.Expiry,
SignatureScript: nil,
})
// Estimate the fee rate of an expiry spend transaction.
feeRateEstimator, err := f.cfg.WalletKit.EstimateFeeRate(
f.ctx, defaultConfTarget,
)
if err != nil {
return f.HandleError(fmt.Errorf("timeout sweep fee "+
"estimation failed: %v", err))
}
weight := script.ExpirySpendWeight()
fee := feeRateEstimator.FeeForWeight(weight)
// We cap the fee at 20% of the deposit value.
if fee > f.deposit.Value/5 {
return f.HandleError(errors.New("fee is greater than 20% of " +
"the deposit value"))
}
output := &wire.TxOut{
Value: int64(f.deposit.Value - fee),
PkScript: f.deposit.TimeOutSweepPkScript,
}
msgTx.AddTxOut(output)
txOut := &wire.TxOut{
Value: int64(f.deposit.Value),
PkScript: params.PkScript,
}
prevOut := []*wire.TxOut{txOut}
signDesc, err := f.SignDescriptor()
if err != nil {
return f.HandleError(err)
}
rawSigs, err := f.cfg.Signer.SignOutputRaw(
f.ctx, msgTx, []*lndclient.SignDescriptor{signDesc}, prevOut,
)
if err != nil {
return f.HandleError(err)
}
address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx)
if err != nil {
return f.HandleError(err)
}
sig := rawSigs[0]
msgTx.TxIn[0].Witness, err = address.GenTimeoutWitness(sig)
if err != nil {
return f.HandleError(err)
}
txLabel := fmt.Sprintf("timeout sweep for deposit %v",
f.deposit.OutPoint)
err = f.cfg.WalletKit.PublishTransaction(f.ctx, msgTx, txLabel)
if err != nil {
if !strings.Contains(err.Error(), "output already spent") {
log.Errorf("%v: %v", txLabel, err)
f.LastActionError = err
return fsm.OnError
}
} else {
f.Debugf("published timeout sweep with txid: %v",
msgTx.TxHash())
}
return OnExpiryPublished
}
// WaitForExpirySweepAction waits for a sufficient number of confirmations
// before a timeout sweep is considered successful.
func (f *FSM) WaitForExpirySweepAction(_ fsm.EventContext) fsm.EventType {
spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
f.ctx, nil, f.deposit.TimeOutSweepPkScript, defaultConfTarget,
int32(f.deposit.ConfirmationHeight),
)
if err != nil {
return f.HandleError(err)
}
select {
case err := <-errSpendChan:
log.Debugf("error while sweeping expired deposit: %v", err)
return fsm.OnError
case confirmedTx := <-spendChan:
f.deposit.ExpirySweepTxid = confirmedTx.Tx.TxHash()
return OnExpirySwept
case <-f.ctx.Done():
return fsm.OnError
}
}
// SweptExpiredDepositAction is the final action of the FSM. It signals to the
// manager that the deposit has been swept and the FSM can be removed. It also
// ends the state machine main loop by cancelling its context.
func (f *FSM) SweptExpiredDepositAction(_ fsm.EventContext) fsm.EventType {
select {
case <-f.ctx.Done():
return fsm.OnError
default:
f.finalizedDepositChan <- f.deposit.OutPoint
f.ctx.Done()
}
return fsm.NoOp
}

@ -0,0 +1,74 @@
package deposit
import (
"crypto/rand"
"fmt"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/loop/fsm"
)
// ID is a unique identifier for a deposit.
type ID [IdLength]byte
// FromByteSlice creates a deposit id from a byte slice.
func (r *ID) FromByteSlice(b []byte) error {
if len(b) != IdLength {
return fmt.Errorf("deposit id must be 32 bytes, got %d, %x",
len(b), b)
}
copy(r[:], b)
return nil
}
// Deposit bundles an utxo at a static address together with manager-relevant
// data.
type Deposit struct {
// ID is the unique identifier of the deposit.
ID ID
// State is the current state of the deposit.
State fsm.StateType
// The outpoint of the deposit.
wire.OutPoint
// Value is the amount of the deposit.
Value btcutil.Amount
// ConfirmationHeight is the absolute height at which the deposit was
// first confirmed.
ConfirmationHeight int64
// TimeOutSweepPkScript is the pk script that is used to sweep the
// deposit to after it is expired.
TimeOutSweepPkScript []byte
// ExpirySweepTxid is the transaction id of the expiry sweep.
ExpirySweepTxid chainhash.Hash
}
// IsInPendingState returns true if the deposit is pending.
func (d *Deposit) IsInPendingState() bool {
return !d.IsInFinalState()
}
// IsInFinalState returns true if the deposit is final.
func (d *Deposit) IsInFinalState() bool {
return d.State == Expired || d.State == Failed
}
func (d *Deposit) isExpired(currentHeight, expiry uint32) bool {
return currentHeight >= uint32(d.ConfirmationHeight)+expiry
}
// GetRandomDepositID generates a random deposit ID.
func GetRandomDepositID() (ID, error) {
var id ID
_, err := rand.Read(id[:])
return id, err
}

@ -0,0 +1,303 @@
package deposit
import (
"context"
"errors"
"fmt"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/staticaddr"
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/script"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
)
const (
DefaultObserverSize = 20
)
var (
ErrProtocolVersionNotSupported = errors.New("protocol version not " +
"supported")
)
// States.
var (
Deposited = fsm.StateType("Deposited")
PublishExpiredDeposit = fsm.StateType("PublishExpiredDeposit")
WaitForExpirySweep = fsm.StateType("WaitForExpirySweep")
Expired = fsm.StateType("Expired")
Failed = fsm.StateType("Failed")
)
// Events.
var (
OnStart = fsm.EventType("OnStart")
OnExpiry = fsm.EventType("OnExpiry")
OnExpiryPublished = fsm.EventType("OnExpiryPublished")
OnExpirySwept = fsm.EventType("OnExpirySwept")
OnRecover = fsm.EventType("OnRecover")
)
// FSM is the state machine that handles the instant out.
type FSM struct {
*fsm.StateMachine
cfg *ManagerConfig
deposit *Deposit
params *address.Parameters
address *script.StaticAddress
ctx context.Context
blockNtfnChan chan uint32
finalizedDepositChan chan wire.OutPoint
}
// NewFSM creates a new state machine that can action on all static address
// feature requests.
func NewFSM(ctx context.Context, deposit *Deposit, cfg *ManagerConfig,
finalizedDepositChan chan wire.OutPoint,
recoverStateMachine bool) (*FSM, error) {
params, err := cfg.AddressManager.GetStaticAddressParameters(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get static address "+
"parameters: %v", err)
}
address, err := cfg.AddressManager.GetStaticAddress(ctx)
if err != nil {
return nil, fmt.Errorf("unable to get static address: %v", err)
}
depoFsm := &FSM{
cfg: cfg,
deposit: deposit,
params: params,
address: address,
ctx: ctx,
blockNtfnChan: make(chan uint32),
finalizedDepositChan: finalizedDepositChan,
}
depositStates := depoFsm.DepositStatesV0()
switch params.ProtocolVersion {
case staticaddr.ProtocolVersion_V0:
default:
return nil, ErrProtocolVersionNotSupported
}
if recoverStateMachine {
depoFsm.StateMachine = fsm.NewStateMachineWithState(
depositStates, deposit.State,
DefaultObserverSize,
)
} else {
depoFsm.StateMachine = fsm.NewStateMachine(
depositStates, DefaultObserverSize,
)
}
depoFsm.ActionEntryFunc = depoFsm.updateDeposit
go func() {
for {
select {
case currentHeight := <-depoFsm.blockNtfnChan:
err := depoFsm.handleBlockNotification(
currentHeight,
)
if err != nil {
log.Errorf("error handling block "+
"notification: %v", err)
}
case <-ctx.Done():
return
}
}
}()
return depoFsm, nil
}
// handleBlockNotification inspects the current block height and sends the
// OnExpiry event to publish the expiry sweep transaction if the deposit timed
// out, or it republishes the expiry sweep transaction if it was not yet swept.
func (f *FSM) handleBlockNotification(currentHeight uint32) error {
params, err := f.cfg.AddressManager.GetStaticAddressParameters(f.ctx)
if err != nil {
return err
}
// If the deposit is expired but not yet sufficiently confirmed, we
// republish the expiry sweep transaction.
if f.deposit.isExpired(currentHeight, params.Expiry) {
if f.deposit.State == WaitForExpirySweep {
f.PublishDepositExpirySweepAction(nil)
} else {
go func() {
err := f.SendEvent(OnExpiry, nil)
if err != nil {
log.Debugf("error sending OnExpiry "+
"event: %v", err)
}
}()
}
}
return nil
}
// DepositStatesV0 returns the states a deposit can be in.
func (f *FSM) DepositStatesV0() fsm.States {
return fsm.States{
fsm.EmptyState: fsm.State{
Transitions: fsm.Transitions{
OnStart: Deposited,
},
Action: fsm.NoOpAction,
},
Deposited: fsm.State{
Transitions: fsm.Transitions{
OnExpiry: PublishExpiredDeposit,
OnRecover: Deposited,
},
Action: fsm.NoOpAction,
},
PublishExpiredDeposit: fsm.State{
Transitions: fsm.Transitions{
OnRecover: PublishExpiredDeposit,
OnExpiryPublished: WaitForExpirySweep,
// If the timeout sweep failed we go back to
// Deposited, hoping that another timeout sweep
// attempt will be successful. Alternatively,
// the client can try to coop-spend the deposit.
fsm.OnError: Deposited,
},
Action: f.PublishDepositExpirySweepAction,
},
WaitForExpirySweep: fsm.State{
Transitions: fsm.Transitions{
OnExpirySwept: Expired,
// Upon recovery, we republish the sweep tx.
OnRecover: PublishExpiredDeposit,
// If the timeout sweep failed we go back to
// Deposited, hoping that another timeout sweep
// attempt will be successful. Alternatively,
// the client can try to coop-spend the deposit.
fsm.OnError: Deposited,
},
Action: f.WaitForExpirySweepAction,
},
Expired: fsm.State{
Transitions: fsm.Transitions{
OnExpiry: Expired,
},
Action: f.SweptExpiredDepositAction,
},
Failed: fsm.State{
Transitions: fsm.Transitions{
OnExpiry: Failed,
},
Action: fsm.NoOpAction,
},
}
}
// DepositEntryFunction is called after every action and updates the deposit in
// the db.
func (f *FSM) updateDeposit(notification fsm.Notification) {
if f.deposit == nil {
return
}
f.Debugf("NextState: %v, PreviousState: %v, Event: %v",
notification.NextState, notification.PreviousState,
notification.Event,
)
f.deposit.State = notification.NextState
// Don't update the deposit if we are in an initial state or if we
// are transitioning from an initial state to a failed state.
state := f.deposit.State
if state == fsm.EmptyState || state == Deposited ||
(notification.PreviousState == Deposited && state == Failed) {
return
}
err := f.cfg.Store.UpdateDeposit(f.ctx, f.deposit)
if err != nil {
f.Errorf("unable to update deposit: %v", err)
}
}
// Infof logs an info message with the deposit outpoint.
func (f *FSM) Infof(format string, args ...interface{}) {
log.Infof(
"Deposit %v: "+format,
append(
[]interface{}{f.deposit.OutPoint},
args...,
)...,
)
}
// Debugf logs a debug message with the deposit outpoint.
func (f *FSM) Debugf(format string, args ...interface{}) {
log.Debugf(
"Deposit %v: "+format,
append(
[]interface{}{f.deposit.OutPoint},
args...,
)...,
)
}
// Errorf logs an error message with the deposit outpoint.
func (f *FSM) Errorf(format string, args ...interface{}) {
log.Errorf(
"Deposit %v: "+format,
append(
[]interface{}{f.deposit.OutPoint},
args...,
)...,
)
}
// SignDescriptor returns the sign descriptor for the static address output.
func (f *FSM) SignDescriptor() (*lndclient.SignDescriptor, error) {
address, err := f.cfg.AddressManager.GetStaticAddress(f.ctx)
if err != nil {
return nil, err
}
return &lndclient.SignDescriptor{
WitnessScript: address.TimeoutLeaf.Script,
KeyDesc: keychain.KeyDescriptor{
PubKey: f.params.ClientPubkey,
},
Output: wire.NewTxOut(
int64(f.deposit.Value), f.params.PkScript,
),
HashType: txscript.SigHashDefault,
InputIndex: 0,
SignMethod: input.TaprootScriptSpendSignMethod,
}, nil
}

@ -0,0 +1,420 @@
package deposit
import (
"context"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnwallet"
)
const (
// PollInterval is the interval in which we poll for new deposits to our
// static address.
PollInterval = 10 * time.Second
// MinConfs is the minimum number of confirmations we require for a
// deposit to be considered available for loop-ins, coop-spends and
// timeouts.
MinConfs = 3
// MaxConfs is unset since we don't require a max number of
// confirmations for deposits.
MaxConfs = 0
)
// ManagerConfig holds the configuration for the address manager.
type ManagerConfig struct {
// AddressClient is the client that communicates with the loop server
// to manage static addresses.
AddressClient staticaddressrpc.StaticAddressServerClient
AddressManager AddressManager
// SwapClient provides loop rpc functionality.
SwapClient *loop.Client
// Store is the database store that is used to store static address
// related records.
Store Store
// WalletKit is the wallet client that is used to derive new keys from
// lnd's wallet.
WalletKit lndclient.WalletKitClient
// ChainParams is the chain configuration(mainnet, testnet...) this
// manager uses.
ChainParams *chaincfg.Params
// ChainNotifier is the chain notifier that is used to listen for new
// blocks.
ChainNotifier lndclient.ChainNotifierClient
// Signer is the signer client that is used to sign transactions.
Signer lndclient.SignerClient
}
// Manager manages the address state machines.
type Manager struct {
cfg *ManagerConfig
runCtx context.Context
sync.Mutex
// initChan signals the daemon that the address manager has completed
// its initialization.
initChan chan struct{}
// activeDeposits contains all the active static address outputs.
activeDeposits map[wire.OutPoint]*FSM
// initiationHeight stores the currently best known block height.
initiationHeight uint32
// currentHeight stores the currently best known block height.
currentHeight uint32
// deposits contains all the deposits that have ever been made to the
// static address. This field is used to store and recover deposits. It
// also serves as basis for reconciliation of newly detected deposits by
// matching them against deposits in this map that were already seen.
deposits map[wire.OutPoint]*Deposit
// finalizedDepositChan is a channel that receives deposits that have
// been finalized. The manager will adjust its internal state and flush
// finalized deposits from its memory.
finalizedDepositChan chan wire.OutPoint
}
// NewManager creates a new deposit manager.
func NewManager(cfg *ManagerConfig) *Manager {
return &Manager{
cfg: cfg,
initChan: make(chan struct{}),
activeDeposits: make(map[wire.OutPoint]*FSM),
deposits: make(map[wire.OutPoint]*Deposit),
finalizedDepositChan: make(chan wire.OutPoint),
}
}
// Run runs the address manager.
func (m *Manager) Run(ctx context.Context, currentHeight uint32) error {
m.runCtx = ctx
m.Lock()
m.currentHeight, m.initiationHeight = currentHeight, currentHeight
m.Unlock()
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(m.runCtx) //nolint:lll
if err != nil {
return err
}
// Recover previous deposits and static address parameters from the DB.
err = m.recover(m.runCtx)
if err != nil {
return err
}
// Start the deposit notifier.
m.pollDeposits(ctx)
// Communicate to the caller that the address manager has completed its
// initialization.
close(m.initChan)
for {
select {
case height := <-newBlockChan:
m.Lock()
m.currentHeight = uint32(height)
m.Unlock()
// Inform all active deposits about a new block arrival.
for _, fsm := range m.activeDeposits {
select {
case fsm.blockNtfnChan <- uint32(height):
case <-m.runCtx.Done():
return m.runCtx.Err()
}
}
case outpoint := <-m.finalizedDepositChan:
// If deposits notify us about their finalization, we
// update the manager's internal state and flush the
// finalized deposit from memory.
m.finalizeDeposit(outpoint)
case err := <-newBlockErrChan:
return err
case <-m.runCtx.Done():
return m.runCtx.Err()
}
}
}
// recover recovers static address parameters, previous deposits and state
// machines from the database and starts the deposit notifier.
func (m *Manager) recover(ctx context.Context) error {
log.Infof("Recovering static address parameters and deposits...")
// Recover deposits.
deposits, err := m.cfg.Store.AllDeposits(ctx)
if err != nil {
return err
}
for i, d := range deposits {
m.deposits[d.OutPoint] = deposits[i]
// If the current deposit is final it wasn't active when we
// shut down the client last. So we don't need to start a fsm
// for it.
if d.IsInFinalState() {
continue
}
log.Debugf("Recovering deposit %x", d.ID)
// Create a state machine for a given deposit.
fsm, err := NewFSM(
m.runCtx, d, m.cfg,
m.finalizedDepositChan, true,
)
if err != nil {
return err
}
// Send the OnRecover event to the state machine.
go func() {
err = fsm.SendEvent(OnRecover, nil)
if err != nil {
log.Errorf("Error sending OnStart event: %v",
err)
}
}()
m.activeDeposits[d.OutPoint] = fsm
}
return nil
}
// WaitInitComplete waits until the address manager has completed its setup.
func (m *Manager) WaitInitComplete() {
defer log.Debugf("Static address deposit manager initiation complete.")
<-m.initChan
}
// pollDeposits polls new deposits to our static address and notifies the
// manager's event loop about them.
func (m *Manager) pollDeposits(ctx context.Context) {
log.Debugf("waiting for new static address deposits...")
go func() {
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := m.reconcileDeposits(ctx)
if err != nil {
log.Errorf("unable to reconcile "+
"deposits: %v", err)
}
case <-ctx.Done():
return
}
}
}()
}
// reconcileDeposits fetches all spends to our static address from our lnd
// wallet and matches it against the deposits in our memory that we've seen so
// far. It picks the newly identified deposits and starts a state machine per
// deposit to track its progress.
func (m *Manager) reconcileDeposits(ctx context.Context) error {
log.Tracef("Reconciling new deposits...")
utxos, err := m.cfg.AddressManager.ListUnspent(
ctx, MinConfs, MaxConfs,
)
if err != nil {
return fmt.Errorf("unable to list new deposits: %v", err)
}
newDeposits := m.filterNewDeposits(utxos)
if err != nil {
return fmt.Errorf("unable to filter new deposits: %v", err)
}
if len(newDeposits) == 0 {
log.Tracef("No new deposits...")
return nil
}
for _, utxo := range newDeposits {
deposit, err := m.createNewDeposit(ctx, utxo)
if err != nil {
return fmt.Errorf("unable to retain new deposit: %v",
err)
}
log.Debugf("Received deposit: %v", deposit)
err = m.startDepositFsm(deposit)
if err != nil {
return fmt.Errorf("unable to start new deposit FSM: %v",
err)
}
}
return nil
}
// createNewDeposit transforms the wallet utxo into a deposit struct and stores
// it in our database and manager memory.
func (m *Manager) createNewDeposit(ctx context.Context,
utxo *lnwallet.Utxo) (*Deposit, error) {
blockHeight, err := m.getBlockHeight(ctx, utxo)
if err != nil {
return nil, err
}
// Get the sweep pk script.
addr, err := m.cfg.WalletKit.NextAddr(
ctx, lnwallet.DefaultAccountName,
walletrpc.AddressType_TAPROOT_PUBKEY, false,
)
if err != nil {
return nil, err
}
timeoutSweepPkScript, err := txscript.PayToAddrScript(addr)
if err != nil {
return nil, err
}
id, err := GetRandomDepositID()
if err != nil {
return nil, err
}
deposit := &Deposit{
ID: id,
State: Deposited,
OutPoint: utxo.OutPoint,
Value: utxo.Value,
ConfirmationHeight: int64(blockHeight),
TimeOutSweepPkScript: timeoutSweepPkScript,
}
err = m.cfg.Store.CreateDeposit(ctx, deposit)
if err != nil {
return nil, err
}
m.Lock()
m.deposits[deposit.OutPoint] = deposit
m.Unlock()
return deposit, nil
}
// getBlockHeight retrieves the block height of a given utxo.
func (m *Manager) getBlockHeight(ctx context.Context,
utxo *lnwallet.Utxo) (uint32, error) {
addressParams, err := m.cfg.AddressManager.GetStaticAddressParameters(
ctx,
)
if err != nil {
return 0, fmt.Errorf("couldn't get confirmation height for "+
"deposit, %v", err)
}
notifChan, errChan, err := m.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
ctx, &utxo.OutPoint.Hash, addressParams.PkScript, MinConfs,
int32(m.initiationHeight),
)
if err != nil {
return 0, err
}
select {
case tx := <-notifChan:
return tx.BlockHeight, nil
case err := <-errChan:
return 0, err
case <-ctx.Done():
return 0, ctx.Err()
}
}
// filterNewDeposits filters the given utxos for new deposits that we haven't
// seen before.
func (m *Manager) filterNewDeposits(utxos []*lnwallet.Utxo) []*lnwallet.Utxo {
m.Lock()
defer m.Unlock()
var newDeposits []*lnwallet.Utxo
for _, utxo := range utxos {
_, ok := m.deposits[utxo.OutPoint]
if !ok {
newDeposits = append(newDeposits, utxo)
}
}
return newDeposits
}
// startDepositFsm creates a new state machine flow from the latest deposit to
// our static address.
func (m *Manager) startDepositFsm(deposit *Deposit) error {
// Create a state machine for a given deposit.
fsm, err := NewFSM(
m.runCtx, deposit, m.cfg, m.finalizedDepositChan, false,
)
if err != nil {
return err
}
// Send the start event to the state machine.
go func() {
err = fsm.SendEvent(OnStart, nil)
if err != nil {
log.Errorf("Error sending OnStart event: %v", err)
}
}()
err = fsm.DefaultObserver.WaitForState(m.runCtx, time.Minute, Deposited)
if err != nil {
return err
}
// Add the FSM to the active FSMs map.
m.Lock()
m.activeDeposits[deposit.OutPoint] = fsm
m.Unlock()
return nil
}
func (m *Manager) finalizeDeposit(outpoint wire.OutPoint) {
m.Lock()
delete(m.activeDeposits, outpoint)
delete(m.deposits, outpoint)
m.Unlock()
}
Loading…
Cancel
Save