discovery: update AuthenticatedGossiper to be aware of new gossipSyncers

In this commit, we update the logic in the AuthenticatedGossiper to
ensure that it can properly create, manage, and dispatch messages to any
gossipSyncer instances created by the server.

With this set of changes, the gossiper now has complete knowledge of the
current set of peers we're connected to that support the new range
queries. Upon initial connect, InitSyncState will be called by the
server if the new peer understands the set of gossip queries. This will
then create a new spot in the peerSyncers map for the new syncer. For
each new gossip query message, we'll then attempt to dispatch the
message directly to the gossip syncer. When the peer has disconnected,
we then expect the server to call the PruneSyncState method which will
allow us to free up the resources.

Finally, when we go to broadcast messages, we'll send the messages
directly to the peers that have gossipSyncer instances active, so they
can properly be filtered out. For those that don't, we'll broadcast
directly, ensuring we skip *all* peers that have an active gossip
syncer.
This commit is contained in:
Olaoluwa Osuntokun 2018-04-16 19:00:00 -07:00
parent 5789ef7c10
commit 45bd544f11
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21

@ -72,6 +72,12 @@ type Config struct {
// order to be included in the LN graph. // order to be included in the LN graph.
Router routing.ChannelGraphSource Router routing.ChannelGraphSource
// ChanSeries is an interface that provides access to a time series
// view of the current known channel graph. Each gossipSyncer enabled
// peer will utilize this in order to create and respond to channel
// graph time series queries.
ChanSeries ChannelGraphTimeSeries
// Notifier is used for receiving notifications of incoming blocks. // Notifier is used for receiving notifications of incoming blocks.
// With each new incoming block found we process previously premature // With each new incoming block found we process previously premature
// announcements. // announcements.
@ -196,6 +202,14 @@ type AuthenticatedGossiper struct {
rejectMtx sync.RWMutex rejectMtx sync.RWMutex
recentRejects map[uint64]struct{} recentRejects map[uint64]struct{}
// peerSyncers keeps track of all the gossip syncers we're maintaining for
// peers that understand this mode of operation. When we go to send out
// new updates, for all peers in the map, we'll send the messages
// directly to their gossiper, rather than broadcasting them. With this
// change, we ensure we filter out all updates properly.
syncerMtx sync.RWMutex
peerSyncers map[routing.Vertex]*gossipSyncer
sync.Mutex sync.Mutex
} }
@ -218,6 +232,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
waitingProofs: storage, waitingProofs: storage,
channelMtx: multimutex.NewMutex(), channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}), recentRejects: make(map[uint64]struct{}),
peerSyncers: make(map[routing.Vertex]*gossipSyncer),
}, nil }, nil
} }
@ -400,6 +415,12 @@ func (d *AuthenticatedGossiper) Stop() {
log.Info("Authenticated Gossiper is stopping") log.Info("Authenticated Gossiper is stopping")
d.syncerMtx.RLock()
for _, syncer := range d.peerSyncers {
syncer.Stop()
}
d.syncerMtx.RUnlock()
close(d.quit) close(d.quit)
d.wg.Wait() d.wg.Wait()
} }
@ -480,6 +501,16 @@ type msgWithSenders struct {
senders map[routing.Vertex]struct{} senders map[routing.Vertex]struct{}
} }
// mergeSyncerMap folds the given set of gossipSyncer-enabled peers into this
// message's sender set. Every peer key present in syncers is recorded as a
// sender of the message, which ensures that we don't broadcast the message to
// any peer that we have an active gossip syncer for.
func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]struct{}) {
	var marker struct{}
	for syncerPeer := range syncers {
		m.senders[syncerPeer] = marker
	}
}
// deDupedAnnouncements de-duplicates announcements that have been added to the // deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps // batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node // (one each for channel announcements, channel updates, and node
@ -693,12 +724,11 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders {
return msgs return msgs
} }
// resendAnnounceSignatures will inspect the messageStore database // resendAnnounceSignatures will inspect the messageStore database bucket for
// bucket for AnnounceSignatures messages that we recently tried // AnnounceSignatures messages that we recently tried to send to a peer. If the
// to send to a peer. If the associated channels still not have the // associated channels still not have the full channel proofs assembled, we
// full channel proofs assembled, we will try to resend them. If // will try to resend them. If we have the full proof, we can safely delete the
// we have the full proof, we can safely delete the message from // message from the messageStore.
// the messageStore.
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
type msgTuple struct { type msgTuple struct {
peer *btcec.PublicKey peer *btcec.PublicKey
@ -706,8 +736,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
dbKey []byte dbKey []byte
} }
// Fetch all the AnnounceSignatures messages that was added // Fetch all the AnnounceSignatures messages that was added to the
// to the database. // database.
//
// TODO(halseth): database access should be abstracted // TODO(halseth): database access should be abstracted
// behind interface. // behind interface.
var msgsResend []msgTuple var msgsResend []msgTuple
@ -717,7 +748,6 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
return nil return nil
} }
// Iterate over each message added to the database.
if err := bucket.ForEach(func(k, v []byte) error { if err := bucket.ForEach(func(k, v []byte) error {
// The database value represents the encoded // The database value represents the encoded
// AnnounceSignatures message. // AnnounceSignatures message.
@ -727,17 +757,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
return err return err
} }
// The first 33 bytes of the database key is // The first 33 bytes of the database key is the peer's
// the peer's public key. // public key.
peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) peer, err := btcec.ParsePubKey(k[:33], btcec.S256())
if err != nil { if err != nil {
return err return err
} }
t := msgTuple{peer, msg, k} t := msgTuple{peer, msg, k}
// Add the message to the slice, such that we // Add the message to the slice, such that we can
// can resend it after the database transaction // resend it after the database transaction is over.
// is over.
msgsResend = append(msgsResend, t) msgsResend = append(msgsResend, t)
return nil return nil
}); err != nil { }); err != nil {
@ -748,8 +777,8 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
return err return err
} }
// deleteMsg removes the message associated with the passed // deleteMsg removes the message associated with the passed msgTuple
// msgTuple from the messageStore. // from the messageStore.
deleteMsg := func(t msgTuple) error { deleteMsg := func(t msgTuple) error {
log.Debugf("Deleting message for chanID=%v from "+ log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID) "messageStore", t.msg.ChannelID)
@ -768,16 +797,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
return nil return nil
} }
// We now iterate over these messages, resending those that we // We now iterate over these messages, resending those that we don't
// don't have the full proof for, deleting the rest. // have the full proof for, deleting the rest.
for _, t := range msgsResend { for _, t := range msgsResend {
// Check if the full channel proof exists in our graph. // Check if the full channel proof exists in our graph.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID( chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
t.msg.ShortChannelID) t.msg.ShortChannelID)
if err != nil { if err != nil {
// If the channel cannot be found, it is most likely // If the channel cannot be found, it is most likely a
// a leftover message for a channel that was closed. // leftover message for a channel that was closed. In
// In this case we delete it from the message store. // this case we delete it from the message store.
log.Warnf("unable to fetch channel info for "+ log.Warnf("unable to fetch channel info for "+
"chanID=%v from graph: %v. Will delete local"+ "chanID=%v from graph: %v. Will delete local"+
"proof from database", "proof from database",
@ -788,13 +817,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
continue continue
} }
// 1. If the full proof does not exist in the graph, // 1. If the full proof does not exist in the graph, it means
// it means that we haven't received the remote proof // that we haven't received the remote proof yet (or that we
// yet (or that we crashed before able to assemble the // crashed before able to assemble the full proof). Since the
// full proof). Since the remote node might think they // remote node might think they have delivered their proof to
// have delivered their proof to us, we will resend // us, we will resend _our_ proof to trigger a resend on their
// _our_ proof to trigger a resend on their part: // part: they will then be able to assemble and send us the
// they will then be able to assemble and send us the
// full proof. // full proof.
if chanInfo.AuthProof == nil { if chanInfo.AuthProof == nil {
err := d.sendAnnSigReliably(t.msg, t.peer) err := d.sendAnnSigReliably(t.msg, t.peer)
@ -805,13 +833,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
} }
// 2. If the proof does exist in the graph, we have // 2. If the proof does exist in the graph, we have
// successfully received the remote proof and assembled // successfully received the remote proof and assembled the
// the full proof. In this case we can safely delete the // full proof. In this case we can safely delete the local
// local proof from the database. In case the remote // proof from the database. In case the remote hasn't been able
// hasn't been able to assemble the full proof yet // to assemble the full proof yet (maybe because of a crash),
// (maybe because of a crash), we will send them the full // we will send them the full proof if we notice that they
// proof if we notice that they retry sending their half // retry sending their half proof.
// proof.
if chanInfo.AuthProof != nil { if chanInfo.AuthProof != nil {
log.Debugf("Deleting message for chanID=%v from "+ log.Debugf("Deleting message for chanID=%v from "+
"messageStore", t.msg.ChannelID) "messageStore", t.msg.ChannelID)
@ -823,6 +850,22 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
return nil return nil
} }
// findGossipSyncer looks up the gossipSyncer registered for the peer
// identified by pub, so that an inbound message can be dispatched to it. The
// map lookup is performed while holding the syncerMtx read lock. An error is
// returned if no syncer is registered for the peer.
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) {
	peerVertex := routing.NewVertex(pub)

	d.syncerMtx.RLock()
	peerSyncer, found := d.peerSyncers[peerVertex]
	d.syncerMtx.RUnlock()

	if found {
		return peerSyncer, nil
	}

	return nil, fmt.Errorf("received chan time series message for "+
		"unknown peer: %x", peerVertex[:])
}
// networkHandler is the primary goroutine that drives this service. The roles // networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine includes answering queries related to the state of the // of this goroutine includes answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically // network, syncing up newly connected peers, and also periodically
@ -880,9 +923,10 @@ func (d *AuthenticatedGossiper) networkHandler() {
policyUpdate.errResp <- nil policyUpdate.errResp <- nil
case announcement := <-d.networkMsgs: case announcement := <-d.networkMsgs:
// Channel announcement signatures are the only message switch msg := announcement.msg.(type) {
// that we'll process serially. // Channel announcement signatures are amongst the only
if _, ok := announcement.msg.(*lnwire.AnnounceSignatures); ok { // messages that we'll process serially.
case *lnwire.AnnounceSignatures:
emittedAnnouncements := d.processNetworkAnnouncement( emittedAnnouncements := d.processNetworkAnnouncement(
announcement, announcement,
) )
@ -892,6 +936,43 @@ func (d *AuthenticatedGossiper) networkHandler() {
) )
} }
continue continue
// If a peer is updating its current update horizon,
// then we'll dispatch that directly to the proper
// gossipSyncer.
case *lnwire.GossipTimestampRange:
syncer, err := d.findGossipSyncer(announcement.peer)
if err != nil {
log.Error(err)
continue
}
// If we've found the message target, then
// we'll dispatch the message directly to it.
err = syncer.ApplyGossipFilter(msg)
if err != nil {
log.Warnf("unable to apply gossip "+
"filter for peer=%x: %v",
announcement.peer.SerializeCompressed(), err)
}
continue
// For messages in the known set of channel series
// queries, we'll dispatch the message directly to the
// peer, and skip the main processing loop.
case *lnwire.QueryShortChanIDs,
*lnwire.QueryChannelRange,
*lnwire.ReplyChannelRange,
*lnwire.ReplyShortChanIDsEnd:
syncer, err := d.findGossipSyncer(announcement.peer)
if err != nil {
log.Error(err)
continue
}
syncer.ProcessQueryMsg(announcement.msg)
continue
} }
// If this message was recently rejected, then we won't // If this message was recently rejected, then we won't
@ -1003,21 +1084,32 @@ func (d *AuthenticatedGossiper) networkHandler() {
continue continue
} }
// For the set of peers that have active gossip
// syncers, we'll collect their pubkeys so we can avoid
// sending them the full message blast below.
d.syncerMtx.RLock()
syncerPeers := map[routing.Vertex]struct{}{}
for peerPub := range d.peerSyncers {
syncerPeers[peerPub] = struct{}{}
}
d.syncerMtx.RUnlock()
log.Infof("Broadcasting batch of %v new announcements", log.Infof("Broadcasting batch of %v new announcements",
len(announcementBatch)) len(announcementBatch))
// If we have new things to announce then broadcast // We'll first attempt to filter out this new message
// them to all our immediately connected peers. // for all peers that have active gossip syncers
for _, msgChunk := range announcementBatch { // active.
// We'll first attempt to filter out this new d.syncerMtx.RLock()
// message for all peers that have active for _, syncer := range d.peerSyncers {
// gossip syncers active. syncer.FilterGossipMsgs(announcementBatch...)
d.syncerMtx.RLock() }
for _, syncer := range d.peerSyncers { d.syncerMtx.RUnlock()
syncer.FilterGossipMsgs(msgChunk)
}
d.syncerMtx.RUnlock()
// Next, If we have new things to announce then
// broadcast them to all our immediately connected
// peers.
for _, msgChunk := range announcementBatch {
// With the syncers taken care of, we'll merge // With the syncers taken care of, we'll merge
// the sender map with the set of syncers, so // the sender map with the set of syncers, so
// we don't send out duplicate messages. // we don't send out duplicate messages.
@ -1052,6 +1144,67 @@ func (d *AuthenticatedGossiper) networkHandler() {
} }
} }
// TODO(roasbeef): d/c peers that send updates not on our chain
// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. A new gossipSyncer is allocated for the peer, registered in the
// peerSyncers map, and started. If a syncer already exists for the peer, this
// method is a no-op. The recvUpdates bool indicates if we should continue to
// receive real-time updates from the remote peer once we've synced channel
// state.
func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) {
	d.syncerMtx.Lock()
	defer d.syncerMtx.Unlock()

	// A syncer that is already registered for this peer is left
	// untouched, as we don't want to override it.
	peerVertex := routing.NewVertex(peer)
	if _, exists := d.peerSyncers[peerVertex]; exists {
		return
	}

	log.Infof("Creating new gossipSyncer for peer=%x",
		peer.SerializeCompressed())

	// Every message the syncer sends is routed to this specific peer.
	sendToPeer := func(msgs ...lnwire.Message) error {
		return d.cfg.SendToPeer(peer, msgs...)
	}
	syncerCfg := gossipSyncerCfg{
		chainHash:       d.cfg.ChainHash,
		syncChanUpdates: recvUpdates,
		channelSeries:   d.cfg.ChanSeries,
		encodingType:    lnwire.EncodingSortedPlain,
		sendToPeer:      sendToPeer,
	}

	peerSyncer := newGossiperSyncer(syncerCfg)
	copy(peerSyncer.peerPub[:], peer.SerializeCompressed())

	// Register the syncer before starting it, all while still holding the
	// syncer mutex.
	d.peerSyncers[peerVertex] = peerSyncer
	peerSyncer.Start()
}
// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. If a gossipSyncer is
// registered for the peer, it is stopped and removed from the peerSyncers map
// so its resources can be freed. If no syncer is registered, nothing is
// stopped or removed. The whole operation runs under the syncerMtx write lock.
func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) {
	d.syncerMtx.Lock()
	defer d.syncerMtx.Unlock()

	log.Infof("Removing gossipSyncer for peer=%x",
		peer.SerializeCompressed())

	// Compute the map key once and use it for both the lookup and the
	// delete.
	vertex := routing.NewVertex(peer)
	syncer, ok := d.peerSyncers[vertex]
	if !ok {
		return
	}

	syncer.Stop()
	delete(d.peerSyncers, vertex)
}
// isRecentlyRejectedMsg returns true if we recently rejected a message, and // isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise, This avoids expensive reprocessing of the message. // false otherwise, This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
@ -2093,6 +2246,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// that the caller knows that the message will be delivered at one point. // that the caller knows that the message will be delivered at one point.
func (d *AuthenticatedGossiper) sendAnnSigReliably( func (d *AuthenticatedGossiper) sendAnnSigReliably(
msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error {
// We first add this message to the database, such that in case // We first add this message to the database, such that in case
// we do not succeed in sending it to the peer, we'll fetch it // we do not succeed in sending it to the peer, we'll fetch it
// from the DB next time we start, and retry. We use the peer ID // from the DB next time we start, and retry. We use the peer ID