diff --git a/discovery/bootstrapper.go b/discovery/bootstrapper.go index be192ef1..412dec65 100644 --- a/discovery/bootstrapper.go +++ b/discovery/bootstrapper.go @@ -4,10 +4,13 @@ import ( "bytes" "crypto/rand" "crypto/sha256" + "errors" "fmt" + prand "math/rand" "net" "strconv" "strings" + "time" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/autopilot" @@ -18,6 +21,10 @@ import ( "github.com/roasbeef/btcutil/bech32" ) +func init() { + prand.Seed(time.Now().Unix()) +} + // NetworkPeerBootstrapper is an interface that represents an initial peer // bootstrap mechanism. This interface is to be used to bootstrap a new peer to // the connection by providing it with the pubkey+address of a set of existing @@ -44,40 +51,64 @@ type NetworkPeerBootstrapper interface { // the ignore map is populated, then the bootstrappers will be instructed to // skip those nodes. func MultiSourceBootstrap(ignore map[autopilot.NodeID]struct{}, numAddrs uint32, - bootStrappers ...NetworkPeerBootstrapper) ([]*lnwire.NetAddress, error) { + bootstrappers ...NetworkPeerBootstrapper) ([]*lnwire.NetAddress, error) { + + // We'll randomly shuffle our bootstrappers before querying them in + // order to avoid from querying the same bootstrapper method over and + // over, as some of these might tend to provide better/worse results + // than others. + bootstrappers = shuffleBootstrappers(bootstrappers) var addrs []*lnwire.NetAddress - for _, bootStrapper := range bootStrappers { + for _, bootstrapper := range bootstrappers { // If we already have enough addresses, then we can exit early // w/o querying the additional bootstrappers. if uint32(len(addrs)) >= numAddrs { break } - log.Infof("Attempting to bootstrap with: %v", bootStrapper.Name()) + log.Infof("Attempting to bootstrap with: %v", bootstrapper.Name()) // If we still need additional addresses, then we'll compute // the number of address remaining that we need to fetch. numAddrsLeft := numAddrs - uint32(len(addrs)) log.Tracef("Querying for %v addresses", numAddrsLeft) - netAddrs, err := bootStrapper.SampleNodeAddrs(numAddrsLeft, ignore) + netAddrs, err := bootstrapper.SampleNodeAddrs(numAddrsLeft, ignore) if err != nil { // If we encounter an error with a bootstrapper, then // we'll continue on to the next available // bootstrapper. log.Errorf("Unable to query bootstrapper %v: %v", - bootStrapper.Name(), err) + bootstrapper.Name(), err) continue } addrs = append(addrs, netAddrs...) } + if len(addrs) == 0 { + return nil, errors.New("no addresses found") + } + log.Infof("Obtained %v addrs to bootstrap network with", len(addrs)) return addrs, nil } +// shuffleBootstrappers shuffles the set of bootstrappers in order to avoid +// querying the same bootstrapper over and over. To shuffle the set of +// candidates, we use a version of the Fisher–Yates shuffle algorithm. +func shuffleBootstrappers(candidates []NetworkPeerBootstrapper) []NetworkPeerBootstrapper { + shuffled := make([]NetworkPeerBootstrapper, len(candidates)) + perm := prand.Perm(len(candidates)) + + for i, v := range perm { + shuffled[v] = candidates[i] + } + + return shuffled +} + // ChannelGraphBootstrapper is an implementation of the NetworkPeerBootstrapper // which attempts to retrieve advertised peers directly from the active channel // graph. This instance requires a backing autopilot.ChannelGraph instance in diff --git a/server.go b/server.go index a324b445..45ad1dbb 100644 --- a/server.go +++ b/server.go @@ -39,14 +39,10 @@ import ( "golang.org/x/net/context" ) -var ( - // ErrPeerNotConnected signals that the server has no connection to the - // given peer. - ErrPeerNotConnected = errors.New("peer is not connected") - - // ErrServerShuttingDown indicates that the server is in the process of - // gracefully exiting. - ErrServerShuttingDown = errors.New("server is shutting down") +const ( + // defaultMinPeers is the minimum number of peers nodes should always be + // connected to. + defaultMinPeers = 3 // defaultBackoff is the starting point for exponential backoff for // reconnecting to persistent peers. @@ -57,6 +53,16 @@ var ( maximumBackoff = time.Hour ) +var ( + // ErrPeerNotConnected signals that the server has no connection to the + // given peer. + ErrPeerNotConnected = errors.New("peer is not connected") + + // ErrServerShuttingDown indicates that the server is in the process of + // gracefully exiting. + ErrServerShuttingDown = errors.New("server is shutting down") +) + // server is the main server of the Lightning Network Daemon. The server houses // global state pertaining to the wallet, database, and the rpcserver. // Additionally, the server is also used as a central messaging bus to interact @@ -734,6 +740,7 @@ func (s *server) Start() error { if err := s.chanRouter.Start(); err != nil { return err } + s.connMgr.Start() // With all the relevant sub-systems started, we'll now attempt to // establish persistent connections to our direct channel collaborators @@ -742,21 +749,19 @@ func (s *server) Start() error { return err } - go s.connMgr.Start() - // If network bootstrapping hasn't been disabled, then we'll configure // the set of active bootstrappers, and launch a dedicated goroutine to // maintain a set of persistent connections. if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) && !(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) { - networkBootStrappers, err := initNetworkBootstrappers(s) + bootstrappers, err := initNetworkBootstrappers(s) if err != nil { return err } s.wg.Add(1) - go s.peerBootstrapper(3, networkBootStrappers) + go s.peerBootstrapper(defaultMinPeers, bootstrappers) } else { srvrLog.Infof("Auto peer bootstrapping is disabled") } @@ -1018,48 +1023,28 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e } // peerBootstrapper is a goroutine which is tasked with attempting to establish -// and maintain a target min number of outbound connections. With this +// and maintain a target minimum number of outbound connections. With this // invariant, we ensure that our node is connected to a diverse set of peers // and that nodes newly joining the network receive an up to date network view // as soon as possible. func (s *server) peerBootstrapper(numTargetPeers uint32, - bootStrappers []discovery.NetworkPeerBootstrapper) { + bootstrappers []discovery.NetworkPeerBootstrapper) { defer s.wg.Done() - // To kick things off, we'll attempt to first query the set of - // bootstrappers for enough address to fill our quot. - bootStrapAddrs, err := discovery.MultiSourceBootstrap( - nil, numTargetPeers, bootStrappers..., - ) - if err != nil { - // TODO(roasbeef): panic? - srvrLog.Errorf("Unable to retrieve initial bootstrap "+ - "peers: %v", err) - return - } + // ignore is a set used to keep track of peers already retrieved from + // our bootstrappers in order to avoid duplicates. + ignore := make(map[autopilot.NodeID]struct{}) - srvrLog.Debugf("Attempting to bootstrap connectivity with %v initial "+ - "peers", len(bootStrapAddrs)) + // We'll start off by aggressively attempting connections to peers in + // order to be a part of the network as soon as possible. + s.initialPeerBootstrap(ignore, numTargetPeers, bootstrappers) - // With our initial set of peers obtained, we'll launch a goroutine to - // attempt to connect out to each of them. We'll be waking up shortly - // below to sample how many of these connections succeeded. - for _, addr := range bootStrapAddrs { - go func(a *lnwire.NetAddress) { - conn, err := brontide.Dial(s.identityPriv, a, cfg.net.Dial) - if err != nil { - srvrLog.Errorf("unable to connect to %v: %v", - a, err) - return - } - - s.OutboundPeerConnected(nil, conn) - }(addr) - } - - // We'll start with a 15 second backoff, and double the time every time - // an epoch fails up to a ceiling. + // Once done, we'll attempt to maintain our target minimum number of + // peers. + // + // We'll use a 15 second backoff, and double the time every time an + // epoch fails up to a ceiling. const backOffCeiling = time.Minute * 5 backOff := time.Second * 15 @@ -1135,7 +1120,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32, s.mu.RUnlock() peerAddrs, err := discovery.MultiSourceBootstrap( - ignoreList, numNeeded*2, bootStrappers..., + ignoreList, numNeeded*2, bootstrappers..., ) if err != nil { srvrLog.Errorf("Unable to retrieve bootstrap "+ @@ -1151,16 +1136,16 @@ func (s *server) peerBootstrapper(numTargetPeers uint32, go func(a *lnwire.NetAddress) { // TODO(roasbeef): can do AS, subnet, // country diversity, etc - conn, err := brontide.Dial(s.identityPriv, - a, cfg.net.Dial) - if err != nil { - srvrLog.Errorf("unable to connect "+ - "to %v: %v", a, err) + errChan := make(chan error, 1) + s.connectToPeer(a, errChan) + select { + case err := <-errChan: + srvrLog.Errorf("Unable to "+ + "connect to %v: %v", + a, err) atomic.AddUint32(&epochErrors, 1) - return + case <-s.quit: } - - s.OutboundPeerConnected(nil, conn) }(addr) } case <-s.quit: @@ -1169,6 +1154,77 @@ func (s *server) peerBootstrapper(numTargetPeers uint32, } } +// initialPeerBootstrap attempts to continuously connect to peers on startup +// until the target number of peers has been reached. This ensures that nodes +// receive an up to date network view as soon as possible. +func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{}, + numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) { + + var wg sync.WaitGroup + + for { + // Check if the server has been requested to shut down in order + // to prevent blocking. + if s.Stopped() { + return + } + + // We can exit our aggressive initial peer bootstrapping stage + // if we've reached out target number of peers. + s.mu.RLock() + numActivePeers := uint32(len(s.peersByPub)) + s.mu.RUnlock() + + if numActivePeers >= numTargetPeers { + return + } + + // Otherwise, we'll request for the remaining number of peers in + // order to reach our target. + peersNeeded := numTargetPeers - numActivePeers + bootstrapAddrs, err := discovery.MultiSourceBootstrap( + ignore, peersNeeded, bootstrappers..., + ) + if err != nil { + srvrLog.Errorf("Unable to retrieve initial bootstrap "+ + "peers: %v", err) + continue + } + + // Then, we'll attempt to establish a connection to the + // different peer addresses retrieved by our bootstrappers. + for _, bootstrapAddr := range bootstrapAddrs { + wg.Add(1) + go func(addr *lnwire.NetAddress) { + defer wg.Done() + + errChan := make(chan error, 1) + go s.connectToPeer(addr, errChan) + + // We'll only allow this connection attempt to + // take up to 3 seconds. This allows us to move + // quickly by discarding peers that are slowing + // us down. + select { + case err := <-errChan: + srvrLog.Errorf("Unable to connect to "+ + "%v: %v", addr, err) + // TODO: tune timeout? 3 seconds might be *too* + // aggressive but works well. + case <-time.After(3 * time.Second): + srvrLog.Tracef("Skipping peer %v due "+ + "to not establishing a "+ + "connection within 3 seconds", + addr) + case <-s.quit: + } + }(bootstrapAddr) + } + + wg.Wait() + } +} + // initTorController initiliazes the Tor controller backed by lnd and // automatically sets up a v2 onion service in order to listen for inbound // connections over Tor. @@ -2294,19 +2350,38 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { // connect to the target peer. If the we can't make the connection, or // the crypto negotiation breaks down, then return an error to the // caller. - conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial) - if err != nil { + errChan := make(chan error, 1) + s.connectToPeer(addr, errChan) + + select { + case err := <-errChan: return err + case <-s.quit: + return ErrServerShuttingDown } - // Once the connection has been made, we can notify the server of the - // new connection via our public endpoint, which will require the lock - // an add the peer to the server's internal state. - s.OutboundPeerConnected(nil, conn) - return nil } +// connectToPeer establishes a connection to a remote peer. errChan is used to +// notify the caller if the connection attempt has failed. Otherwise, it will be +// closed. +func (s *server) connectToPeer(addr *lnwire.NetAddress, errChan chan<- error) { + conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial) + if err != nil { + srvrLog.Errorf("Unable to connect to %v: %v", addr, err) + select { + case errChan <- err: + case <-s.quit: + } + return + } + + close(errChan) + + s.OutboundPeerConnected(nil, conn) +} + // DisconnectPeer sends the request to server to close the connection with peer // identified by public key. //