server: improve initial peer bootstrapping
In this commit, we address an existing issue with regards to the inital peer bootstrapping stage. At times, the bootstrappers can be unreliable by providing addresses for peers that no longer exist/are currently offline. This would lead to nodes quickly entering an exponential backoff method used to maintain a minimum target of peers without first achieving said target. We address this by separating the peer bootstrapper into two stages: the initial peer bootstrapping and maintaining a target set of nodes to maintain an up-to-date view of the network. The initial peer bootstrapping stage has been made aggressive in order to provide such view of the network as quickly as possible. Once done, we continue on with the existing exponential backoff method responsible for maintaining a target set of nodes.
This commit is contained in:
parent
26636ce994
commit
9593e3772e
197
server.go
197
server.go
@ -39,14 +39,10 @@ import (
|
|||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
const (
|
||||||
// ErrPeerNotConnected signals that the server has no connection to the
|
// defaultMinPeers is the minimum number of peers nodes should always be
|
||||||
// given peer.
|
// connected to.
|
||||||
ErrPeerNotConnected = errors.New("peer is not connected")
|
defaultMinPeers = 3
|
||||||
|
|
||||||
// ErrServerShuttingDown indicates that the server is in the process of
|
|
||||||
// gracefully exiting.
|
|
||||||
ErrServerShuttingDown = errors.New("server is shutting down")
|
|
||||||
|
|
||||||
// defaultBackoff is the starting point for exponential backoff for
|
// defaultBackoff is the starting point for exponential backoff for
|
||||||
// reconnecting to persistent peers.
|
// reconnecting to persistent peers.
|
||||||
@ -57,6 +53,16 @@ var (
|
|||||||
maximumBackoff = time.Hour
|
maximumBackoff = time.Hour
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrPeerNotConnected signals that the server has no connection to the
|
||||||
|
// given peer.
|
||||||
|
ErrPeerNotConnected = errors.New("peer is not connected")
|
||||||
|
|
||||||
|
// ErrServerShuttingDown indicates that the server is in the process of
|
||||||
|
// gracefully exiting.
|
||||||
|
ErrServerShuttingDown = errors.New("server is shutting down")
|
||||||
|
)
|
||||||
|
|
||||||
// server is the main server of the Lightning Network Daemon. The server houses
|
// server is the main server of the Lightning Network Daemon. The server houses
|
||||||
// global state pertaining to the wallet, database, and the rpcserver.
|
// global state pertaining to the wallet, database, and the rpcserver.
|
||||||
// Additionally, the server is also used as a central messaging bus to interact
|
// Additionally, the server is also used as a central messaging bus to interact
|
||||||
@ -734,6 +740,7 @@ func (s *server) Start() error {
|
|||||||
if err := s.chanRouter.Start(); err != nil {
|
if err := s.chanRouter.Start(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
s.connMgr.Start()
|
||||||
|
|
||||||
// With all the relevant sub-systems started, we'll now attempt to
|
// With all the relevant sub-systems started, we'll now attempt to
|
||||||
// establish persistent connections to our direct channel collaborators
|
// establish persistent connections to our direct channel collaborators
|
||||||
@ -742,21 +749,19 @@ func (s *server) Start() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.connMgr.Start()
|
|
||||||
|
|
||||||
// If network bootstrapping hasn't been disabled, then we'll configure
|
// If network bootstrapping hasn't been disabled, then we'll configure
|
||||||
// the set of active bootstrappers, and launch a dedicated goroutine to
|
// the set of active bootstrappers, and launch a dedicated goroutine to
|
||||||
// maintain a set of persistent connections.
|
// maintain a set of persistent connections.
|
||||||
if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) &&
|
if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) &&
|
||||||
!(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) {
|
!(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) {
|
||||||
|
|
||||||
networkBootStrappers, err := initNetworkBootstrappers(s)
|
bootstrappers, err := initNetworkBootstrappers(s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.wg.Add(1)
|
s.wg.Add(1)
|
||||||
go s.peerBootstrapper(3, networkBootStrappers)
|
go s.peerBootstrapper(defaultMinPeers, bootstrappers)
|
||||||
} else {
|
} else {
|
||||||
srvrLog.Infof("Auto peer bootstrapping is disabled")
|
srvrLog.Infof("Auto peer bootstrapping is disabled")
|
||||||
}
|
}
|
||||||
@ -1018,48 +1023,28 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
|
|||||||
}
|
}
|
||||||
|
|
||||||
// peerBootstrapper is a goroutine which is tasked with attempting to establish
|
// peerBootstrapper is a goroutine which is tasked with attempting to establish
|
||||||
// and maintain a target min number of outbound connections. With this
|
// and maintain a target minimum number of outbound connections. With this
|
||||||
// invariant, we ensure that our node is connected to a diverse set of peers
|
// invariant, we ensure that our node is connected to a diverse set of peers
|
||||||
// and that nodes newly joining the network receive an up to date network view
|
// and that nodes newly joining the network receive an up to date network view
|
||||||
// as soon as possible.
|
// as soon as possible.
|
||||||
func (s *server) peerBootstrapper(numTargetPeers uint32,
|
func (s *server) peerBootstrapper(numTargetPeers uint32,
|
||||||
bootStrappers []discovery.NetworkPeerBootstrapper) {
|
bootstrappers []discovery.NetworkPeerBootstrapper) {
|
||||||
|
|
||||||
defer s.wg.Done()
|
defer s.wg.Done()
|
||||||
|
|
||||||
// To kick things off, we'll attempt to first query the set of
|
// ignore is a set used to keep track of peers already retrieved from
|
||||||
// bootstrappers for enough address to fill our quot.
|
// our bootstrappers in order to avoid duplicates.
|
||||||
bootStrapAddrs, err := discovery.MultiSourceBootstrap(
|
ignore := make(map[autopilot.NodeID]struct{})
|
||||||
nil, numTargetPeers, bootStrappers...,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
// TODO(roasbeef): panic?
|
|
||||||
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
|
|
||||||
"peers: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
srvrLog.Debugf("Attempting to bootstrap connectivity with %v initial "+
|
// We'll start off by aggressively attempting connections to peers in
|
||||||
"peers", len(bootStrapAddrs))
|
// order to be a part of the network as soon as possible.
|
||||||
|
s.initialPeerBootstrap(ignore, numTargetPeers, bootstrappers)
|
||||||
|
|
||||||
// With our initial set of peers obtained, we'll launch a goroutine to
|
// Once done, we'll attempt to maintain our target minimum number of
|
||||||
// attempt to connect out to each of them. We'll be waking up shortly
|
// peers.
|
||||||
// below to sample how many of these connections succeeded.
|
//
|
||||||
for _, addr := range bootStrapAddrs {
|
// We'll use a 15 second backoff, and double the time every time an
|
||||||
go func(a *lnwire.NetAddress) {
|
// epoch fails up to a ceiling.
|
||||||
conn, err := brontide.Dial(s.identityPriv, a, cfg.net.Dial)
|
|
||||||
if err != nil {
|
|
||||||
srvrLog.Errorf("unable to connect to %v: %v",
|
|
||||||
a, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
s.OutboundPeerConnected(nil, conn)
|
|
||||||
}(addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// We'll start with a 15 second backoff, and double the time every time
|
|
||||||
// an epoch fails up to a ceiling.
|
|
||||||
const backOffCeiling = time.Minute * 5
|
const backOffCeiling = time.Minute * 5
|
||||||
backOff := time.Second * 15
|
backOff := time.Second * 15
|
||||||
|
|
||||||
@ -1135,7 +1120,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
|||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
|
|
||||||
peerAddrs, err := discovery.MultiSourceBootstrap(
|
peerAddrs, err := discovery.MultiSourceBootstrap(
|
||||||
ignoreList, numNeeded*2, bootStrappers...,
|
ignoreList, numNeeded*2, bootstrappers...,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
srvrLog.Errorf("Unable to retrieve bootstrap "+
|
srvrLog.Errorf("Unable to retrieve bootstrap "+
|
||||||
@ -1151,16 +1136,16 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
|||||||
go func(a *lnwire.NetAddress) {
|
go func(a *lnwire.NetAddress) {
|
||||||
// TODO(roasbeef): can do AS, subnet,
|
// TODO(roasbeef): can do AS, subnet,
|
||||||
// country diversity, etc
|
// country diversity, etc
|
||||||
conn, err := brontide.Dial(s.identityPriv,
|
errChan := make(chan error, 1)
|
||||||
a, cfg.net.Dial)
|
s.connectToPeer(a, errChan)
|
||||||
if err != nil {
|
select {
|
||||||
srvrLog.Errorf("unable to connect "+
|
case err := <-errChan:
|
||||||
"to %v: %v", a, err)
|
srvrLog.Errorf("Unable to "+
|
||||||
|
"connect to %v: %v",
|
||||||
|
a, err)
|
||||||
atomic.AddUint32(&epochErrors, 1)
|
atomic.AddUint32(&epochErrors, 1)
|
||||||
return
|
case <-s.quit:
|
||||||
}
|
}
|
||||||
|
|
||||||
s.OutboundPeerConnected(nil, conn)
|
|
||||||
}(addr)
|
}(addr)
|
||||||
}
|
}
|
||||||
case <-s.quit:
|
case <-s.quit:
|
||||||
@ -1169,6 +1154,77 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initialPeerBootstrap attempts to continuously connect to peers on startup
|
||||||
|
// until the target number of peers has been reached. This ensures that nodes
|
||||||
|
// receive an up to date network view as soon as possible.
|
||||||
|
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
|
||||||
|
numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) {
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
for {
|
||||||
|
// Check if the server has been requested to shut down in order
|
||||||
|
// to prevent blocking.
|
||||||
|
if s.Stopped() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// We can exit our aggressive initial peer bootstrapping stage
|
||||||
|
// if we've reached out target number of peers.
|
||||||
|
s.mu.RLock()
|
||||||
|
numActivePeers := uint32(len(s.peersByPub))
|
||||||
|
s.mu.RUnlock()
|
||||||
|
|
||||||
|
if numActivePeers >= numTargetPeers {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Otherwise, we'll request for the remaining number of peers in
|
||||||
|
// order to reach our target.
|
||||||
|
peersNeeded := numTargetPeers - numActivePeers
|
||||||
|
bootstrapAddrs, err := discovery.MultiSourceBootstrap(
|
||||||
|
ignore, peersNeeded, bootstrappers...,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
|
||||||
|
"peers: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then, we'll attempt to establish a connection to the
|
||||||
|
// different peer addresses retrieved by our bootstrappers.
|
||||||
|
for _, bootstrapAddr := range bootstrapAddrs {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(addr *lnwire.NetAddress) {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
go s.connectToPeer(addr, errChan)
|
||||||
|
|
||||||
|
// We'll only allow this connection attempt to
|
||||||
|
// take up to 3 seconds. This allows us to move
|
||||||
|
// quickly by discarding peers that are slowing
|
||||||
|
// us down.
|
||||||
|
select {
|
||||||
|
case err := <-errChan:
|
||||||
|
srvrLog.Errorf("Unable to connect to "+
|
||||||
|
"%v: %v", addr, err)
|
||||||
|
// TODO: tune timeout? 3 seconds might be *too*
|
||||||
|
// aggressive but works well.
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
srvrLog.Tracef("Skipping peer %v due "+
|
||||||
|
"to not establishing a "+
|
||||||
|
"connection within 3 seconds",
|
||||||
|
addr)
|
||||||
|
case <-s.quit:
|
||||||
|
}
|
||||||
|
}(bootstrapAddr)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// initTorController initiliazes the Tor controller backed by lnd and
|
// initTorController initiliazes the Tor controller backed by lnd and
|
||||||
// automatically sets up a v2 onion service in order to listen for inbound
|
// automatically sets up a v2 onion service in order to listen for inbound
|
||||||
// connections over Tor.
|
// connections over Tor.
|
||||||
@ -2294,19 +2350,38 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
|
|||||||
// connect to the target peer. If the we can't make the connection, or
|
// connect to the target peer. If the we can't make the connection, or
|
||||||
// the crypto negotiation breaks down, then return an error to the
|
// the crypto negotiation breaks down, then return an error to the
|
||||||
// caller.
|
// caller.
|
||||||
conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial)
|
errChan := make(chan error, 1)
|
||||||
if err != nil {
|
s.connectToPeer(addr, errChan)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-errChan:
|
||||||
return err
|
return err
|
||||||
|
case <-s.quit:
|
||||||
|
return ErrServerShuttingDown
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once the connection has been made, we can notify the server of the
|
|
||||||
// new connection via our public endpoint, which will require the lock
|
|
||||||
// an add the peer to the server's internal state.
|
|
||||||
s.OutboundPeerConnected(nil, conn)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// connectToPeer establishes a connection to a remote peer. errChan is used to
|
||||||
|
// notify the caller if the connection attempt has failed. Otherwise, it will be
|
||||||
|
// closed.
|
||||||
|
func (s *server) connectToPeer(addr *lnwire.NetAddress, errChan chan<- error) {
|
||||||
|
conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial)
|
||||||
|
if err != nil {
|
||||||
|
srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
|
||||||
|
select {
|
||||||
|
case errChan <- err:
|
||||||
|
case <-s.quit:
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
close(errChan)
|
||||||
|
|
||||||
|
s.OutboundPeerConnected(nil, conn)
|
||||||
|
}
|
||||||
|
|
||||||
// DisconnectPeer sends the request to server to close the connection with peer
|
// DisconnectPeer sends the request to server to close the connection with peer
|
||||||
// identified by public key.
|
// identified by public key.
|
||||||
//
|
//
|
||||||
|
Loading…
Reference in New Issue
Block a user