From bf515a234138dd4614852b4ce424586ff23f7fb2 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 7 Jun 2018 20:11:27 -0700 Subject: [PATCH] discovery/gossiper: mostly deprecate SendToPeer This commit changes the gossiper to direct messages to peer objects, instead of sending them through the server every time. The primary motivation is to reduce contention on the server's mutex and, more importantly, avoid deadlocks in the Triangle of Death. --- discovery/gossiper.go | 181 +++++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 72 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 64302de9..56bd7929 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -14,6 +14,7 @@ import ( "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/multimutex" @@ -35,8 +36,9 @@ var ( // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { - peer *btcec.PublicKey - msg lnwire.Message + peer lnpeer.Peer + source *btcec.PublicKey + msg lnwire.Message isRemote bool @@ -97,6 +99,11 @@ type Config struct { // messages to a particular peer identified by the target public key. SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error + // FindPeer returns the actively registered peer for a given remote + // public key. An error is returned if the peer was not found or a + // shutdown has been requested. + FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error) + // NotifyWhenOnline is a function that allows the gossiper to be // notified when a certain peer comes online, allowing it to // retry sending a peer message. @@ -147,8 +154,8 @@ type AuthenticatedGossiper struct { // as we know it. To be used atomically. bestHeight uint32 - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup // cfg is a copy of the configuration struct that the gossiper service // was initialized with. @@ -244,7 +251,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { // the entire network graph is read from disk, then serialized to the format // defined within the current wire protocol. This cache of graph data is then // sent directly to the target node. -func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error { +func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error { // TODO(roasbeef): need to also store sig data in db // * will be nice when we switch to pairing sigs would only need one ^_^ @@ -355,12 +362,12 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error { } log.Infof("Syncing channel graph state with %x, sending %v "+ - "vertexes and %v edges", pub.SerializeCompressed(), + "vertexes and %v edges", syncPeer.PubKey(), numNodes, numEdges) // With all the announcement messages gathered, send them all in a // single batch to the target peer. - return d.cfg.SendToPeer(pub, announceMessages...) + return syncPeer.SendMessage(false, announceMessages...) } // PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the @@ -452,12 +459,13 @@ func (d *AuthenticatedGossiper) Stop() { // peers. Remote channel announcements should contain the announcement proof // and be fully validated. func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) chan error { + peer lnpeer.Peer) chan error { nMsg := &networkMsg{ msg: msg, isRemote: true, - peer: src, + peer: peer, + source: peer.IdentityKey(), err: make(chan error, 1), } @@ -478,12 +486,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // entire channel announcement and update messages will be re-constructed and // broadcast to the rest of the network. func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message, - src *btcec.PublicKey) chan error { + source *btcec.PublicKey) chan error { nMsg := &networkMsg{ msg: msg, isRemote: false, - peer: src, + source: source, err: make(chan error, 1), } @@ -587,7 +595,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Channel announcements are identified by the short channel id field. case *lnwire.ChannelAnnouncement: deDupKey := msg.ShortChannelID - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) mws, ok := d.channelAnnouncements[deDupKey] if !ok { @@ -609,7 +617,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Channel updates are identified by the (short channel id, flags) // tuple. case *lnwire.ChannelUpdate: - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) deDupKey := channelUpdateID{ msg.ShortChannelID, msg.Flags, @@ -658,7 +666,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) { // Node announcements are identified by the Vertex field. Use the // NodeID to create the corresponding Vertex. case *lnwire.NodeAnnouncement: - sender := routing.NewVertex(message.peer) + sender := routing.NewVertex(message.source) deDupKey := routing.Vertex(msg.NodeID) // We do the same for node announcements as we did for channel @@ -875,7 +883,7 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created // for the target peer. -func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSyncer { +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { target := routing.NewVertex(pub) // First, we'll try to find an existing gossiper for this peer. @@ -885,16 +893,26 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy // If one exists, then we'll return it directly. if ok { - return syncer + return syncer, nil } - // Otherwise, we'll obtain the mutex, then check again if a gossiper - // was added after we dropped the read mutex. + // A known gossip syncer doesn't exist, so we may have to create one + // from scratch. To do so, we'll query for a reference directly to the + // active peer. + syncPeer, err := d.cfg.FindPeer(pub) + if err != nil { + log.Debugf("unable to find gossip peer %v: %v", + pub.SerializeCompressed(), err) + return nil, err + } + + // Finally, we'll obtain the exclusive mutex, then check again if a + // gossiper was added after we dropped the read mutex. d.syncerMtx.Lock() syncer, ok = d.peerSyncers[target] if ok { d.syncerMtx.Unlock() - return syncer + return syncer, nil } // At this point, a syncer doesn't yet exist, so we'll create a new one @@ -905,7 +923,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy channelSeries: d.cfg.ChanSeries, encodingType: lnwire.EncodingSortedPlain, sendToPeer: func(msgs ...lnwire.Message) error { - return d.cfg.SendToPeer(pub, msgs...) + return syncPeer.SendMessage(false, msgs...) }, }) copy(syncer.peerPub[:], pub.SerializeCompressed()) @@ -914,7 +932,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy d.syncerMtx.Unlock() - return syncer + return syncer, nil } // networkHandler is the primary goroutine that drives this service. The roles @@ -992,15 +1010,20 @@ func (d *AuthenticatedGossiper) networkHandler() { // then we'll dispatch that directly to the proper // gossipSyncer. case *lnwire.GossipTimestampRange: - syncer := d.findGossipSyncer(announcement.peer) + syncer, err := d.findGossipSyncer( + announcement.source, + ) + if err != nil { + continue + } // If we've found the message target, then // we'll dispatch the message directly to it. - err := syncer.ApplyGossipFilter(msg) + err = syncer.ApplyGossipFilter(msg) if err != nil { log.Warnf("unable to apply gossip "+ "filter for peer=%x: %v", - announcement.peer.SerializeCompressed(), err) + announcement.peer.PubKey(), err) } continue @@ -1012,7 +1035,12 @@ func (d *AuthenticatedGossiper) networkHandler() { *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: - syncer := d.findGossipSyncer(announcement.peer) + syncer, err := d.findGossipSyncer( + announcement.source, + ) + if err != nil { + continue + } syncer.ProcessQueryMsg(announcement.msg) continue @@ -1193,19 +1221,19 @@ func (d *AuthenticatedGossiper) networkHandler() { // needed to handle new queries. The recvUpdates bool indicates if we should // continue to receive real-time updates from the remote peer once we've synced // channel state. -func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) { +func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) { d.syncerMtx.Lock() defer d.syncerMtx.Unlock() // If we already have a syncer, then we'll exit early as we don't want // to override it. - nodeID := routing.NewVertex(peer) + nodeID := routing.Vertex(syncPeer.PubKey()) if _, ok := d.peerSyncers[nodeID]; ok { return } log.Infof("Creating new gossipSyncer for peer=%x", - peer.SerializeCompressed()) + nodeID) syncer := newGossiperSyncer(gossipSyncerCfg{ chainHash: d.cfg.ChainHash, @@ -1213,10 +1241,10 @@ func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates channelSeries: d.cfg.ChanSeries, encodingType: lnwire.EncodingSortedPlain, sendToPeer: func(msgs ...lnwire.Message) error { - return d.cfg.SendToPeer(peer, msgs...) + return syncPeer.SendMessage(false, msgs...) }, }) - copy(syncer.peerPub[:], peer.SerializeCompressed()) + copy(syncer.peerPub[:], nodeID[:]) d.peerSyncers[nodeID] = syncer syncer.Start() @@ -1429,8 +1457,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( // We set ourselves as the source of this message to indicate // that we shouldn't skip any peers when sending this message. chanUpdates = append(chanUpdates, networkMsg{ - peer: d.selfKey, - msg: chanUpdate, + source: d.selfKey, + msg: chanUpdate, }) } @@ -1502,19 +1530,19 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn // our peers. announcements := make([]networkMsg, 0, 3) announcements = append(announcements, networkMsg{ - msg: chanAnn, - peer: d.selfKey, + source: d.selfKey, + msg: chanAnn, }) if e1Ann != nil { announcements = append(announcements, networkMsg{ - msg: e1Ann, - peer: d.selfKey, + source: d.selfKey, + msg: e1Ann, }) } if e2Ann != nil { announcements = append(announcements, networkMsg{ - msg: e2Ann, - peer: d.selfKey, + source: d.selfKey, + msg: e2Ann, }) } @@ -1591,8 +1619,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Node announcement was successfully proceeded and know it // might be broadcast to other connected nodes. announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) nMsg.err <- nil @@ -1776,7 +1805,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n msg, nMsg.peer) } else { err = <-d.ProcessLocalAnnouncement( - msg, nMsg.peer) + msg, nMsg.source) } if err != nil { log.Errorf("Failed reprocessing"+ @@ -1801,8 +1830,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // announcement with proof (remote). if proof != nil { announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) } @@ -1987,20 +2017,29 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // so we'll try sending the update directly to the remote peer. if !nMsg.isRemote && chanInfo.AuthProof == nil { // Get our peer's public key. - var remotePeer *btcec.PublicKey + var remotePub *btcec.PublicKey switch { case msg.Flags&lnwire.ChanUpdateDirection == 0: - remotePeer, _ = chanInfo.NodeKey2() + remotePub, _ = chanInfo.NodeKey2() case msg.Flags&lnwire.ChanUpdateDirection == 1: - remotePeer, _ = chanInfo.NodeKey1() + remotePub, _ = chanInfo.NodeKey1() } - // Send ChannelUpdate directly to remotePeer. - // TODO(halseth): make reliable send? - if err = d.cfg.SendToPeer(remotePeer, msg); err != nil { - log.Errorf("unable to send channel update "+ - "message to peer %x: %v", - remotePeer.SerializeCompressed(), err) + sPeer, err := d.cfg.FindPeer(remotePub) + if err != nil { + log.Errorf("unable to send channel update -- "+ + "could not find peer %x: %v", + remotePub, err) + } else { + // Send ChannelUpdate directly to remotePeer. + // TODO(halseth): make reliable send? + err = sPeer.SendMessage(false, msg) + if err != nil { + log.Errorf("unable to send channel "+ + "update message to peer %x: %v", + remotePub.SerializeCompressed(), + err) + } } } @@ -2010,8 +2049,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // has an attached authentication proof. if chanInfo.AuthProof != nil { announcements = append(announcements, networkMsg{ - msg: msg, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: msg, }) } @@ -2083,10 +2123,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } - isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(), - chanInfo.NodeKey1Bytes[:]) - isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(), - chanInfo.NodeKey2Bytes[:]) + nodeID := nMsg.source.SerializeCompressed() + isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:]) + isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:]) // Ensure that channel that was retrieved belongs to the peer // which sent the proof announcement. @@ -2130,7 +2169,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // received our local proof yet. So be kind and send // them the full proof. if nMsg.isRemote { - peerID := nMsg.peer.SerializeCompressed() + peerID := nMsg.source.SerializeCompressed() log.Debugf("Got AnnounceSignatures for " + "channel with full proof.") @@ -2151,7 +2190,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n log.Errorf("unable to gen ann: %v", err) return } - err = d.cfg.SendToPeer(nMsg.peer, chanAnn) + err = nMsg.peer.SendMessage(false, chanAnn) if err != nil { log.Errorf("Failed sending "+ "full proof to "+ @@ -2271,19 +2310,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Assemble the necessary announcements to add to the next // broadcasting batch. announcements = append(announcements, networkMsg{ - msg: chanAnn, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: chanAnn, }) if e1Ann != nil { announcements = append(announcements, networkMsg{ - msg: e1Ann, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: e1Ann, }) } if e2Ann != nil { announcements = append(announcements, networkMsg{ - msg: e2Ann, - peer: nMsg.peer, + peer: nMsg.peer, + source: nMsg.source, + msg: e2Ann, }) } @@ -2493,15 +2535,10 @@ func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) e // If this syncer is already at the terminal state, then we'll // chose it to request the fully channel update. if syncer.SyncState() == chansSynced { - pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256()) - if err != nil { - return err - } - log.Debugf("attempting to request chan ann for "+ "chan_id=%v from node=%x", cid, nodeID[:]) - return d.cfg.SendToPeer(pub, &lnwire.QueryShortChanIDs{ + return syncer.cfg.sendToPeer(&lnwire.QueryShortChanIDs{ ChainHash: d.cfg.ChainHash, EncodingType: lnwire.EncodingSortedPlain, ShortChanIDs: []lnwire.ShortChannelID{cid},