Merge pull request #3253 from Roasbeef/second-dns-seed

multi: add secondary DNS seed for bootstrap, add exponential back off to initial bootstrap
Olaoluwa Osuntokun 2019-06-28 18:32:04 -07:00 committed by GitHub
commit 6a4179e224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 177 additions and 128 deletions

@@ -576,6 +576,9 @@ var (
 		{
 			"nodes.lightning.directory",
 			"soa.nodes.lightning.directory",
 		},
+		{
+			"lseed.bitcoinstats.com",
+		},
 	},
 	bitcoinTestnetGenesis: {
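
For context on the tuple shape the bootstrapper iterates below: each chain carries a list of seed pairs where index 0 is the primary SRV host and index 1 is the optional SOA shim used for the fallback lookup, so the new lseed.bitcoinstats.com entry simply leaves its second slot empty. A minimal sketch of that layout (the map key and variable name are illustrative, not the exact lnd identifiers):

    // Illustrative only: per-chain DNS seed tuples. Index 0 is the primary SRV
    // host; index 1, if non-empty, is the SOA shim used for the TCP fallback.
    var dnsSeeds = map[string][][2]string{
        "bitcoin-mainnet": {
            {"nodes.lightning.directory", "soa.nodes.lightning.directory"},
            {"lseed.bitcoinstats.com"}, // no secondary, so the fallback is skipped
        },
    }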

@@ -379,59 +379,64 @@ func (d *DNSSeedBootstrapper) SampleNodeAddrs(numAddrs uint32,
     var netAddrs []*lnwire.NetAddress
 
-    // We'll continue this loop until we reach our target address limit.
-    // Each SRV query to the seed will return 25 random nodes, so we can
-    // continue to query until we reach our target.
+    // We'll try all the registered DNS seeds, exiting early if one of them
+    // gives us all the peers we need.
+    //
+    // TODO(roasbeef): should combine results from both
 search:
-    for uint32(len(netAddrs)) < numAddrs {
     for _, dnsSeedTuple := range d.dnsSeeds {
-        // We'll first query the seed with an SRV record so we
-        // can obtain a random sample of the encoded public
-        // keys of nodes. We use the lndLookupSRV function for
-        // this task.
+        // We'll first query the seed with an SRV record so we can
+        // obtain a random sample of the encoded public keys of nodes.
+        // We use the lndLookupSRV function for this task.
         primarySeed := dnsSeedTuple[0]
         _, addrs, err := d.net.LookupSRV("nodes", "tcp", primarySeed)
         if err != nil {
             log.Tracef("Unable to lookup SRV records via "+
-                "primary seed: %v", err)
+                "primary seed (%v): %v", primarySeed, err)
 
             log.Trace("Falling back to secondary")
 
-            // If the host of the secondary seed is blank,
-            // then we'll bail here as we can't proceed.
+            // If the host of the secondary seed is blank, then
+            // we'll bail here as we can't proceed.
             if dnsSeedTuple[1] == "" {
-                return nil, fmt.Errorf("Secondary seed is blank")
+                log.Tracef("DNS seed %v has no secondary, "+
+                    "skipping fallback", primarySeed)
+                continue
             }
 
-            // If we get an error when trying to query via
-            // the primary seed, we'll fallback to the
-            // secondary seed before concluding failure.
+            // If we get an error when trying to query via the
+            // primary seed, we'll fallback to the secondary seed
+            // before concluding failure.
             soaShim := dnsSeedTuple[1]
             addrs, err = d.fallBackSRVLookup(
                 soaShim, primarySeed,
             )
             if err != nil {
-                return nil, err
+                log.Tracef("Unable to query fall "+
+                    "back dns seed (%v): %v", soaShim, err)
+                continue
             }
             log.Tracef("Successfully queried fallback DNS seed")
         }
 
         log.Tracef("Retrieved SRV records from dns seed: %v",
-            spew.Sdump(addrs))
+            newLogClosure(func() string {
+                return spew.Sdump(addrs)
+            }),
+        )
 
-        // Next, we'll need to issue an A record request for
-        // each of the nodes, skipping it if nothing comes
-        // back.
+        // Next, we'll need to issue an A record request for each of
+        // the nodes, skipping it if nothing comes back.
         for _, nodeSrv := range addrs {
             if uint32(len(netAddrs)) >= numAddrs {
                 break search
             }
 
-            // With the SRV target obtained, we'll now
-            // perform another query to obtain the IP
-            // address for the matching bech32 encoded node
-            // key. We use the lndLookup function for this
-            // task.
+            // With the SRV target obtained, we'll now perform
+            // another query to obtain the IP address for the
+            // matching bech32 encoded node key. We use the
+            // lndLookup function for this task.
             bechNodeHost := nodeSrv.Target
             addrs, err := d.net.LookupHost(bechNodeHost)
             if err != nil {
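
The behavioral change in this hunk is that a failing seed no longer aborts the whole sample: the lookup falls back to the secondary host, and if that also fails (or no secondary exists) the loop simply moves on to the next tuple. A condensed sketch of that control flow using only the standard library resolver; lnd's actual fallback (fallBackSRVLookup) issues a manual SRV query over TCP via the SOA shim, which is simplified away here:

    import (
        "log"
        "net"
    )

    // sampleSeeds queries each seed tuple in turn, skipping tuples that fail
    // instead of aborting the whole bootstrap attempt.
    func sampleSeeds(seeds [][2]string) []*net.SRV {
        var records []*net.SRV
        for _, tuple := range seeds {
            _, addrs, err := net.LookupSRV("nodes", "tcp", tuple[0])
            if err != nil {
                // No secondary to fall back to: skip this seed entirely.
                if tuple[1] == "" {
                    log.Printf("seed %v failed, no secondary: %v", tuple[0], err)
                    continue
                }

                // Simplified fallback: just retry against the secondary host.
                _, addrs, err = net.LookupSRV("nodes", "tcp", tuple[1])
                if err != nil {
                    log.Printf("fallback seed %v failed: %v", tuple[1], err)
                    continue
                }
            }
            records = append(records, addrs...)
        }
        return records
    }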
@@ -446,20 +451,27 @@ search:
 
             log.Tracef("Attempting to convert: %v", bechNodeHost)
 
-            // If we have a set of valid addresses, then
-            // we'll need to parse the public key from the
-            // original bech32 encoded string.
+            // If the host isn't correctly formatted, then we'll
+            // skip it.
+            if len(bechNodeHost) == 0 ||
+                !strings.Contains(bechNodeHost, ".") {
+
+                continue
+            }
+
+            // If we have a set of valid addresses, then we'll need
+            // to parse the public key from the original bech32
+            // encoded string.
             bechNode := strings.Split(bechNodeHost, ".")
             _, nodeBytes5Bits, err := bech32.Decode(bechNode[0])
             if err != nil {
                 return nil, err
             }
 
-            // Once we have the bech32 decoded pubkey,
-            // we'll need to convert the 5-bit word
-            // grouping into our regular 8-bit word
-            // grouping so we can convert it into a public
-            // key.
+            // Once we have the bech32 decoded pubkey, we'll need
+            // to convert the 5-bit word grouping into our regular
+            // 8-bit word grouping so we can convert it into a
+            // public key.
             nodeBytes, err := bech32.ConvertBits(
                 nodeBytes5Bits, 5, 8, false,
             )
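
Each SRV target is a hostname whose first label is a bech32-encoded node public key, so the decode path above is the usual two-step: bech32-decode into 5-bit groups, then regroup into 8-bit bytes before parsing the key. A rough sketch using the btcsuite bech32 package (the example host is made up):

    import (
        "strings"

        "github.com/btcsuite/btcutil/bech32"
    )

    // nodeKeyBytesFromHost extracts the raw public key bytes from an SRV target
    // such as "ln1q<...>.lseed.example.com" (hypothetical host).
    func nodeKeyBytesFromHost(bechNodeHost string) ([]byte, error) {
        // The first DNS label carries the bech32-encoded key.
        label := strings.Split(bechNodeHost, ".")[0]

        // Decode returns the human-readable part plus the data as 5-bit groups.
        _, data5, err := bech32.Decode(label)
        if err != nil {
            return nil, err
        }

        // Regroup the 5-bit words into ordinary bytes; the 33-byte result can
        // then be parsed as a compressed secp256k1 key (e.g. with btcec).
        return bech32.ConvertBits(data5, 5, 8, false)
    }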
@@ -473,9 +485,8 @@ search:
                 return nil, err
             }
 
-            // If we have an ignore list, and this node is
-            // in the ignore list, then we'll go to the
-            // next candidate.
+            // If we have an ignore list, and this node is in the
+            // ignore list, then we'll go to the next candidate.
             if ignore != nil {
                 nID := autopilot.NewNodeID(nodeKey)
                 if _, ok := ignore[nID]; ok {
@ -483,21 +494,22 @@ search:
} }
} }
// Finally we'll convert the host:port peer to // Finally we'll convert the host:port peer to a proper
// a proper TCP address to use within the // TCP address to use within the lnwire.NetAddress. We
// lnwire.NetAddress. We don't need to use // don't need to use the lndResolveTCP function here
// the lndResolveTCP function here because we // because we already have the host:port peer.
// already have the host:port peer. addr := net.JoinHostPort(
addr := net.JoinHostPort(addrs[0], addrs[0],
strconv.FormatUint(uint64(nodeSrv.Port), 10)) strconv.FormatUint(uint64(nodeSrv.Port), 10),
)
tcpAddr, err := net.ResolveTCPAddr("tcp", addr) tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// Finally, with all the information parsed, // Finally, with all the information parsed, we'll
// we'll return this fully valid address as a // return this fully valid address as a connection
// connection attempt. // attempt.
lnAddr := &lnwire.NetAddress{ lnAddr := &lnwire.NetAddress{
IdentityKey: nodeKey, IdentityKey: nodeKey,
Address: tcpAddr, Address: tcpAddr,
@@ -509,7 +521,6 @@ search:
             netAddrs = append(netAddrs, lnAddr)
         }
     }
-    }
 
     return netAddrs, nil
 }
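
One small design note on the reformatted JoinHostPort call above: building the dial string through net.JoinHostPort rather than plain concatenation keeps IPv6 node addresses valid, since the literal gets bracketed before the port is appended. A quick illustration (the address and port are arbitrary):

    import (
        "fmt"
        "net"
        "strconv"
    )

    func dialStringExample() error {
        // JoinHostPort brackets IPv6 literals: "[2001:db8::1]:9735".
        addr := net.JoinHostPort("2001:db8::1", strconv.FormatUint(9735, 10))

        tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
        if err != nil {
            return err
        }

        fmt.Println(tcpAddr) // prints [2001:db8::1]:9735
        return nil
    }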

@@ -1600,7 +1600,6 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
     //
     // We'll use a 15 second backoff, and double the time every time an
     // epoch fails up to a ceiling.
-    const backOffCeiling = time.Minute * 5
     backOff := time.Second * 15
 
     // We'll create a new ticker to wake us up every 15 seconds so we can
@@ -1643,8 +1642,8 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
             sampleTicker.Stop()
 
             backOff *= 2
-            if backOff > backOffCeiling {
-                backOff = backOffCeiling
+            if backOff > bootstrapBackOffCeiling {
+                backOff = bootstrapBackOffCeiling
             }
 
             srvrLog.Debugf("Backing off peer bootstrapper to "+
@@ -1713,15 +1712,27 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
     }
 }
 
+// bootstrapBackOffCeiling is the maximum amount of time we'll wait between
+// failed attempts to locate a set of bootstrap peers. We'll slowly double our
+// query back off each time we encounter a failure.
+const bootstrapBackOffCeiling = time.Minute * 5
+
 // initialPeerBootstrap attempts to continuously connect to peers on startup
 // until the target number of peers has been reached. This ensures that nodes
 // receive an up to date network view as soon as possible.
 func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
     numTargetPeers uint32, bootstrappers []discovery.NetworkPeerBootstrapper) {
 
-    var wg sync.WaitGroup
+    // We'll start off by waiting 2 seconds between failed attempts, then
+    // double each time we fail until we hit the bootstrapBackOffCeiling.
+    var delaySignal <-chan time.Time
+    delayTime := time.Second * 2
 
-    for {
+    // As we want to be more aggressive, we'll use a lower back off ceiling
+    // than the main peer bootstrap logic.
+    backOffCeiling := bootstrapBackOffCeiling / 5
+
+    for attempts := 0; ; attempts++ {
         // Check if the server has been requested to shut down in order
         // to prevent blocking.
         if s.Stopped() {
@@ -1738,8 +1749,31 @@ func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
             return
         }
 
-        // Otherwise, we'll request for the remaining number of peers in
-        // order to reach our target.
+        if attempts > 0 {
+            srvrLog.Debugf("Waiting %v before trying to locate "+
+                "bootstrap peers (attempt #%v)", delayTime,
+                attempts)
+
+            // We've completed at least one iteration and haven't
+            // finished, so we'll start to insert a delay period
+            // between each attempt.
+            delaySignal = time.After(delayTime)
+            select {
+            case <-delaySignal:
+            case <-s.quit:
+                return
+            }
+
+            // After our delay, we'll double the time we wait up to
+            // the max back off period.
+            delayTime *= 2
+            if delayTime > backOffCeiling {
+                delayTime = backOffCeiling
+            }
+        }
+
+        // Otherwise, we'll request the remaining number of peers
+        // in order to reach our target.
         peersNeeded := numTargetPeers - numActivePeers
         bootstrapAddrs, err := discovery.MultiSourceBootstrap(
             ignore, peersNeeded, bootstrappers...,
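
The initial bootstrap is deliberately more aggressive than the main bootstrapper: the delay starts at 2s, doubles on every failed attempt, and caps at one fifth of the shared ceiling (1m), and the wait itself can be cut short by the server's quit channel. A condensed sketch of that wait-or-quit step under those assumptions (the helper name is invented for illustration):

    import "time"

    // waitOrQuit sleeps for delay unless quit closes first, then returns the
    // next delay to use, doubled and clamped at ceiling (bootstrapBackOffCeiling/5
    // in the diff above, i.e. one minute). The bool reports whether to keep
    // retrying.
    func waitOrQuit(delay, ceiling time.Duration, quit <-chan struct{}) (time.Duration, bool) {
        select {
        case <-time.After(delay):
        case <-quit:
            return delay, false
        }

        // Double the delay for the next attempt, up to the ceiling:
        // 2s, 4s, 8s, 16s, 32s, 1m, 1m, ...
        delay *= 2
        if delay > ceiling {
            delay = ceiling
        }
        return delay, true
    }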
@@ -1752,6 +1786,7 @@ func (s *server) initialPeerBootstrap(ignore map[autopilot.NodeID]struct{},
 
         // Then, we'll attempt to establish a connection to the
         // different peer addresses retrieved by our bootstrappers.
+        var wg sync.WaitGroup
         for _, bootstrapAddr := range bootstrapAddrs {
             wg.Add(1)
             go func(addr *lnwire.NetAddress) {

@@ -316,8 +316,8 @@ func (q *sessionQueue) drainBackups() {
         // before attempting to dequeue any pending updates.
         stateUpdate, isPending, backupID, err := q.nextStateUpdate()
         if err != nil {
-            log.Errorf("SessionQueue(%s) unable to get next state "+
-                "update: %v", err)
+            log.Errorf("SessionQueue(%v) unable to get next state "+
+                "update: %v", q.ID(), err)
             return
         }
@@ -557,7 +557,7 @@ func (q *sessionQueue) sendStateUpdate(conn wtserver.Peer,
         // TODO(conner): borked watchtower
         err = fmt.Errorf("unable to ack seqnum=%d: %v",
             stateUpdate.SeqNum, err)
-        log.Errorf("SessionQueue(%s) failed to ack update: %v", err)
+        log.Errorf("SessionQueue(%v) failed to ack update: %v", q.ID(), err)
         return err
 
     case err == wtdb.ErrLastAppliedReversion:
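
Both watchtower log fixes address the same verb/argument mismatch: the old calls had two format verbs but only passed err, so the error filled the SessionQueue slot and the trailing verb rendered as missing. Roughly, with an arbitrary error and a made-up session ID:

    import (
        "errors"
        "fmt"
    )

    func verbMismatchDemo() {
        err := errors.New("disk full")

        // Old form: two verbs, one argument. The error lands in the %s slot and
        // the last verb reports as missing, e.g.
        //   SessionQueue(disk full) unable to get next state update: %!v(MISSING)
        // (go vet flags this, but it compiles and runs.)
        fmt.Printf("SessionQueue(%s) unable to get next state update: %v\n", err)

        // Fixed form: the session ID and the error each get their own verb.
        sessionID := 42
        fmt.Printf("SessionQueue(%v) unable to get next state update: %v\n",
            sessionID, err)
    }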