server: introduce new peerBootstrapper goroutine to seed initial conns

This commit adds a new primary goroutine to the server struct:
peerBootstrapper. If peer boostrapping isn’t disabled in the config,
this new goroutine will be launched to attempt to establish a set of
initial connections for a new node. The logic is pretty straight
forward: first a set of initial connections is attempted, if after our
first epoch, we don’t have enough connections yet, then we’ll attempt
to query for an additional set. In each iteration, if we haven’t been
successful, then we increase our exponential backoff in order to not
spam any of our bootstrapping sources.
This commit is contained in:
Olaoluwa Osuntokun 2017-09-03 16:58:14 -07:00
parent 74ef963124
commit 1196c2e254
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

208
server.go
View File

@ -13,6 +13,7 @@ import (
"github.com/boltdb/bolt"
"github.com/lightningnetwork/lightning-onion"
"github.com/lightningnetwork/lnd/autopilot"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/discovery"
@ -359,6 +360,18 @@ func (s *server) Start() error {
go s.connMgr.Start()
// If network bootstrapping hasn't been disabled, then we'll configure
// the set of active bootstrappers, and launch a dedicated goroutine to
// maintain a set of persistent connections.
if !cfg.NoNetBootstrap {
networkBootStrappers, err := initNetworkBootstrappers(s)
if err != nil {
return err
}
s.wg.Add(1)
go s.peerBootstrapper(3, networkBootStrappers)
}
return nil
}
@ -409,6 +422,201 @@ func (s *server) WaitForShutdown() {
s.wg.Wait()
}
// initNetworkBootstrappers initializes a set of network peer bootstrappers
// based on the server, and currently active bootstrap mechanisms as defined
// within the current configuration.
func initNetworkBootstrappers(s *server) ([]discovery.NetworkPeerBootstrapper, error) {
srvrLog.Infof("Initializing peer network boostrappers!")
var bootStrappers []discovery.NetworkPeerBootstrapper
// First, we'll create an instance of the ChannelGraphBootstrapper as
// this can be used by default if we've already partially seeded the
// network.
chanGraph := autopilot.ChannelGraphFromDatabase(s.chanDB.ChannelGraph())
graphBootstrapper, err := discovery.NewGraphBootstrapper(chanGraph)
if err != nil {
return nil, err
}
bootStrappers = append(bootStrappers, graphBootstrapper)
// If this isn't simnet mode, then one of our additional bootstrapping
// sources will be the set of running DNS seeds.
if !cfg.Bitcoin.SimNet || !cfg.Litecoin.SimNet {
chainHash := reverseChainMap[registeredChains.PrimaryChain()]
dnsSeeds, ok := chainDNSSeeds[chainHash]
// If we have a set of DNS seeds for this chain, then we'll add
// it as an additional boostrapping source.
if ok {
srvrLog.Infof("Creating DNS peer boostrapper with "+
"seeds: %v", dnsSeeds)
dnsBootStrapper, err := discovery.NewDNSSeedBootstrapper(
dnsSeeds,
)
if err != nil {
return nil, err
}
bootStrappers = append(bootStrappers, dnsBootStrapper)
}
}
return bootStrappers, nil
}
// peerBootstrapper is a goroutine which is tasked with attempting to establish
// and maintain a target min number of outbound connections. With this
// invariant, we ensure that our node is connected to a diverse set of peers
// and that nodes newly joining the network receive an up to date network view
// as soon as possible.
func (s *server) peerBootstrapper(numTargetPeers uint32,
bootStrappers []discovery.NetworkPeerBootstrapper) {
defer s.wg.Done()
// To kick things off, we'll attempt to first query the set of
// bootstrappers for enough address to fill our quot.
bootStrapAddrs, err := discovery.MultiSourceBootstrap(
nil, numTargetPeers, bootStrappers...,
)
if err != nil {
// TODO(roasbeef): panic?
srvrLog.Errorf("Unable to retrieve initial bootstrap "+
"peers: %v", err)
return
}
srvrLog.Debug("Attempting to bootstrap connectivity with %v initial "+
"peers", len(bootStrapAddrs))
// With our initial set of peers obtained, we'll launch a goroutine to
// attempt to connect out to each of them. We'll be waking up shortly
// below to sample how many of these connections succeeded.
for _, addr := range bootStrapAddrs {
go func(a *lnwire.NetAddress) {
conn, err := brontide.Dial(s.identityPriv, a)
if err != nil {
srvrLog.Errorf("unable to connect to %v: %v",
a, err)
return
}
s.OutboundPeerConnected(nil, conn)
}(addr)
}
// We'll start with a 15 second backoff, and double the time every time
// an epoch fails up to a ceiling.
const backOffCeliing = time.Minute * 5
backOff := time.Second * 15
// We'll create a new ticker to wake us up every 15 seconds so we can
// see if we've reached our minimum number of peers.
sampleTicker := time.NewTicker(backOff)
defer sampleTicker.Stop()
// We'll use the number of attempts and errors to determine if we need
// to increase the time between discovery epochs.
var epochErrors, epochAttempts uint32
for {
select {
// The ticker has just woken us up, so we'll need to check if
// we need to attempt to connect our to any more peers.
case <-sampleTicker.C:
srvrLog.Infof("e=%v, a=%v",
atomic.LoadUint32(&epochErrors), epochAttempts)
// If all of our attempts failed during this last back
// off period, then will increase our backoff to 5
// minute ceiling to avoid an excessive number of
// queries
//
// TODO(roasbeef): add reverse policy too?
if epochAttempts > 0 &&
atomic.LoadUint32(&epochErrors) >= epochAttempts {
sampleTicker.Stop()
backOff *= 2
if backOff > backOffCeliing {
backOff = backOffCeliing
}
srvrLog.Debugf("Backing off peer bootstrapper to "+
"%v", backOff)
sampleTicker = time.NewTicker(backOff)
continue
}
atomic.StoreUint32(&epochErrors, 0)
epochAttempts = 0
// Obtain the current number of peers, so we can gauge
// if we need to sample more peers or not.
s.mu.Lock()
numActivePeers := uint32(len(s.peersByPub))
s.mu.Unlock()
// If we have enough peers, then we can loop back
// around to the next round as we're done here.
if numActivePeers >= numTargetPeers {
continue
}
// Since we know need more peers, we'll compute the
// exact number we need to reach our threshold.
numNeeded := numTargetPeers - numActivePeers
srvrLog.Debug("Attempting to obtain %v more network "+
"peers", numNeeded)
// With the number of peers we need calculated, we'll
// query the network bootstrappers to sample a set of
// random addrs for us.
s.mu.Lock()
ignoreList := make(map[autopilot.NodeID]struct{})
for _, peer := range s.peersByPub {
nID := autopilot.NewNodeID(peer.addr.IdentityKey)
ignoreList[nID] = struct{}{}
}
s.mu.Unlock()
peerAddrs, err := discovery.MultiSourceBootstrap(
ignoreList, numNeeded*2, bootStrappers...,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve bootstrap "+
"peers: %v", err)
continue
}
// Finally, we'll launch a new goroutine for each
// prospective peer candidates.
for _, addr := range peerAddrs {
epochAttempts += 1
go func(a *lnwire.NetAddress) {
// TODO(roasbeef): can do AS, subnet,
// country diversity, etc
conn, err := brontide.Dial(s.identityPriv, a)
if err != nil {
srvrLog.Errorf("unable to connect "+
"to %v: %v", a, err)
atomic.AddUint32(&epochErrors, 1)
return
}
s.OutboundPeerConnected(nil, conn)
}(addr)
}
case <-s.quit:
return
}
}
}
// genNodeAnnouncement generates and returns the current fully signed node
// announcement. If refresh is true, then the time stamp of the announcement
// will be updated in order to ensure it propagates through the network.