Merge pull request #1205 from wpaulino/initial-peer-bootstrap
server: improve initial peer bootstrapping
This commit is contained in:
commit
dddbfa7efe
@ -4,10 +4,13 @@ import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
prand "math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/lightningnetwork/lnd/autopilot"
|
||||
@ -18,6 +21,10 @@ import (
|
||||
"github.com/roasbeef/btcutil/bech32"
|
||||
)
|
||||
|
||||
// init seeds the package-level math/rand source that is used to shuffle the
// set of bootstrappers.
func init() {
	// Seed with nanosecond resolution: a seed with one-second granularity
	// would give every process started within the same second an
	// identical shuffle order.
	prand.Seed(time.Now().UnixNano())
}
|
||||
|
||||
// NetworkPeerBootstrapper is an interface that represents an initial peer
|
||||
// bootstrap mechanism. This interface is to be used to bootstrap a new peer to
|
||||
// the connection by providing it with the pubkey+address of a set of existing
|
||||
@ -44,40 +51,64 @@ type NetworkPeerBootstrapper interface {
|
||||
// the ignore map is populated, then the bootstrappers will be instructed to
|
||||
// skip those nodes.
|
||||
func MultiSourceBootstrap(ignore map[autopilot.NodeID]struct{}, numAddrs uint32,
|
||||
bootStrappers ...NetworkPeerBootstrapper) ([]*lnwire.NetAddress, error) {
|
||||
bootstrappers ...NetworkPeerBootstrapper) ([]*lnwire.NetAddress, error) {
|
||||
|
||||
// We'll randomly shuffle our bootstrappers before querying them in
|
||||
// order to avoid from querying the same bootstrapper method over and
|
||||
// over, as some of these might tend to provide better/worse results
|
||||
// than others.
|
||||
bootstrappers = shuffleBootstrappers(bootstrappers)
|
||||
|
||||
var addrs []*lnwire.NetAddress
|
||||
for _, bootStrapper := range bootStrappers {
|
||||
for _, bootstrapper := range bootstrappers {
|
||||
// If we already have enough addresses, then we can exit early
|
||||
// w/o querying the additional bootstrappers.
|
||||
if uint32(len(addrs)) >= numAddrs {
|
||||
break
|
||||
}
|
||||
|
||||
log.Infof("Attempting to bootstrap with: %v", bootStrapper.Name())
|
||||
log.Infof("Attempting to bootstrap with: %v", bootstrapper.Name())
|
||||
|
||||
// If we still need additional addresses, then we'll compute
|
||||
// the number of address remaining that we need to fetch.
|
||||
numAddrsLeft := numAddrs - uint32(len(addrs))
|
||||
log.Tracef("Querying for %v addresses", numAddrsLeft)
|
||||
netAddrs, err := bootStrapper.SampleNodeAddrs(numAddrsLeft, ignore)
|
||||
netAddrs, err := bootstrapper.SampleNodeAddrs(numAddrsLeft, ignore)
|
||||
if err != nil {
|
||||
// If we encounter an error with a bootstrapper, then
|
||||
// we'll continue on to the next available
|
||||
// bootstrapper.
|
||||
log.Errorf("Unable to query bootstrapper %v: %v",
|
||||
bootStrapper.Name(), err)
|
||||
bootstrapper.Name(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
addrs = append(addrs, netAddrs...)
|
||||
}
|
||||
|
||||
if len(addrs) == 0 {
|
||||
return nil, errors.New("no addresses found")
|
||||
}
|
||||
|
||||
log.Infof("Obtained %v addrs to bootstrap network with", len(addrs))
|
||||
|
||||
return addrs, nil
|
||||
}
|
||||
|
||||
// shuffleBootstrappers shuffles the set of bootstrappers in order to avoid
|
||||
// querying the same bootstrapper over and over. To shuffle the set of
|
||||
// candidates, we use a version of the Fisher–Yates shuffle algorithm.
|
||||
func shuffleBootstrappers(candidates []NetworkPeerBootstrapper) []NetworkPeerBootstrapper {
|
||||
shuffled := make([]NetworkPeerBootstrapper, len(candidates))
|
||||
perm := prand.Perm(len(candidates))
|
||||
|
||||
for i, v := range perm {
|
||||
shuffled[v] = candidates[i]
|
||||
}
|
||||
|
||||
return shuffled
|
||||
}
|
||||
|
||||
// ChannelGraphBootstrapper is an implementation of the NetworkPeerBootstrapper
|
||||
// which attempts to retrieve advertised peers directly from the active channel
|
||||
// graph. This instance requires a backing autopilot.ChannelGraph instance in
|
||||
|
197
server.go
197
server.go
@ -39,14 +39,10 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrPeerNotConnected signals that the server has no connection to the
|
||||
// given peer.
|
||||
ErrPeerNotConnected = errors.New("peer is not connected")
|
||||
|
||||
// ErrServerShuttingDown indicates that the server is in the process of
|
||||
// gracefully exiting.
|
||||
ErrServerShuttingDown = errors.New("server is shutting down")
|
||||
const (
|
||||
// defaultMinPeers is the minimum number of peers nodes should always be
|
||||
// connected to.
|
||||
defaultMinPeers = 3
|
||||
|
||||
// defaultBackoff is the starting point for exponential backoff for
|
||||
// reconnecting to persistent peers.
|
||||
@ -57,6 +53,16 @@ var (
|
||||
maximumBackoff = time.Hour
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrPeerNotConnected signals that the server has no connection to the
|
||||
// given peer.
|
||||
ErrPeerNotConnected = errors.New("peer is not connected")
|
||||
|
||||
// ErrServerShuttingDown indicates that the server is in the process of
|
||||
// gracefully exiting.
|
||||
ErrServerShuttingDown = errors.New("server is shutting down")
|
||||
)
|
||||
|
||||
// server is the main server of the Lightning Network Daemon. The server houses
|
||||
// global state pertaining to the wallet, database, and the rpcserver.
|
||||
// Additionally, the server is also used as a central messaging bus to interact
|
||||
@ -734,6 +740,7 @@ func (s *server) Start() error {
|
||||
if err := s.chanRouter.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.connMgr.Start()
|
||||
|
||||
// With all the relevant sub-systems started, we'll now attempt to
|
||||
// establish persistent connections to our direct channel collaborators
|
||||
@ -742,21 +749,19 @@ func (s *server) Start() error {
|
||||
return err
|
||||
}
|
||||
|
||||
go s.connMgr.Start()
|
||||
|
||||
// If network bootstrapping hasn't been disabled, then we'll configure
|
||||
// the set of active bootstrappers, and launch a dedicated goroutine to
|
||||
// maintain a set of persistent connections.
|
||||
if !cfg.NoNetBootstrap && !(cfg.Bitcoin.SimNet || cfg.Litecoin.SimNet) &&
|
||||
!(cfg.Bitcoin.RegTest || cfg.Litecoin.RegTest) {
|
||||
|
||||
networkBootStrappers, err := initNetworkBootstrappers(s)
|
||||
bootstrappers, err := initNetworkBootstrappers(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.peerBootstrapper(3, networkBootStrappers)
|
||||
go s.peerBootstrapper(defaultMinPeers, bootstrappers)
|
||||
} else {
|
||||
srvrLog.Infof("Auto peer bootstrapping is disabled")
|
||||
}
|
||||
@ -1018,48 +1023,28 @@ func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, e
|
||||
}
|
||||
|
||||
// peerBootstrapper is a goroutine which is tasked with attempting to establish
|
||||
// and maintain a target min number of outbound connections. With this
|
||||
// and maintain a target minimum number of outbound connections. With this
|
||||
// invariant, we ensure that our node is connected to a diverse set of peers
|
||||
// and that nodes newly joining the network receive an up to date network view
|
||||
// as soon as possible.
|
||||
func (s *server) peerBootstrapper(numTargetPeers uint32,
|
||||
bootStrappers []discovery.NetworkPeerBootstrapper) {
|
||||
bootstrappers []discovery.NetworkPeerBootstrapper) {
|
||||
|
||||
defer s.wg.Done()
|
||||
|
||||
// To kick things off, we'll attempt to first query the set of
|
||||
// bootstrappers for enough address to fill our quot.
|
||||
bootStrapAddrs, err := discovery.MultiSourceBootstrap(
|
||||
nil, numTargetPeers, bootStrappers...,
|
||||
)
|
||||
if err != nil {
|
||||
// TODO(roasbeef): panic?
|
||||
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
|
||||
"peers: %v", err)
|
||||
return
|
||||
}
|
||||
// ignore is a set used to keep track of peers already retrieved from
|
||||
// our bootstrappers in order to avoid duplicates.
|
||||
ignore := make(map[autopilot.NodeID]struct{})
|
||||
|
||||
srvrLog.Debugf("Attempting to bootstrap connectivity with %v initial "+
|
||||
"peers", len(bootStrapAddrs))
|
||||
// We'll start off by aggressively attempting connections to peers in
|
||||
// order to be a part of the network as soon as possible.
|
||||
s.initialPeerBootstrap(ignore, numTargetPeers, bootstrappers)
|
||||
|
||||
// With our initial set of peers obtained, we'll launch a goroutine to
|
||||
// attempt to connect out to each of them. We'll be waking up shortly
|
||||
// below to sample how many of these connections succeeded.
|
||||
for _, addr := range bootStrapAddrs {
|
||||
go func(a *lnwire.NetAddress) {
|
||||
conn, err := brontide.Dial(s.identityPriv, a, cfg.net.Dial)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("unable to connect to %v: %v",
|
||||
a, err)
|
||||
return
|
||||
}
|
||||
|
||||
s.OutboundPeerConnected(nil, conn)
|
||||
}(addr)
|
||||
}
|
||||
|
||||
// We'll start with a 15 second backoff, and double the time every time
|
||||
// an epoch fails up to a ceiling.
|
||||
// Once done, we'll attempt to maintain our target minimum number of
|
||||
// peers.
|
||||
//
|
||||
// We'll use a 15 second backoff, and double the time every time an
|
||||
// epoch fails up to a ceiling.
|
||||
const backOffCeiling = time.Minute * 5
|
||||
backOff := time.Second * 15
|
||||
|
||||
@ -1135,7 +1120,7 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
||||
s.mu.RUnlock()
|
||||
|
||||
peerAddrs, err := discovery.MultiSourceBootstrap(
|
||||
ignoreList, numNeeded*2, bootStrappers...,
|
||||
ignoreList, numNeeded*2, bootstrappers...,
|
||||
)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to retrieve bootstrap "+
|
||||
@ -1151,16 +1136,16 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
||||
go func(a *lnwire.NetAddress) {
|
||||
// TODO(roasbeef): can do AS, subnet,
|
||||
// country diversity, etc
|
||||
conn, err := brontide.Dial(s.identityPriv,
|
||||
a, cfg.net.Dial)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("unable to connect "+
|
||||
"to %v: %v", a, err)
|
||||
errChan := make(chan error, 1)
|
||||
s.connectToPeer(a, errChan)
|
||||
select {
|
||||
case err := <-errChan:
|
||||
srvrLog.Errorf("Unable to "+
|
||||
"connect to %v: %v",
|
||||
a, err)
|
||||
atomic.AddUint32(&epochErrors, 1)
|
||||
return
|
||||
case <-s.quit:
|
||||
}
|
||||
|
||||
s.OutboundPeerConnected(nil, conn)
|
||||
}(addr)
|
||||
}
|
||||
case <-s.quit:
|
||||
@ -1169,6 +1154,77 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
|
||||
}
|
||||
}
|
||||
|
||||
// initialPeerBootstrap attempts to continuously connect to peers on startup
|
||||
// until the target number of peers has been reached. This ensures that nodes
|
||||
// receive an up to date network view as soon as possible.
|
||||
func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
|
||||
numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for {
|
||||
// Check if the server has been requested to shut down in order
|
||||
// to prevent blocking.
|
||||
if s.Stopped() {
|
||||
return
|
||||
}
|
||||
|
||||
// We can exit our aggressive initial peer bootstrapping stage
|
||||
// if we've reached out target number of peers.
|
||||
s.mu.RLock()
|
||||
numActivePeers := uint32(len(s.peersByPub))
|
||||
s.mu.RUnlock()
|
||||
|
||||
if numActivePeers >= numTargetPeers {
|
||||
return
|
||||
}
|
||||
|
||||
// Otherwise, we'll request for the remaining number of peers in
|
||||
// order to reach our target.
|
||||
peersNeeded := numTargetPeers - numActivePeers
|
||||
bootstrapAddrs, err := discovery.MultiSourceBootstrap(
|
||||
ignore, peersNeeded, bootstrappers...,
|
||||
)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
|
||||
"peers: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Then, we'll attempt to establish a connection to the
|
||||
// different peer addresses retrieved by our bootstrappers.
|
||||
for _, bootstrapAddr := range bootstrapAddrs {
|
||||
wg.Add(1)
|
||||
go func(addr *lnwire.NetAddress) {
|
||||
defer wg.Done()
|
||||
|
||||
errChan := make(chan error, 1)
|
||||
go s.connectToPeer(addr, errChan)
|
||||
|
||||
// We'll only allow this connection attempt to
|
||||
// take up to 3 seconds. This allows us to move
|
||||
// quickly by discarding peers that are slowing
|
||||
// us down.
|
||||
select {
|
||||
case err := <-errChan:
|
||||
srvrLog.Errorf("Unable to connect to "+
|
||||
"%v: %v", addr, err)
|
||||
// TODO: tune timeout? 3 seconds might be *too*
|
||||
// aggressive but works well.
|
||||
case <-time.After(3 * time.Second):
|
||||
srvrLog.Tracef("Skipping peer %v due "+
|
||||
"to not establishing a "+
|
||||
"connection within 3 seconds",
|
||||
addr)
|
||||
case <-s.quit:
|
||||
}
|
||||
}(bootstrapAddr)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
}
|
||||
|
||||
// initTorController initializes the Tor controller backed by lnd and
|
||||
// automatically sets up a v2 onion service in order to listen for inbound
|
||||
// connections over Tor.
|
||||
@ -2294,19 +2350,38 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
|
||||
// connect to the target peer. If the we can't make the connection, or
|
||||
// the crypto negotiation breaks down, then return an error to the
|
||||
// caller.
|
||||
conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial)
|
||||
if err != nil {
|
||||
errChan := make(chan error, 1)
|
||||
s.connectToPeer(addr, errChan)
|
||||
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return err
|
||||
case <-s.quit:
|
||||
return ErrServerShuttingDown
|
||||
}
|
||||
|
||||
// Once the connection has been made, we can notify the server of the
|
||||
// new connection via our public endpoint, which will require the lock
|
||||
// an add the peer to the server's internal state.
|
||||
s.OutboundPeerConnected(nil, conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// connectToPeer establishes a connection to a remote peer. errChan is used to
|
||||
// notify the caller if the connection attempt has failed. Otherwise, it will be
|
||||
// closed.
|
||||
func (s *server) connectToPeer(addr *lnwire.NetAddress, errChan chan<- error) {
|
||||
conn, err := brontide.Dial(s.identityPriv, addr, cfg.net.Dial)
|
||||
if err != nil {
|
||||
srvrLog.Errorf("Unable to connect to %v: %v", addr, err)
|
||||
select {
|
||||
case errChan <- err:
|
||||
case <-s.quit:
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
close(errChan)
|
||||
|
||||
s.OutboundPeerConnected(nil, conn)
|
||||
}
|
||||
|
||||
// DisconnectPeer sends the request to server to close the connection with peer
|
||||
// identified by public key.
|
||||
//
|
||||
|
Loading…
Reference in New Issue
Block a user