discovery/gossiper: mostly deprecate SendToPeer

This commit changes the gossiper to direct messages to
peer objects, instead of sending them through the
server every time. The primary motivation is to reduce
contention on the server's mutex and, more importantly,
avoid deadlocks in the Triangle of Death.
This commit is contained in:
Conner Fromknecht 2018-06-07 20:11:27 -07:00
parent a670537f4c
commit bf515a2341
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

View File

@ -14,6 +14,7 @@ import (
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
@ -35,8 +36,9 @@ var (
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
peer *btcec.PublicKey
msg lnwire.Message
peer lnpeer.Peer
source *btcec.PublicKey
msg lnwire.Message
isRemote bool
@ -97,6 +99,11 @@ type Config struct {
// messages to a particular peer identified by the target public key.
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// FindPeer returns the actively registered peer for a given remote
// public key. An error is returned if the peer was not found or a
// shutdown has been requested.
FindPeer func(identityKey *btcec.PublicKey) (lnpeer.Peer, error)
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
@ -147,8 +154,8 @@ type AuthenticatedGossiper struct {
// as we know it. To be used atomically.
bestHeight uint32
quit chan struct{}
wg sync.WaitGroup
quit chan struct{}
wg sync.WaitGroup
// cfg is a copy of the configuration struct that the gossiper service
// was initialized with.
@ -244,7 +251,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
// the entire network graph is read from disk, then serialized to the format
// defined within the current wire protocol. This cache of graph data is then
// sent directly to the target node.
func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
// TODO(roasbeef): need to also store sig data in db
// * will be nice when we switch to pairing sigs would only need one ^_^
@ -355,12 +362,12 @@ func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
}
log.Infof("Syncing channel graph state with %x, sending %v "+
"vertexes and %v edges", pub.SerializeCompressed(),
"vertexes and %v edges", syncPeer.PubKey(),
numNodes, numEdges)
// With all the announcement messages gathered, send them all in a
// single batch to the target peer.
return d.cfg.SendToPeer(pub, announceMessages...)
return syncPeer.SendMessage(false, announceMessages...)
}
// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to update the
@ -452,12 +459,13 @@ func (d *AuthenticatedGossiper) Stop() {
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
peer lnpeer.Peer) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: true,
peer: src,
peer: peer,
source: peer.IdentityKey(),
err: make(chan error, 1),
}
@ -478,12 +486,12 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
// entire channel announcement and update messages will be re-constructed and
// broadcast to the rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
source *btcec.PublicKey) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: false,
peer: src,
source: source,
err: make(chan error, 1),
}
@ -587,7 +595,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// Channel announcements are identified by the short channel id field.
case *lnwire.ChannelAnnouncement:
deDupKey := msg.ShortChannelID
sender := routing.NewVertex(message.peer)
sender := routing.NewVertex(message.source)
mws, ok := d.channelAnnouncements[deDupKey]
if !ok {
@ -609,7 +617,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// Channel updates are identified by the (short channel id, flags)
// tuple.
case *lnwire.ChannelUpdate:
sender := routing.NewVertex(message.peer)
sender := routing.NewVertex(message.source)
deDupKey := channelUpdateID{
msg.ShortChannelID,
msg.Flags,
@ -658,7 +666,7 @@ func (d *deDupedAnnouncements) addMsg(message networkMsg) {
// Node announcements are identified by the Vertex field. Use the
// NodeID to create the corresponding Vertex.
case *lnwire.NodeAnnouncement:
sender := routing.NewVertex(message.peer)
sender := routing.NewVertex(message.source)
deDupKey := routing.Vertex(msg.NodeID)
// We do the same for node announcements as we did for channel
@ -875,7 +883,7 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
// gossip syncer for an inbound message so we can properly dispatch the
// incoming message. If a gossip syncer isn't found, then one will be created
// for the target peer.
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSyncer {
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) {
target := routing.NewVertex(pub)
// First, we'll try to find an existing gossiper for this peer.
@ -885,16 +893,26 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
// If one exists, then we'll return it directly.
if ok {
return syncer
return syncer, nil
}
// Otherwise, we'll obtain the mutex, then check again if a gossiper
// was added after we dropped the read mutex.
// A known gossip syncer doesn't exist, so we may have to create one
// from scratch. To do so, we'll query for a reference directly to the
// active peer.
syncPeer, err := d.cfg.FindPeer(pub)
if err != nil {
log.Debugf("unable to find gossip peer %v: %v",
pub.SerializeCompressed(), err)
return nil, err
}
// Finally, we'll obtain the exclusive mutex, then check again if a
// gossiper was added after we dropped the read mutex.
d.syncerMtx.Lock()
syncer, ok = d.peerSyncers[target]
if ok {
d.syncerMtx.Unlock()
return syncer
return syncer, nil
}
// At this point, a syncer doesn't yet exist, so we'll create a new one
@ -905,7 +923,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
channelSeries: d.cfg.ChanSeries,
encodingType: lnwire.EncodingSortedPlain,
sendToPeer: func(msgs ...lnwire.Message) error {
return d.cfg.SendToPeer(pub, msgs...)
return syncPeer.SendMessage(false, msgs...)
},
})
copy(syncer.peerPub[:], pub.SerializeCompressed())
@ -914,7 +932,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) *gossipSy
d.syncerMtx.Unlock()
return syncer
return syncer, nil
}
// networkHandler is the primary goroutine that drives this service. The roles
@ -992,15 +1010,20 @@ func (d *AuthenticatedGossiper) networkHandler() {
// then we'll dispatch that directly to the proper
// gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer := d.findGossipSyncer(announcement.peer)
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
continue
}
// If we've found the message target, then
// we'll dispatch the message directly to it.
err := syncer.ApplyGossipFilter(msg)
err = syncer.ApplyGossipFilter(msg)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
announcement.peer.SerializeCompressed(), err)
announcement.peer.PubKey(), err)
}
continue
@ -1012,7 +1035,12 @@ func (d *AuthenticatedGossiper) networkHandler() {
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer := d.findGossipSyncer(announcement.peer)
syncer, err := d.findGossipSyncer(
announcement.source,
)
if err != nil {
continue
}
syncer.ProcessQueryMsg(announcement.msg)
continue
@ -1193,19 +1221,19 @@ func (d *AuthenticatedGossiper) networkHandler() {
// needed to handle new queries. The recvUpdates bool indicates if we should
// continue to receive real-time updates from the remote peer once we've synced
// channel state.
func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) {
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) {
d.syncerMtx.Lock()
defer d.syncerMtx.Unlock()
// If we already have a syncer, then we'll exit early as we don't want
// to override it.
nodeID := routing.NewVertex(peer)
nodeID := routing.Vertex(syncPeer.PubKey())
if _, ok := d.peerSyncers[nodeID]; ok {
return
}
log.Infof("Creating new gossipSyncer for peer=%x",
peer.SerializeCompressed())
nodeID)
syncer := newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
@ -1213,10 +1241,10 @@ func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates
channelSeries: d.cfg.ChanSeries,
encodingType: lnwire.EncodingSortedPlain,
sendToPeer: func(msgs ...lnwire.Message) error {
return d.cfg.SendToPeer(peer, msgs...)
return syncPeer.SendMessage(false, msgs...)
},
})
copy(syncer.peerPub[:], peer.SerializeCompressed())
copy(syncer.peerPub[:], nodeID[:])
d.peerSyncers[nodeID] = syncer
syncer.Start()
@ -1429,8 +1457,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
// We set ourselves as the source of this message to indicate
// that we shouldn't skip any peers when sending this message.
chanUpdates = append(chanUpdates, networkMsg{
peer: d.selfKey,
msg: chanUpdate,
source: d.selfKey,
msg: chanUpdate,
})
}
@ -1502,19 +1530,19 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn
// our peers.
announcements := make([]networkMsg, 0, 3)
announcements = append(announcements, networkMsg{
msg: chanAnn,
peer: d.selfKey,
source: d.selfKey,
msg: chanAnn,
})
if e1Ann != nil {
announcements = append(announcements, networkMsg{
msg: e1Ann,
peer: d.selfKey,
source: d.selfKey,
msg: e1Ann,
})
}
if e2Ann != nil {
announcements = append(announcements, networkMsg{
msg: e2Ann,
peer: d.selfKey,
source: d.selfKey,
msg: e2Ann,
})
}
@ -1591,8 +1619,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// Node announcement was successfully proceeded and know it
// might be broadcast to other connected nodes.
announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
nMsg.err <- nil
@ -1776,7 +1805,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
msg, nMsg.peer)
} else {
err = <-d.ProcessLocalAnnouncement(
msg, nMsg.peer)
msg, nMsg.source)
}
if err != nil {
log.Errorf("Failed reprocessing"+
@ -1801,8 +1830,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// announcement with proof (remote).
if proof != nil {
announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
}
@ -1987,20 +2017,29 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// so we'll try sending the update directly to the remote peer.
if !nMsg.isRemote && chanInfo.AuthProof == nil {
// Get our peer's public key.
var remotePeer *btcec.PublicKey
var remotePub *btcec.PublicKey
switch {
case msg.Flags&lnwire.ChanUpdateDirection == 0:
remotePeer, _ = chanInfo.NodeKey2()
remotePub, _ = chanInfo.NodeKey2()
case msg.Flags&lnwire.ChanUpdateDirection == 1:
remotePeer, _ = chanInfo.NodeKey1()
remotePub, _ = chanInfo.NodeKey1()
}
// Send ChannelUpdate directly to remotePeer.
// TODO(halseth): make reliable send?
if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
log.Errorf("unable to send channel update "+
"message to peer %x: %v",
remotePeer.SerializeCompressed(), err)
sPeer, err := d.cfg.FindPeer(remotePub)
if err != nil {
log.Errorf("unable to send channel update -- "+
"could not find peer %x: %v",
remotePub, err)
} else {
// Send ChannelUpdate directly to remotePeer.
// TODO(halseth): make reliable send?
err = sPeer.SendMessage(false, msg)
if err != nil {
log.Errorf("unable to send channel "+
"update message to peer %x: %v",
remotePub.SerializeCompressed(),
err)
}
}
}
@ -2010,8 +2049,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// has an attached authentication proof.
if chanInfo.AuthProof != nil {
announcements = append(announcements, networkMsg{
msg: msg,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
}
@ -2083,10 +2123,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
return nil
}
isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey1Bytes[:])
isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey2Bytes[:])
nodeID := nMsg.source.SerializeCompressed()
isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
// Ensure that channel that was retrieved belongs to the peer
// which sent the proof announcement.
@ -2130,7 +2169,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// received our local proof yet. So be kind and send
// them the full proof.
if nMsg.isRemote {
peerID := nMsg.peer.SerializeCompressed()
peerID := nMsg.source.SerializeCompressed()
log.Debugf("Got AnnounceSignatures for " +
"channel with full proof.")
@ -2151,7 +2190,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
log.Errorf("unable to gen ann: %v", err)
return
}
err = d.cfg.SendToPeer(nMsg.peer, chanAnn)
err = nMsg.peer.SendMessage(false, chanAnn)
if err != nil {
log.Errorf("Failed sending "+
"full proof to "+
@ -2271,19 +2310,22 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// Assemble the necessary announcements to add to the next
// broadcasting batch.
announcements = append(announcements, networkMsg{
msg: chanAnn,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: chanAnn,
})
if e1Ann != nil {
announcements = append(announcements, networkMsg{
msg: e1Ann,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: e1Ann,
})
}
if e2Ann != nil {
announcements = append(announcements, networkMsg{
msg: e2Ann,
peer: nMsg.peer,
peer: nMsg.peer,
source: nMsg.source,
msg: e2Ann,
})
}
@ -2493,15 +2535,10 @@ func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) e
// If this syncer is already at the terminal state, then we'll
// chose it to request the fully channel update.
if syncer.SyncState() == chansSynced {
pub, err := btcec.ParsePubKey(nodeID[:], btcec.S256())
if err != nil {
return err
}
log.Debugf("attempting to request chan ann for "+
"chan_id=%v from node=%x", cid, nodeID[:])
return d.cfg.SendToPeer(pub, &lnwire.QueryShortChanIDs{
return syncer.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
ChainHash: d.cfg.ChainHash,
EncodingType: lnwire.EncodingSortedPlain,
ShortChanIDs: []lnwire.ShortChannelID{cid},