From aec8c8dc777fbbee815cbe34ad9ff22448810615 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 5 Jun 2017 15:18:06 -0700 Subject: [PATCH] lnd: decouple rpcServer from server, wait till chain synced before starting server MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit overhauls the way that lnd is created with the goal of ensuring the chain backends are fully synced up before the daemon itself starts. The rpcServer has been slightly decoupled from the server itself s.t we can start the rpcServer independently of the server. This is required as we’ll now wait (unless we’re in simnet mode) for the chain to fully sync up before we even _start_ any of the server’s goroutines. --- lnd.go | 132 ++++++++++++++++++++++++++++++++++++++++++++++----- rpcserver.go | 1 + server.go | 72 ++-------------------------- 3 files changed, 126 insertions(+), 79 deletions(-) diff --git a/lnd.go b/lnd.go index 30b6ce92..7dffec10 100644 --- a/lnd.go +++ b/lnd.go @@ -1,6 +1,7 @@ package main import ( + "crypto/rand" "fmt" "net" "net/http" @@ -8,15 +9,19 @@ import ( "os" "runtime" "strconv" + "time" "golang.org/x/net/context" "google.golang.org/grpc" + "github.com/btcsuite/btcd/btcec" flags "github.com/btcsuite/go-flags" proxy "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" ) var ( @@ -79,34 +84,89 @@ func lndMain() error { primaryChain := registeredChains.PrimaryChain() registeredChains.RegisterChain(primaryChain, activeChainControl) + idPrivKey, err := activeChainControl.wallet.GetIdentitykey() + if err != nil { + return err + } + idPrivKey.Curve = btcec.S256() + // Set up the core server which will listen for incoming peer // connections. defaultListenAddrs := []string{ net.JoinHostPort("", strconv.Itoa(cfg.PeerPort)), } - - // With all the relevant chains initialized, we can finally start the - // server itself. - server, err := newServer(defaultListenAddrs, chanDB, activeChainControl) + server, err := newServer(defaultListenAddrs, chanDB, activeChainControl, + idPrivKey) if err != nil { srvrLog.Errorf("unable to create server: %v\n", err) return err } - if err := server.Start(); err != nil { - srvrLog.Errorf("unable to create to start server: %v\n", err) + + nodeSigner := newNodeSigner(idPrivKey) + var chanIDSeed [32]byte + if _, err := rand.Read(chanIDSeed[:]); err != nil { return err } + fundingMgr, err := newFundingManager(fundingConfig{ + IDKey: idPrivKey.PubKey(), + Wallet: activeChainControl.wallet, + Notifier: activeChainControl.chainNotifier, + FeeEstimator: activeChainControl.feeEstimator, + SignMessage: func(pubKey *btcec.PublicKey, + msg []byte) (*btcec.Signature, error) { - addInterruptHandler(func() { - ltndLog.Infof("Gracefully shutting down the server...") - server.Stop() - server.WaitForShutdown() + if pubKey.IsEqual(idPrivKey.PubKey()) { + return nodeSigner.SignMessage(pubKey, msg) + } + + return activeChainControl.msgSigner.SignMessage( + pubKey, msg, + ) + }, + SendAnnouncement: func(msg lnwire.Message) error { + server.discoverSrv.ProcessLocalAnnouncement(msg, + idPrivKey.PubKey()) + return nil + }, + ArbiterChan: server.breachArbiter.newContracts, + SendToPeer: server.sendToPeer, + FindPeer: server.findPeer, + TempChanIDSeed: chanIDSeed, + FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { + dbChannels, err := chanDB.FetchAllChannels() + if err != nil { + return nil, err + } + + for _, channel := range dbChannels { + if chanID.IsChanPoint(channel.ChanID) { + return lnwallet.NewLightningChannel( + activeChainControl.signer, + activeChainControl.chainNotifier, + activeChainControl.feeEstimator, + channel) + } + } + + return nil, fmt.Errorf("unable to find channel") + }, }) + if err != nil { + return err + } + if err := fundingMgr.Start(); err != nil { + return err + } + server.fundingMgr = fundingMgr // Initialize, and register our implementation of the gRPC server. var opts []grpc.ServerOption + rpcServer := newRPCServer(server) + if err := rpcServer.Start(); err != nil { + return err + } grpcServer := grpc.NewServer(opts...) - lnrpc.RegisterLightningServer(grpcServer, server.rpcServer) + lnrpc.RegisterLightningServer(grpcServer, rpcServer) // Next, Start the grpc server listening for HTTP/2 connections. grpcEndpoint := fmt.Sprintf("localhost:%d", loadedConfig.RPCPort) @@ -137,6 +197,56 @@ func lndMain() error { http.ListenAndServe(":8080", mux) }() + // If we're not in simnet mode, We'll wait until we're fully synced to + // continue the start up of the remainder of the daemon. This ensures + // that we don't accept any possibly invalid state transitions, or + // accept channels with spent funds. + if !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) { + _, bestHeight, err := activeChainControl.chainIO.GetBestBlock() + if err != nil { + return err + } + + ltndLog.Infof("Waiting for chain backend to finish sync, "+ + "start_height=%v", bestHeight) + + for { + synced, err := activeChainControl.wallet.IsSynced() + if err != nil { + return err + } + + if synced { + break + } + + time.Sleep(time.Second * 1) + } + + _, bestHeight, err = activeChainControl.chainIO.GetBestBlock() + if err != nil { + return err + } + + ltndLog.Infof("Chain backend is fully synced (end_height=%v)!", + bestHeight) + } + + // With all the relevant chains initialized, we can finally start the + // server itself. + if err := server.Start(); err != nil { + srvrLog.Errorf("unable to create to start server: %v\n", err) + return err + } + + addInterruptHandler(func() { + ltndLog.Infof("Gracefully shutting down the server...") + rpcServer.Stop() + fundingMgr.Stop() + server.Stop() + server.WaitForShutdown() + }) + // Wait for shutdown signal from either a graceful server stop or from // the interrupt handler. <-shutdownChannel diff --git a/rpcserver.go b/rpcserver.go index 6f0079ec..347bc23f 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -781,6 +781,7 @@ func (r *rpcServer) GetInfo(ctx context.Context, for i, chain := range registeredChains.ActiveChains() { activeChains[i] = chain.String() } + // TODO(roasbeef): add synced height n stuff return &lnrpc.GetInfoResponse{ IdentityPubkey: hex.EncodeToString(idPub), diff --git a/server.go b/server.go index 3d7be8f7..c684379b 100644 --- a/server.go +++ b/server.go @@ -2,7 +2,6 @@ package main import ( "bytes" - "crypto/rand" "crypto/sha256" "encoding/hex" "fmt" @@ -18,7 +17,6 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/lnrpc" - "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/btcec" @@ -57,8 +55,6 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer - rpcServer *rpcServer - cc *chainControl fundingMgr *fundingManager @@ -103,12 +99,10 @@ type server struct { // newServer creates a new instance of the server which is to listen using the // passed listener address. -func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl) (*server, error) { - privKey, err := cc.wallet.GetIdentitykey() - if err != nil { - return nil, err - } - privKey.Curve = btcec.S256() +func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, + privKey *btcec.PrivateKey) (*server, error) { + + var err error listeners := make([]net.Listener, len(listenAddrs)) for i, addr := range listenAddrs { @@ -264,59 +258,9 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl) (*s return nil, err } - s.rpcServer = newRPCServer(s) s.breachArbiter = newBreachArbiter(cc.wallet, chanDB, cc.chainNotifier, s.htlcSwitch, s.cc.chainIO, s.cc.feeEstimator) - var chanIDSeed [32]byte - if _, err := rand.Read(chanIDSeed[:]); err != nil { - return nil, err - } - - s.fundingMgr, err = newFundingManager(fundingConfig{ - IDKey: s.identityPriv.PubKey(), - Wallet: cc.wallet, - ChainIO: s.cc.chainIO, - Notifier: s.cc.chainNotifier, - FeeEstimator: s.cc.feeEstimator, - SignMessage: func(pubKey *btcec.PublicKey, msg []byte) (*btcec.Signature, error) { - if pubKey.IsEqual(s.identityPriv.PubKey()) { - return s.nodeSigner.SignMessage(pubKey, msg) - } - - return cc.msgSigner.SignMessage(pubKey, msg) - }, - SendAnnouncement: func(msg lnwire.Message) error { - s.discoverSrv.ProcessLocalAnnouncement(msg, - s.identityPriv.PubKey()) - return nil - }, - ArbiterChan: s.breachArbiter.newContracts, - SendToPeer: s.sendToPeer, - FindPeer: s.findPeer, - TempChanIDSeed: chanIDSeed, - FindChannel: func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error) { - dbChannels, err := chanDB.FetchAllChannels() - if err != nil { - return nil, err - } - - for _, channel := range dbChannels { - if chanID.IsChanPoint(channel.ChanID) { - return lnwallet.NewLightningChannel( - cc.signer, cc.chainNotifier, - cc.feeEstimator, - channel) - } - } - - return nil, fmt.Errorf("unable to find channel") - }, - }) - if err != nil { - return nil, err - } - // TODO(roasbeef): introduce closure and config system to decouple the // initialization above ^ @@ -357,12 +301,6 @@ func (s *server) Start() error { return err } - if err := s.rpcServer.Start(); err != nil { - return err - } - if err := s.fundingMgr.Start(); err != nil { - return err - } if err := s.htlcSwitch.Start(); err != nil { return err } @@ -403,8 +341,6 @@ func (s *server) Stop() error { // Shutdown the wallet, funding manager, and the rpc server. s.cc.chainNotifier.Stop() - s.rpcServer.Stop() - s.fundingMgr.Stop() s.chanRouter.Stop() s.htlcSwitch.Stop() s.utxoNursery.Stop()