daemon: integrate static address manager and sql store

pull/733/head
Slyghtning 6 months ago
parent 0b504f70af
commit e2c887aca1
No known key found for this signature in database
GPG Key ID: F82D456EA023C9BF

@ -17,14 +17,13 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/instantout"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightninglabs/loop/instantout/reservation"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/staticaddr"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/macaroons"
@ -69,6 +68,12 @@ type Daemon struct {
// same process.
swapClientServer
// AddressServer is the embedded RPC server that satisfies the
// static address client RPC interface. We embed this struct so the
// Daemon itself can be registered to an existing grpc.Server to run as
// a subserver in the same process.
*staticaddr.AddressServer
// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
@ -234,6 +239,7 @@ func (d *Daemon) startWebServers() error {
grpc.StreamInterceptor(streamInterceptor),
)
loop_looprpc.RegisterSwapClientServer(d.grpcServer, d)
loop_looprpc.RegisterStaticAddressClientServer(d.grpcServer, d)
// Register our debug server if it is compiled in.
d.registerDebugServer()
@ -545,6 +551,26 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
instantOutManager: instantOutManager,
}
// Create a static address server client.
staticAddressClient := loop_swaprpc.NewStaticAddressServerClient(
swapClient.Conn,
)
store := staticaddr.NewSqlStore(baseDb)
cfg := &staticaddr.ManagerConfig{
AddressClient: staticAddressClient,
SwapClient: swapClient,
Store: store,
WalletKit: d.lnd.WalletKit,
ChainParams: d.lnd.ChainParams,
}
staticAddressManager := staticaddr.NewAddressManager(cfg)
d.AddressServer = staticaddr.NewAddressServer(
staticAddressClient, staticAddressManager,
)
// Retrieve all currently existing swaps from the database.
swapsList, err := d.impl.FetchSwaps(d.mainCtx)
if err != nil {
@ -635,42 +661,21 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}()
}
// Start the instant out manager.
if d.instantOutManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()
getInfo, err := d.lnd.Client.GetInfo(d.mainCtx)
if err != nil {
d.internalErrChan <- err
return
}
// Start the static address manager.
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Info("Starting instantout manager")
defer log.Info("Instantout manager stopped")
log.Info("Starting static address manager...")
err = staticAddressManager.Run(d.mainCtx)
if err != nil && !errors.Is(context.Canceled, err) {
d.internalErrChan <- err
}
err = d.instantOutManager.Run(
d.mainCtx, initChan, int32(getInfo.BlockHeight),
)
if err != nil && !errors.Is(err, context.Canceled) {
d.internalErrChan <- err
}
}()
log.Info("Static address manager stopped")
}()
// Wait for the instantout server to be ready before starting the
// grpc server.
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("reservation server not ready: %v",
timeOutCtx.Err())
case <-initChan:
cancel()
}
}
staticAddressManager.WaitInitComplete()
// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that

Loading…
Cancel
Save