Merge pull request #715 from cfromknecht/reconnect-backoff

server: adds truncated exponential backoff + rand for retry
This commit is contained in:
Olaoluwa Osuntokun 2018-02-08 19:37:31 -08:00 committed by GitHub
commit 1b504b6041
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,10 +2,12 @@ package main
import ( import (
"bytes" "bytes"
"crypto/rand"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
"image/color" "image/color"
"math/big"
"net" "net"
"strconv" "strconv"
"sync" "sync"
@ -42,6 +44,14 @@ var (
// ErrServerShuttingDown indicates that the server is in the process of // ErrServerShuttingDown indicates that the server is in the process of
// gracefully exiting. // gracefully exiting.
ErrServerShuttingDown = errors.New("server is shutting down") ErrServerShuttingDown = errors.New("server is shutting down")
// defaultBackoff is the starting point for exponential backoff for
// reconnecting to persistent peers.
defaultBackoff = time.Second
// maximumBackoff is the largest backoff we will permit when
// reattempting connections to persistent peers.
maximumBackoff = time.Minute
) )
// server is the main server of the Lightning Network Daemon. The server houses // server is the main server of the Lightning Network Daemon. The server houses
@ -74,6 +84,7 @@ type server struct {
peerConnectedListeners map[string][]chan<- struct{} peerConnectedListeners map[string][]chan<- struct{}
persistentPeers map[string]struct{} persistentPeers map[string]struct{}
persistentPeersBackoff map[string]time.Duration
persistentConnReqs map[string][]*connmgr.ConnReq persistentConnReqs map[string][]*connmgr.ConnReq
// ignorePeerTermination tracks peers for which the server has initiated // ignorePeerTermination tracks peers for which the server has initiated
@ -158,6 +169,7 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
lightningID: sha256.Sum256(serializedPubKey), lightningID: sha256.Sum256(serializedPubKey),
persistentPeers: make(map[string]struct{}), persistentPeers: make(map[string]struct{}),
persistentPeersBackoff: make(map[string]time.Duration),
persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentConnReqs: make(map[string][]*connmgr.ConnReq),
ignorePeerTermination: make(map[*peer]struct{}), ignorePeerTermination: make(map[*peer]struct{}),
@ -663,6 +675,9 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
targetPub := string(conn.RemotePub().SerializeCompressed()) targetPub := string(conn.RemotePub().SerializeCompressed())
s.mu.Lock() s.mu.Lock()
s.persistentPeers[targetPub] = struct{}{} s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.mu.Unlock() s.mu.Unlock()
s.OutboundPeerConnected(nil, conn) s.OutboundPeerConnected(nil, conn)
@ -775,6 +790,9 @@ func (s *server) peerBootstrapper(numTargetPeers uint32,
targetPub := string(conn.RemotePub().SerializeCompressed()) targetPub := string(conn.RemotePub().SerializeCompressed())
s.mu.Lock() s.mu.Lock()
s.persistentPeers[targetPub] = struct{}{} s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.mu.Unlock() s.mu.Unlock()
s.OutboundPeerConnected(nil, conn) s.OutboundPeerConnected(nil, conn)
@ -941,6 +959,9 @@ func (s *server) establishPersistentConnections() error {
// Add this peer to the set of peers we should maintain a // Add this peer to the set of peers we should maintain a
// persistent connection with. // persistent connection with.
s.persistentPeers[pubStr] = struct{}{} s.persistentPeers[pubStr] = struct{}{}
if _, ok := s.persistentPeersBackoff[pubStr]; !ok {
s.persistentPeersBackoff[pubStr] = defaultBackoff
}
for _, address := range nodeAddr.addresses { for _, address := range nodeAddr.addresses {
// Create a wrapper address which couples the IP and // Create a wrapper address which couples the IP and
@ -1261,24 +1282,39 @@ func (s *server) peerTerminationWatcher(p *peer) {
return return
} }
srvrLog.Debugf("Attempting to re-establish persistent "+ // Otherwise, we'll launch a new connection request in order to
"connection to peer %v", p) // attempt to maintain a persistent connection with this peer.
// If so, then we'll attempt to re-establish a persistent
// connection to the peer.
// TODO(roasbeef): look up latest info for peer in database // TODO(roasbeef): look up latest info for peer in database
connReq := &connmgr.ConnReq{ connReq := &connmgr.ConnReq{
Addr: p.addr, Addr: p.addr,
Permanent: true, Permanent: true,
} }
// Otherwise, we'll launch a new connection requests in order
// to attempt to maintain a persistent connection with this
// peer.
s.persistentConnReqs[pubStr] = append( s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq) s.persistentConnReqs[pubStr], connReq)
go s.connMgr.Connect(connReq) // Compute the subsequent backoff duration.
currBackoff := s.persistentPeersBackoff[pubStr]
nextBackoff := computeNextBackoff(currBackoff)
s.persistentPeersBackoff[pubStr] = nextBackoff
// We choose not to wait group this go routine since the Connect
// call can stall for arbitrarily long if we shutdown while an
// outbound connection attempt is being made.
go func() {
srvrLog.Debugf("Scheduling connection re-establishment to "+
"persistent peer %v in %s", p, nextBackoff)
select {
case <-time.After(nextBackoff):
case <-s.quit:
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
s.connMgr.Connect(connReq)
}()
} }
} }
@ -1675,6 +1711,9 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
} }
s.persistentPeers[targetPub] = struct{}{} s.persistentPeers[targetPub] = struct{}{}
if _, ok := s.persistentPeersBackoff[targetPub]; !ok {
s.persistentPeersBackoff[targetPub] = defaultBackoff
}
s.persistentConnReqs[targetPub] = append( s.persistentConnReqs[targetPub] = append(
s.persistentConnReqs[targetPub], connReq) s.persistentConnReqs[targetPub], connReq)
s.mu.Unlock() s.mu.Unlock()
@ -1727,6 +1766,7 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
// them from this map so we don't attempt to re-connect after we // them from this map so we don't attempt to re-connect after we
// disconnect. // disconnect.
delete(s.persistentPeers, pubStr) delete(s.persistentPeers, pubStr)
delete(s.persistentPeersBackoff, pubStr)
// Remove the current peer from the server's internal state and signal // Remove the current peer from the server's internal state and signal
// that the peer termination watcher does not need to execute for this // that the peer termination watcher does not need to execute for this
@ -1850,3 +1890,31 @@ func parseHexColor(colorStr string) (color.RGBA, error) {
return color.RGBA{R: colorBytes[0], G: colorBytes[1], B: colorBytes[2]}, nil return color.RGBA{R: colorBytes[0], G: colorBytes[1], B: colorBytes[2]}, nil
} }
// computeNextBackoff uses a truncated exponential backoff to compute the next
// backoff using the value of the exiting backoff. The returned duration is
// randomized in either direction by 1/20 to prevent tight loops from
// stabilizing.
func computeNextBackoff(currBackoff time.Duration) time.Duration {
// Double the current backoff, truncating if it exceeds our maximum.
nextBackoff := 2 * currBackoff
if nextBackoff > maximumBackoff {
nextBackoff = maximumBackoff
}
// Using 1/10 of our duration as a margin, compute a random offset to
// avoid the nodes entering connection cycles.
margin := nextBackoff / 10
var wiggle big.Int
wiggle.SetUint64(uint64(margin))
if _, err := rand.Int(rand.Reader, &wiggle); err != nil {
// Randomizing is not mission critical, so we'll just return the
// current backoff.
return nextBackoff
}
// Otherwise add in our wiggle, but subtract out half of the margin so
// that the backoff can tweaked by 1/20 in either direction.
return nextBackoff + (time.Duration(wiggle.Uint64()) - margin/2)
}