loopd: refactor into Start/Stop methods

pull/202/head
Oliver Gugger 4 years ago
parent 215e5b99d6
commit 506d0c2257
No known key found for this signature in database
GPG Key ID: 8E4256593F177720

@ -10,6 +10,6 @@ func main() {
cfg := loopd.RPCConfig{}
err := loopd.Start(cfg)
if err != nil {
fmt.Println(err)
fmt.Printf("loopd exited with an error: %v\n", err)
}
}

@ -6,11 +6,8 @@ import (
"fmt"
"net"
"net/http"
"os"
"os/signal"
"runtime/pprof"
"sync"
"time"
"sync/atomic"
proxy "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/lightninglabs/loop"
@ -24,6 +21,10 @@ var (
// maxMsgRecvSize is the largest message our REST proxy will receive. We
// set this to 200MiB atm.
maxMsgRecvSize = grpc.MaxCallRecvMsgSize(1 * 1024 * 1024 * 200)
// errOnlyStartOnce is the error that is returned if the daemon is
// started more than once.
errOnlyStartOnce = fmt.Errorf("daemon can only be started once")
)
// listenerCfg holds closures used to retrieve listeners for the gRPC services.
@ -40,93 +41,119 @@ type listenerCfg struct {
// Daemon is the struct that holds one instance of the loop client daemon.
type Daemon struct {
// To be used atomically. Declared first to optimize struct alignment.
started int32
// swapClientServer is the embedded RPC server that satisfies the client
// RPC interface. We embed this struct so the Daemon itself can be
// registered to an existing grpc.Server to run as a subserver in the
// same process.
swapClientServer
cfg *Config
listenerCfg *listenerCfg
// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
ErrChan chan error
cfg *Config
listenerCfg *listenerCfg
internalErrChan chan error
lnd *lndclient.GrpcLndServices
clientCleanup func()
lnd *lndclient.GrpcLndServices
wg sync.WaitGroup
quit chan struct{}
stopOnce sync.Once
mainCtx context.Context
mainCtxCancel func()
grpcServer *grpc.Server
grpcListener net.Listener
restServer *http.Server
restListener net.Listener
restCtxCancel func()
}
// New creates a new instance of the loop client daemon.
func New(config *Config, lisCfg *listenerCfg) *Daemon {
return &Daemon{
// We send exactly one error on this channel if something goes
// wrong at runtime. Or a nil value if the shutdown was
// successful. But in case nobody's listening, we don't want to
// block on it so we buffer it.
ErrChan: make(chan error, 1),
quit: make(chan struct{}),
cfg: config,
listenerCfg: lisCfg,
// We have 3 goroutines that could potentially send an error.
// We react on the first error but in case more than one exits
// with an error we don't want them to block.
internalErrChan: make(chan error, 3),
}
}
// Run runs loopd in daemon mode. It will listen for grpc connections,
// Start starts loopd in daemon mode. It will listen for grpc connections,
// execute commands and pass back swap status information.
func (d *Daemon) Run() error {
func (d *Daemon) Start() error {
// There should be no reason to start the daemon twice. Therefore return
// an error if that's tried.
if atomic.AddInt32(&d.started, 1) != 1 {
return errOnlyStartOnce
}
var err error
d.lnd, err = d.listenerCfg.getLnd(d.cfg.Network, d.cfg.Lnd)
if err != nil {
return err
}
defer d.lnd.Close()
// If no swap server is specified, use the default addresses for mainnet
// and testnet.
if d.cfg.SwapServer == "" {
// TODO(wilmer): Use onion service addresses when proxy is
// active.
switch d.cfg.Network {
case "mainnet":
d.cfg.SwapServer = mainnetServer
case "testnet":
d.cfg.SwapServer = testnetServer
default:
return errors.New("no swap server address specified")
}
}
log.Infof("Swap server address: %v", d.cfg.SwapServer)
// Create an instance of the loop client library.
swapClient, cleanup, err := getClient(d.cfg, &d.lnd.LndServices)
// With lnd connected, initialize everything else, such as the swap
// server client, the swap client RPC server instance and our main swap
// and error handlers. If this fails, then nothing has been started yet
// and we can just return the error.
err = d.initialize()
if err != nil {
return err
}
defer cleanup()
// Retrieve all currently existing swaps from the database.
swapsList, err := swapClient.FetchSwaps()
if err != nil {
return err
// If we get here, we already have started several goroutines. So if
// anything goes wrong now, we need to cleanly shut down again.
startErr := d.startWebServers()
if startErr != nil {
log.Errorf("Error while starting daemon: %v", err)
d.Stop()
stopErr := <-d.ErrChan
if stopErr != nil {
log.Errorf("Error while stopping daemon: %v", stopErr)
}
return startErr
}
swaps := make(map[lntypes.Hash]loop.SwapInfo)
for _, s := range swapsList {
swaps[s.SwapHash] = *s
}
return nil
}
// Instantiate the loopd gRPC server.
d.swapClientServer = swapClientServer{
impl: swapClient,
lnd: &d.lnd.LndServices,
swaps: swaps,
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
}
// startWebServers starts the gRPC and REST servers in goroutines.
func (d *Daemon) startWebServers() error {
var err error
// With our client created, let's now finish setting up and start our
// RPC server.
serverOpts := []grpc.ServerOption{}
grpcServer := grpc.NewServer(serverOpts...)
looprpc.RegisterSwapClientServer(grpcServer, d)
d.grpcServer = grpc.NewServer(serverOpts...)
looprpc.RegisterSwapClientServer(d.grpcServer, d)
// Next, start the gRPC server listening for HTTP/2 connections.
log.Infof("Starting gRPC listener")
grpcListener, err := d.listenerCfg.grpcListener()
d.grpcListener, err = d.listenerCfg.grpcListener()
if err != nil {
return fmt.Errorf("RPC server unable to listen on %s",
d.cfg.RPCListen)
return fmt.Errorf("RPC server unable to listen on %s: %v",
d.cfg.RPCListen, err)
}
defer grpcListener.Close()
// The default JSON marshaler of the REST proxy only sets OrigName to
// true, which instructs it to use the same field names as specified in
@ -142,7 +169,7 @@ func (d *Daemon) Run() error {
// We'll also create and start an accompanying proxy to serve clients
// through REST.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
d.restCtxCancel = cancel
mux := proxy.NewServeMux(customMarshalerOption)
var restHandler http.Handler = mux
if d.cfg.CORSOrigin != "" {
@ -159,100 +186,221 @@ func (d *Daemon) Run() error {
return err
}
restListener, err := d.listenerCfg.restListener()
d.restListener, err = d.listenerCfg.restListener()
if err != nil {
return fmt.Errorf("REST proxy unable to listen on %s",
d.cfg.RESTListen)
return fmt.Errorf("REST proxy unable to listen on %s: %v",
d.cfg.RESTListen, err)
}
// A nil listener indicates REST is disabled.
if restListener != nil {
if d.restListener != nil {
log.Infof("Starting REST proxy listener")
defer restListener.Close()
proxy := &http.Server{Handler: restHandler}
d.restServer = &http.Server{Handler: restHandler}
d.wg.Add(1)
go func() {
err := proxy.Serve(restListener)
defer d.wg.Done()
log.Infof("REST proxy listening on %s",
d.restListener.Addr())
err := d.restServer.Serve(d.restListener)
// ErrServerClosed is always returned when the proxy is
// shut down, so don't log it.
if err != nil && err != http.ErrServerClosed {
log.Error(err)
// Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
}
}()
} else {
log.Infof("REST proxy disabled")
}
mainCtx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
// Start the grpc server.
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("RPC server listening on %s", d.grpcListener.Addr())
err = d.grpcServer.Serve(d.grpcListener)
if err != nil && err != grpc.ErrServerStopped {
// Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
}
}()
return nil
}
// initialize creates and initializes an instance of the swap server client,
// the swap client RPC server instance and our main swap and error handlers. If
// this method fails with an error then no goroutine was started yet and no
// cleanup is necessary. If it succeeds, then goroutines have been spawned.
func (d *Daemon) initialize() error {
// If no swap server is specified, use the default addresses for mainnet
// and testnet.
if d.cfg.SwapServer == "" {
// TODO(wilmer): Use onion service addresses when proxy is
// active.
switch d.cfg.Network {
case "mainnet":
d.cfg.SwapServer = mainnetServer
case "testnet":
d.cfg.SwapServer = testnetServer
default:
return errors.New("no swap server address specified")
}
}
log.Infof("Swap server address: %v", d.cfg.SwapServer)
// Create an instance of the loop client library.
swapclient, clientCleanup, err := getClient(d.cfg, &d.lnd.LndServices)
if err != nil {
return err
}
d.clientCleanup = clientCleanup
// Both the client RPC server and and the swap server client should
// stop on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
// Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{
impl: swapclient,
lnd: &d.lnd.LndServices,
swaps: make(map[lntypes.Hash]loop.SwapInfo),
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
}
// Retrieve all currently existing swaps from the database.
swapsList, err := d.impl.FetchSwaps()
if err != nil {
// The client is the only thing we started yet, so if we clean
// up its connection now, nothing else needs to be shut down at
// this point.
clientCleanup()
return err
}
for _, s := range swapsList {
d.swaps[s.SwapHash] = *s
}
// Start the swap client itself.
wg.Add(1)
d.wg.Add(1)
go func() {
defer wg.Done()
defer d.wg.Done()
log.Infof("Starting swap client")
err := swapClient.Run(mainCtx, d.statusChan)
err := d.impl.Run(d.mainCtx, d.statusChan)
if err != nil {
log.Error(err)
// Notify the main error handler goroutine that
// we exited unexpectedly here. We don't have to
// worry about blocking as the internal error
// channel is sufficiently buffered.
d.internalErrChan <- err
}
log.Infof("Swap client stopped")
log.Infof("Stopping gRPC server")
grpcServer.Stop()
cancel()
}()
// Start a goroutine that broadcasts swap updates to clients.
wg.Add(1)
d.wg.Add(1)
go func() {
defer wg.Done()
defer d.wg.Done()
log.Infof("Waiting for updates")
d.processStatusUpdates(mainCtx)
d.processStatusUpdates(d.mainCtx)
}()
// Start the grpc server.
wg.Add(1)
// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that
// something went wrong or that shutdown is complete. We don't add to
// the wait group here because this goroutine will itself wait for the
// stop to complete and signal its completion through the main error
// channel.
go func() {
defer wg.Done()
log.Infof("RPC server listening on %s", grpcListener.Addr())
if restListener != nil {
log.Infof("REST proxy listening on %s", restListener.Addr())
var runtimeErr error
// There are only two ways this goroutine can exit. Either there
// is an internal error or the caller requests shutdown. In both
// cases we wait for the stop to complete before we signal the
// caller that we're done.
select {
case runtimeErr = <-d.internalErrChan:
log.Errorf("Runtime error in daemon, shutting down: "+
"%v", runtimeErr)
case <-d.quit:
}
err = grpcServer.Serve(grpcListener)
if err != nil {
log.Error(err)
}
// We need to shutdown before sending the error on the channel,
// otherwise a caller might exit the process too early.
d.stop()
log.Info("Daemon exited")
// The caller expects exactly one message. So we send the error
// even if it's nil because we cleanly shut down.
d.ErrChan <- runtimeErr
}()
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, os.Interrupt)
return nil
}
// Run until the users terminates loopd or an error occurred.
select {
case <-interruptChannel:
log.Infof("Received SIGINT (Ctrl+C).")
// Stop tries to gracefully shut down the daemon. A caller needs to wait for a
// message on the main error channel indicating that the shutdown is completed.
func (d *Daemon) Stop() {
d.stopOnce.Do(func() {
close(d.quit)
})
}
// TODO: Remove debug code.
// Debug code to dump goroutines on hanging exit.
go func() {
time.Sleep(5 * time.Second)
_ = pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
}()
// stop does the actual shutdown and blocks until all goroutines have exit.
func (d *Daemon) stop() {
// First of all, we can cancel the main context that all event handlers
// are using. This should stop all swap activity and all event handlers
// should exit.
if d.mainCtxCancel != nil {
d.mainCtxCancel()
}
cancel()
case <-mainCtx.Done():
// As there is no swap activity anymore, we can forcefully shutdown the
// gRPC and HTTP servers now.
log.Infof("Stopping gRPC server")
if d.grpcServer != nil {
d.grpcServer.Stop()
}
log.Infof("Stopping REST server")
if d.restServer != nil {
// Don't return the error here, we first want to give everything
// else a chance to shut down cleanly.
err := d.restServer.Close()
if err != nil {
log.Errorf("Error stopping REST server: %v", err)
}
}
if d.restCtxCancel != nil {
d.restCtxCancel()
}
wg.Wait()
// Next, shut down the connections to lnd and the swap server.
if d.lnd != nil {
d.lnd.Close()
}
if d.clientCleanup != nil {
d.clientCleanup()
}
return nil
// Everything should be shutting down now, wait for completion.
d.wg.Wait()
}
// allowCORS wraps the given http.Handler with a function that adds the

@ -13,6 +13,7 @@ import (
"github.com/lightninglabs/loop/lndclient"
"github.com/lightningnetwork/lnd/build"
"github.com/lightningnetwork/lnd/lnrpc/verrpc"
"github.com/lightningnetwork/lnd/signal"
)
const defaultConfigFilename = "loopd.conf"
@ -168,8 +169,26 @@ func Start(rpcCfg RPCConfig) error {
// Execute command.
if parser.Active == nil {
signal.Intercept()
daemon := New(&config, lisCfg)
return daemon.Run()
if err := daemon.Start(); err != nil {
return err
}
select {
case <-signal.ShutdownChannel():
log.Infof("Received SIGINT (Ctrl+C).")
daemon.Stop()
// The above stop will return immediately. But we'll be
// notified on the error channel once the process is
// complete.
return <-daemon.ErrChan
case err := <-daemon.ErrChan:
return err
}
}
if parser.Active.Name == "view" {

@ -39,6 +39,7 @@ type swapClientServer struct {
statusChan chan loop.SwapInfo
nextSubscriberID int
swapsLock sync.Mutex
mainCtx context.Context
}
// LoopOut initiates an loop out swap with the given parameters. The call
@ -264,8 +265,14 @@ func (s *swapClientServer) Monitor(in *looprpc.MonitorRequest,
if err := send(swap); err != nil {
return err
}
// The client cancels the subscription.
case <-server.Context().Done():
return nil
// The server is shutting down.
case <-s.mainCtx.Done():
return fmt.Errorf("server is shutting down")
}
}
}

Loading…
Cancel
Save