Merge pull request #1658 from cfromknecht/async-peer-start

[server]: Start Peers Asynchronously
Olaoluwa Osuntokun 2018-08-15 20:49:56 -07:00 committed by GitHub
commit 15eededf45
2 changed files with 265 additions and 214 deletions

peer.go (16 changed lines)

@@ -43,7 +43,7 @@ const (
idleTimeout = 5 * time.Minute
// writeMessageTimeout is the timeout used when writing a message to peer.
-writeMessageTimeout = 10 * time.Second
writeMessageTimeout = 50 * time.Second
// outgoingQueueLen is the buffer size of the channel which houses
// messages to be sent across the wire, requested by objects outside
@@ -567,8 +567,18 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
// WaitForDisconnect waits until the peer has disconnected. A peer may be
// disconnected if the local or remote side terminating the connection, or an
-// irrecoverable protocol error has been encountered.
-func (p *peer) WaitForDisconnect() {
// irrecoverable protocol error has been encountered. This method will only
// begin watching the peer's waitgroup after the ready channel or the peer's
// quit channel are signaled. The ready channel should only be signaled if a
// call to Start returns no error. Otherwise, if the peer fails to start,
// calling Disconnect will signal the quit channel and the method will not
// block, since no goroutines were spawned.
func (p *peer) WaitForDisconnect(ready chan struct{}) {
select {
case <-ready:
case <-p.quit:
}
p.wg.Wait()
}
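
To make the new contract concrete, here is a minimal, self-contained sketch of the ready-gated wait; the demoPeer type and the failed-start scenario in main are illustrative stand-ins, not lnd code:

package main

import (
	"fmt"
	"sync"
)

// demoPeer is a stripped-down stand-in for lnd's peer type.
type demoPeer struct {
	quit chan struct{}
	wg   sync.WaitGroup
}

// WaitForDisconnect blocks on the waitgroup only after either the ready
// channel (successful Start) or the quit channel (Disconnect after a failed
// Start) has been signaled, mirroring the method above.
func (p *demoPeer) WaitForDisconnect(ready chan struct{}) {
	select {
	case <-ready:
	case <-p.quit:
	}
	p.wg.Wait()
}

func main() {
	// Failed-start path: no goroutines were spawned, so signaling quit is
	// enough for the watcher to return without blocking forever.
	p := &demoPeer{quit: make(chan struct{})}
	ready := make(chan struct{})

	done := make(chan struct{})
	go func() {
		p.WaitForDisconnect(ready)
		close(done)
	}()

	close(p.quit) // what Disconnect would do when Start fails
	<-done
	fmt.Println("termination watcher unblocked via quit")
}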

server.go (463 changed lines)

@@ -1917,152 +1917,6 @@ func (s *server) findPeerByPubStr(pubStr string) (*peer, error) {
return peer, nil
}
// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
// and then cleans up all resources allocated to the peer, notifies relevant
// sub-systems of its demise, and finally handles re-connecting to the peer if
// it's persistent. If the server intentionally disconnects a peer, it should
// have a corresponding entry in the ignorePeerTermination map which will cause
// the cleanup routine to exit early.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer) {
defer s.wg.Done()
p.WaitForDisconnect()
srvrLog.Debugf("Peer %v has been disconnected", p)
// If the server is exiting then we can bail out early ourselves as all
// the other sub-systems will already be shutting down.
if s.Stopped() {
return
}
// Next, we'll cancel all pending funding reservations with this node.
// If we tried to initiate any funding flows that haven't yet finished,
// then we need to unlock those committed outputs so they're still
// available for use.
s.fundingMgr.CancelPeerReservations(p.PubKey())
pubKey := p.addr.IdentityKey
// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
s.authGossiper.PruneSyncState(pubKey)
// Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated
// with this interface should be closed.
//
// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
if err != nil {
srvrLog.Errorf("unable to get channel links: %v", err)
}
for _, link := range links {
p.server.htlcSwitch.RemoveLink(link.ChanID())
}
s.mu.Lock()
defer s.mu.Unlock()
// If the server has already removed this peer, we can short circuit the
// peer termination watcher and skip cleanup.
if _, ok := s.ignorePeerTermination[p]; ok {
delete(s.ignorePeerTermination, p)
pubKey := p.PubKey()
pubStr := string(pubKey[:])
// If a connection callback is present, we'll go ahead and
// execute it now that previous peer has fully disconnected. If
// the callback is not present, this likely implies the peer was
// purposefully disconnected via RPC, and that no reconnect
// should be attempted.
connCallback, ok := s.scheduledPeerConnection[pubStr]
if ok {
delete(s.scheduledPeerConnection, pubStr)
connCallback()
}
return
}
// First, cleanup any remaining state the server has regarding the peer
// in question.
s.removePeer(p)
// Next, check to see if this is a persistent peer or not.
pubStr := string(pubKey.SerializeCompressed())
_, ok := s.persistentPeers[pubStr]
if ok {
// We'll only need to re-launch a connection request if one
// isn't already currently pending.
if _, ok := s.persistentConnReqs[pubStr]; ok {
return
}
// We'll ensure that we locate an advertised address to use
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(
pubKey,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v",
pubKey.SerializeCompressed(), err)
} else {
p.addr.Address = advertisedAddr
}
}
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
Addr: p.addr,
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
// Record the computed backoff in the backoff map.
backoff := s.nextPeerBackoff(pubStr, p.StartTime())
s.persistentPeersBackoff[pubStr] = backoff
// Initialize a retry canceller for this peer if one does not
// exist.
cancelChan, ok := s.persistentRetryCancels[pubStr]
if !ok {
cancelChan = make(chan struct{})
s.persistentRetryCancels[pubStr] = cancelChan
}
// We choose not to wait group this go routine since the Connect
// call can stall for arbitrarily long if we shutdown while an
// outbound connection attempt is being made.
go func() {
srvrLog.Debugf("Scheduling connection re-establishment to "+
"persistent peer %v in %s", p, backoff)
select {
case <-time.After(backoff):
case <-cancelChan:
return
case <-s.quit:
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
s.connMgr.Connect(connReq)
}()
}
}
// nextPeerBackoff computes the next backoff duration for a peer's pubkey using
// exponential backoff. If no previous backoff was known, the default is
// returned.
@@ -2110,64 +1964,6 @@ func (s *server) shouldRequestGraphSync() bool {
return len(s.peersByPub) <= 2
}
// peerConnected is a function that handles initialization a newly connected
// peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. The inbound
// boolean should be true if the peer initiated the connection to us.
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
inbound bool) {
brontideConn := conn.(*brontide.Conn)
addr := conn.RemoteAddr()
pubKey := brontideConn.RemotePub()
srvrLog.Infof("finalizing connection to %x, inbound=%v",
pubKey.SerializeCompressed(), inbound)
peerAddr := &lnwire.NetAddress{
IdentityKey: pubKey,
Address: addr,
ChainNet: activeNetParams.Net,
}
// With the brontide connection established, we'll now craft the local
// feature vector to advertise to the remote node.
localFeatures := lnwire.NewRawFeatureVector()
// We'll signal that we understand the data loss protection feature,
// and also that we support the new gossip query features.
localFeatures.Set(lnwire.DataLossProtectOptional)
localFeatures.Set(lnwire.GossipQueriesOptional)
// We'll only request a full channel graph sync if we detect that that
// we aren't fully synced yet.
if s.shouldRequestGraphSync() {
// TODO(roasbeef): only do so if gossiper doesn't have active
// peers?
localFeatures.Set(lnwire.InitialRoutingSync)
}
// Now that we've established a connection, create a peer, and it to
// the set of currently active peers.
p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
}
// TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction?
// Attempt to start the peer, if we're unable to do so, then disconnect
// this peer.
if err := p.Start(); err != nil {
p.Disconnect(errors.Errorf("unable to start peer: %v", err))
return
}
s.addPeer(p)
}
// shouldDropConnection determines if our local connection to a remote peer
// should be dropped in the case of concurrent connection establishment. In
// order to deterministically decide which connection should be dropped, we'll
@@ -2426,6 +2222,63 @@ func (s *server) cancelConnReqs(pubStr string, skip *uint64) {
delete(s.persistentConnReqs, pubStr)
}
// peerConnected is a function that handles initializing a newly connected
// peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. The inbound
// boolean should be true if the peer initiated the connection to us.
func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
inbound bool) {
brontideConn := conn.(*brontide.Conn)
addr := conn.RemoteAddr()
pubKey := brontideConn.RemotePub()
srvrLog.Infof("Finalizing connection to %x, inbound=%v",
pubKey.SerializeCompressed(), inbound)
peerAddr := &lnwire.NetAddress{
IdentityKey: pubKey,
Address: addr,
ChainNet: activeNetParams.Net,
}
// With the brontide connection established, we'll now craft the local
// feature vector to advertise to the remote node.
localFeatures := lnwire.NewRawFeatureVector()
// We'll signal that we understand the data loss protection feature,
// and also that we support the new gossip query features.
localFeatures.Set(lnwire.DataLossProtectOptional)
localFeatures.Set(lnwire.GossipQueriesOptional)
// We'll only request a full channel graph sync if we detect that we
// aren't fully synced yet.
if s.shouldRequestGraphSync() {
// TODO(roasbeef): only do so if gossiper doesn't have active
// peers?
localFeatures.Set(lnwire.InitialRoutingSync)
}
// Now that we've established a connection, create a peer, and add it to
// the set of currently active peers.
p, err := newPeer(conn, connReq, s, peerAddr, inbound, localFeatures)
if err != nil {
srvrLog.Errorf("unable to create peer %v", err)
return
}
// TODO(roasbeef): update IP address for link-node
// * also mark last-seen, do it one single transaction?
s.addPeer(p)
// Dispatch a goroutine to asynchronously start the peer. This process
// includes sending and receiving Init messages, which would be a DOS
// vector if we held the server's mutex throughout the procedure.
s.wg.Add(1)
go s.peerInitializer(p)
}
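
The comment above captures the heart of the change: only quick, bounded work (updating the peer map) happens while the server's mutex is held, and the potentially slow Init handshake is pushed into its own goroutine. A rough, self-contained sketch of that locking discipline, using hypothetical names (registry, remotePeer, initialize) rather than lnd's types:

package main

import (
	"fmt"
	"sync"
	"time"
)

type remotePeer struct{ id string }

type registry struct {
	mu    sync.Mutex
	wg    sync.WaitGroup
	peers map[string]*remotePeer
}

func (r *registry) peerConnected(p *remotePeer) {
	// Quick, bounded work happens under the lock: record the peer in the
	// shared map so other subsystems can see it.
	r.mu.Lock()
	r.peers[p.id] = p
	r.mu.Unlock()

	// The Init exchange can block on the network for a long time, so it
	// must not run while the mutex is held; a stalled or hostile peer
	// would otherwise block every other caller that needs the lock.
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		initialize(p)
	}()
}

// initialize stands in for the peer's Start/Init handshake.
func initialize(p *remotePeer) {
	time.Sleep(10 * time.Millisecond)
}

func main() {
	r := &registry{peers: make(map[string]*remotePeer)}
	r.peerConnected(&remotePeer{id: "alice"})
	r.wg.Wait()
	fmt.Println("peer initialized without holding the lock during the handshake")
}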
// addPeer adds the passed peer to the server's global state of all active
// peers.
func (s *server) addPeer(p *peer) {
@@ -2453,15 +2306,49 @@ func (s *server) addPeer(p *peer) {
} else {
s.outboundPeers[pubStr] = p
}
}
-// Launch a goroutine to watch for the unexpected termination of this
-// peer, which will ensure all resources are properly cleaned up, and
-// re-establish persistent connections when necessary. The peer
-// termination watcher will be short circuited if the peer is ever
-// added to the ignorePeerTermination map, indicating that the server
-// has already handled the removal of this peer.
// peerInitializer asynchronously starts a newly connected peer after it has
// been added to the server's peer map. This method sets up a
// peerTerminationWatcher for the given peer, and ensures that it executes even
// if the peer failed to start. In the event of a successful connection, this
// method reads the negotiated, local feature-bits and spawns the appropriate
// graph synchronization method. Any registered clients of NotifyWhenOnline will
// be signaled of the new peer once the method returns.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerInitializer(p *peer) {
defer s.wg.Done()
// Avoid initializing peers while the server is exiting.
if s.Stopped() {
return
}
// Create a channel that will be used to signal a successful start of
// the link. This prevents the peer termination watcher from beginning
// its duty too early.
ready := make(chan struct{})
// Before starting the peer, launch a goroutine to watch for the
// unexpected termination of this peer, which will ensure all resources
// are properly cleaned up, and re-establish persistent connections when
// necessary. The peer termination watcher will be short circuited if
// the peer is ever added to the ignorePeerTermination map, indicating
// that the server has already handled the removal of this peer.
s.wg.Add(1)
-go s.peerTerminationWatcher(p)
go s.peerTerminationWatcher(p, ready)
// Start the peer! If an error occurs, we Disconnect the peer, which
// will unblock the peerTerminationWatcher.
if err := p.Start(); err != nil {
p.Disconnect(errors.Errorf("unable to start peer: %v", err))
return
}
// Otherwise, signal to the peerTerminationWatcher that the peer startup
// was successful, and to begin watching the peer's wait group.
close(ready)
switch {
// If the remote peer knows of the new gossip queries feature, then
@@ -2490,6 +2377,11 @@ func (s *server) addPeer(p *peer) {
go s.authGossiper.SynchronizeNode(p)
}
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
s.mu.Lock()
defer s.mu.Unlock()
// Check if there are listeners waiting for this peer to come online.
for _, peerChan := range s.peerConnectedListeners[pubStr] {
select {
@@ -2501,6 +2393,155 @@ func (s *server) addPeer(p *peer) {
delete(s.peerConnectedListeners, pubStr)
}
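
A rough, self-contained sketch of the listener-notification pattern used above for NotifyWhenOnline-style callers: subsystems register a channel keyed by the peer's pubkey, and the initializer signals every registered channel once the peer has started. All names here (onlineListeners, waitForPeer, notifyOnline) are illustrative and not part of lnd's API:

package main

import (
	"fmt"
	"sync"
)

// onlineListeners mimics the peerConnectedListeners map: callers register a
// channel keyed by pubkey and are signaled once the peer has started.
type onlineListeners struct {
	mu        sync.Mutex
	listeners map[string][]chan struct{}
}

// waitForPeer registers a buffered channel that is signaled when the peer
// identified by pubStr comes online.
func (o *onlineListeners) waitForPeer(pubStr string) <-chan struct{} {
	c := make(chan struct{}, 1)
	o.mu.Lock()
	o.listeners[pubStr] = append(o.listeners[pubStr], c)
	o.mu.Unlock()
	return c
}

// notifyOnline signals every channel registered for pubStr and clears the
// entry, mirroring the loop at the end of peerInitializer.
func (o *onlineListeners) notifyOnline(pubStr string) {
	o.mu.Lock()
	defer o.mu.Unlock()
	for _, c := range o.listeners[pubStr] {
		select {
		case c <- struct{}{}:
		default: // never block peer initialization on a slow listener
		}
	}
	delete(o.listeners, pubStr)
}

func main() {
	o := &onlineListeners{listeners: make(map[string][]chan struct{})}
	online := o.waitForPeer("02abc")
	o.notifyOnline("02abc")
	<-online
	fmt.Println("listener observed the peer coming online")
}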
// peerTerminationWatcher waits until a peer has been disconnected unexpectedly,
// and then cleans up all resources allocated to the peer, notifies relevant
// sub-systems of its demise, and finally handles re-connecting to the peer if
// it's persistent. If the server intentionally disconnects a peer, it should
// have a corresponding entry in the ignorePeerTermination map which will cause
// the cleanup routine to exit early. The passed `ready` chan is used to
// synchronize when WaitForDisconnect should begin watching on the peer's
// waitgroup. The ready chan should only be signaled if the peer starts
// successfully, otherwise the peer should be disconnected instead.
//
// NOTE: This MUST be launched as a goroutine.
func (s *server) peerTerminationWatcher(p *peer, ready chan struct{}) {
defer s.wg.Done()
p.WaitForDisconnect(ready)
srvrLog.Debugf("Peer %v has been disconnected", p)
// If the server is exiting then we can bail out early ourselves as all
// the other sub-systems will already be shutting down.
if s.Stopped() {
return
}
// Next, we'll cancel all pending funding reservations with this node.
// If we tried to initiate any funding flows that haven't yet finished,
// then we need to unlock those committed outputs so they're still
// available for use.
s.fundingMgr.CancelPeerReservations(p.PubKey())
pubKey := p.addr.IdentityKey
// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
s.authGossiper.PruneSyncState(pubKey)
// Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated
// with this interface should be closed.
//
// TODO(roasbeef): instead add a PurgeInterfaceLinks function?
links, err := p.server.htlcSwitch.GetLinksByInterface(p.pubKeyBytes)
if err != nil {
srvrLog.Errorf("unable to get channel links: %v", err)
}
for _, link := range links {
p.server.htlcSwitch.RemoveLink(link.ChanID())
}
s.mu.Lock()
defer s.mu.Unlock()
// If the server has already removed this peer, we can short circuit the
// peer termination watcher and skip cleanup.
if _, ok := s.ignorePeerTermination[p]; ok {
delete(s.ignorePeerTermination, p)
pubKey := p.PubKey()
pubStr := string(pubKey[:])
// If a connection callback is present, we'll go ahead and
// execute it now that previous peer has fully disconnected. If
// the callback is not present, this likely implies the peer was
// purposefully disconnected via RPC, and that no reconnect
// should be attempted.
connCallback, ok := s.scheduledPeerConnection[pubStr]
if ok {
delete(s.scheduledPeerConnection, pubStr)
connCallback()
}
return
}
// First, cleanup any remaining state the server has regarding the peer
// in question.
s.removePeer(p)
// Next, check to see if this is a persistent peer or not.
pubStr := string(pubKey.SerializeCompressed())
_, ok := s.persistentPeers[pubStr]
if ok {
// We'll only need to re-launch a connection request if one
// isn't already currently pending.
if _, ok := s.persistentConnReqs[pubStr]; ok {
return
}
// We'll ensure that we locate an advertised address to use
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(
pubKey,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v",
pubKey.SerializeCompressed(), err)
} else {
p.addr.Address = advertisedAddr
}
}
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
Addr: p.addr,
Permanent: true,
}
s.persistentConnReqs[pubStr] = append(
s.persistentConnReqs[pubStr], connReq)
// Record the computed backoff in the backoff map.
backoff := s.nextPeerBackoff(pubStr, p.StartTime())
s.persistentPeersBackoff[pubStr] = backoff
// Initialize a retry canceller for this peer if one does not
// exist.
cancelChan, ok := s.persistentRetryCancels[pubStr]
if !ok {
cancelChan = make(chan struct{})
s.persistentRetryCancels[pubStr] = cancelChan
}
// We choose not to wait group this go routine since the Connect
// call can stall for arbitrarily long if we shutdown while an
// outbound connection attempt is being made.
go func() {
srvrLog.Debugf("Scheduling connection re-establishment to "+
"persistent peer %v in %s", p, backoff)
select {
case <-time.After(backoff):
case <-cancelChan:
return
case <-s.quit:
return
}
srvrLog.Debugf("Attempting to re-establish persistent "+
"connection to peer %v", p)
s.connMgr.Connect(connReq)
}()
}
}
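
The reconnection branch above schedules a retry after a delay computed by nextPeerBackoff, which the earlier comment describes as exponential backoff with a default for peers with no recorded backoff. A small, self-contained sketch of that idea; the constants and the doubling-with-clamp rule are assumptions for illustration, not lnd's exact behavior:

package main

import (
	"fmt"
	"time"
)

// Assumed illustration values; lnd's actual default and ceiling may differ.
const (
	defaultBackoff = 1 * time.Second
	maximumBackoff = 1 * time.Hour
)

// nextBackoff doubles the previously recorded backoff for a peer and clamps
// it, returning the default for peers with no recorded backoff.
func nextBackoff(prev map[string]time.Duration, pubStr string) time.Duration {
	backoff, ok := prev[pubStr]
	if !ok {
		return defaultBackoff
	}
	backoff *= 2
	if backoff > maximumBackoff {
		backoff = maximumBackoff
	}
	return backoff
}

func main() {
	backoffs := make(map[string]time.Duration)
	peer := "02aa..."

	for i := 0; i < 4; i++ {
		next := nextBackoff(backoffs, peer)
		backoffs[peer] = next
		fmt.Printf("reconnect attempt %d waits %v\n", i+1, next)
	}
}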
// removePeer removes the passed peer from the server's state of all active
// peers.
func (s *server) removePeer(p *peer) {