From 45bd544f1178fdbac9df003fa4f83b797b3254ef Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 16 Apr 2018 19:00:00 -0700 Subject: [PATCH] discovery: update AuthenticatedGossiper to be aware of new gossipSyncers In this commit, we update the logic in the AuthenticatedGossiper to ensure that can properly create, manage, and dispatch messages to any gossipSyncer instances created by the server. With this set of changes, the gossip now has complete knowledge of the current set of peers we're conneted to that support the new range queries. Upon initial connect, InitSyncState will be called by the server if the new peer understands the set of gossip queries. This will then create a new spot in the peerSyncers map for the new syncer. For each new gossip query message, we'll then attempt to dispatch the message directly to the gossip syncer. When the peer has disconnected, we then expect the server to call the PruneSyncState method which will allow us to free up the resources. Finally, when we go to broadcast messages, we'll send the messages directly to the peers that have gossipSyncer instances active, so they can properly be filtered out. For those that don't we'll broadcast directly, ensuring we skip *all* peers that have an active gossip syncer. --- discovery/gossiper.go | 252 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 203 insertions(+), 49 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ce51643f..f03cb3e4 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -72,6 +72,12 @@ type Config struct { // order to be included in the LN graph. Router routing.ChannelGraphSource + // ChanSeries is an interfaces that provides access to a time series + // view of the current known channel graph. Each gossipSyncer enabled + // peer will utilize this in order to create and respond to channel + // graph time series queries. + ChanSeries ChannelGraphTimeSeries + // Notifier is used for receiving notifications of incoming blocks. // With each new incoming block found we process previously premature // announcements. @@ -196,6 +202,14 @@ type AuthenticatedGossiper struct { rejectMtx sync.RWMutex recentRejects map[uint64]struct{} + // peerSyncers keeps track of all the gossip syncers we're maintain for + // peers that understand this mode of operation. When we go to send out + // new updates, for all peers in the map, we'll send the messages + // directly to their gossiper, rather than broadcasting them. With this + // change, we ensure we filter out all updates properly. + syncerMtx sync.RWMutex + peerSyncers map[routing.Vertex]*gossipSyncer + sync.Mutex } @@ -218,6 +232,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { waitingProofs: storage, channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), + peerSyncers: make(map[routing.Vertex]*gossipSyncer), }, nil } @@ -400,6 +415,12 @@ func (d *AuthenticatedGossiper) Stop() { log.Info("Authenticated Gossiper is stopping") + d.syncerMtx.RLock() + for _, syncer := range d.peerSyncers { + syncer.Stop() + } + d.syncerMtx.RUnlock() + close(d.quit) d.wg.Wait() } @@ -480,6 +501,16 @@ type msgWithSenders struct { senders map[routing.Vertex]struct{} } +// mergeSyncerMap is used to merge the set of senders of a particular message +// with peers that we have an active gossipSyncer with. We do this to ensure +// that we don't broadcast messages to any peers that we have active gossip +// syncers for. +func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]struct{}) { + for peerPub := range syncers { + m.senders[peerPub] = struct{}{} + } +} + // deDupedAnnouncements de-duplicates announcements that have been added to the // batch. Internally, announcements are stored in three maps // (one each for channel announcements, channel updates, and node @@ -693,12 +724,11 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// resendAnnounceSignatures will inspect the messageStore database -// bucket for AnnounceSignatures messages that we recently tried -// to send to a peer. If the associated channels still not have the -// full channel proofs assembled, we will try to resend them. If -// we have the full proof, we can safely delete the message from -// the messageStore. +// resendAnnounceSignatures will inspect the messageStore database bucket for +// AnnounceSignatures messages that we recently tried to send to a peer. If the +// associated channels still not have the full channel proofs assembled, we +// will try to resend them. If we have the full proof, we can safely delete the +// message from the messageStore. func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { type msgTuple struct { peer *btcec.PublicKey @@ -706,8 +736,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { dbKey []byte } - // Fetch all the AnnounceSignatures messages that was added - // to the database. + // Fetch all the AnnounceSignatures messages that was added to the + // database. + // // TODO(halseth): database access should be abstracted // behind interface. var msgsResend []msgTuple @@ -717,7 +748,6 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } - // Iterate over each message added to the database. if err := bucket.ForEach(func(k, v []byte) error { // The database value represents the encoded // AnnounceSignatures message. @@ -727,17 +757,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return err } - // The first 33 bytes of the database key is - // the peer's public key. + // The first 33 bytes of the database key is the peer's + // public key. peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) if err != nil { return err } t := msgTuple{peer, msg, k} - // Add the message to the slice, such that we - // can resend it after the database transaction - // is over. + // Add the message to the slice, such that we can + // resend it after the database transaction is over. msgsResend = append(msgsResend, t) return nil }); err != nil { @@ -748,8 +777,8 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return err } - // deleteMsg removes the message associated with the passed - // msgTuple from the messageStore. + // deleteMsg removes the message associated with the passed msgTuple + // from the messageStore. deleteMsg := func(t msgTuple) error { log.Debugf("Deleting message for chanID=%v from "+ "messageStore", t.msg.ChannelID) @@ -768,16 +797,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } - // We now iterate over these messages, resending those that we - // don't have the full proof for, deleting the rest. + // We now iterate over these messages, resending those that we don't + // have the full proof for, deleting the rest. for _, t := range msgsResend { // Check if the full channel proof exists in our graph. chanInfo, _, _, err := d.cfg.Router.GetChannelByID( t.msg.ShortChannelID) if err != nil { - // If the channel cannot be found, it is most likely - // a leftover message for a channel that was closed. - // In this case we delete it from the message store. + // If the channel cannot be found, it is most likely a + // leftover message for a channel that was closed. In + // this case we delete it from the message store. log.Warnf("unable to fetch channel info for "+ "chanID=%v from graph: %v. Will delete local"+ "proof from database", @@ -788,13 +817,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { continue } - // 1. If the full proof does not exist in the graph, - // it means that we haven't received the remote proof - // yet (or that we crashed before able to assemble the - // full proof). Since the remote node might think they - // have delivered their proof to us, we will resend - // _our_ proof to trigger a resend on their part: - // they will then be able to assemble and send us the + // 1. If the full proof does not exist in the graph, it means + // that we haven't received the remote proof yet (or that we + // crashed before able to assemble the full proof). Since the + // remote node might think they have delivered their proof to + // us, we will resend _our_ proof to trigger a resend on their + // part: they will then be able to assemble and send us the // full proof. if chanInfo.AuthProof == nil { err := d.sendAnnSigReliably(t.msg, t.peer) @@ -805,13 +833,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { } // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled - // the full proof. In this case we can safely delete the - // local proof from the database. In case the remote - // hasn't been able to assemble the full proof yet - // (maybe because of a crash), we will send them the full - // proof if we notice that they retry sending their half - // proof. + // successfully received the remote proof and assembled the + // full proof. In this case we can safely delete the local + // proof from the database. In case the remote hasn't been able + // to assemble the full proof yet (maybe because of a crash), + // we will send them the full proof if we notice that they + // retry sending their half proof. if chanInfo.AuthProof != nil { log.Debugf("Deleting message for chanID=%v from "+ "messageStore", t.msg.ChannelID) @@ -823,6 +850,22 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } +// findGossipSyncer is a utility method used by the gossiper to locate the +// gossip syncer for an inbound message so we can properly dispatch the +// incoming message. +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { + target := routing.NewVertex(pub) + d.syncerMtx.RLock() + syncer, ok := d.peerSyncers[target] + d.syncerMtx.RUnlock() + if !ok { + return nil, fmt.Errorf("received chan time series message for "+ + "unknown peer: %x", target[:]) + } + + return syncer, nil +} + // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically @@ -880,9 +923,10 @@ func (d *AuthenticatedGossiper) networkHandler() { policyUpdate.errResp <- nil case announcement := <-d.networkMsgs: - // Channel announcement signatures are the only message - // that we'll process serially. - if _, ok := announcement.msg.(*lnwire.AnnounceSignatures); ok { + switch msg := announcement.msg.(type) { + // Channel announcement signatures are amongst the only + // messages that we'll process serially. + case *lnwire.AnnounceSignatures: emittedAnnouncements := d.processNetworkAnnouncement( announcement, ) @@ -892,6 +936,43 @@ func (d *AuthenticatedGossiper) networkHandler() { ) } continue + + // If a peer is updating its current update horizon, + // then we'll dispatch that directly to the proper + // gossipSyncer. + case *lnwire.GossipTimestampRange: + syncer, err := d.findGossipSyncer(announcement.peer) + if err != nil { + log.Error(err) + continue + } + + // If we've found the message target, then + // we'll dispatch the message directly to it. + err = syncer.ApplyGossipFilter(msg) + if err != nil { + log.Warnf("unable to apply gossip "+ + "filter for peer=%x: %v", + announcement.peer.SerializeCompressed(), err) + } + continue + + // For messages in the known set of channel series + // queries, we'll dispatch the message directly to the + // peer, and skip the main processing loop. + case *lnwire.QueryShortChanIDs, + *lnwire.QueryChannelRange, + *lnwire.ReplyChannelRange, + *lnwire.ReplyShortChanIDsEnd: + + syncer, err := d.findGossipSyncer(announcement.peer) + if err != nil { + log.Error(err) + continue + } + + syncer.ProcessQueryMsg(announcement.msg) + continue } // If this message was recently rejected, then we won't @@ -1003,21 +1084,32 @@ func (d *AuthenticatedGossiper) networkHandler() { continue } + // For the set of peers that have an active gossip + // syncers, we'll collect their pubkeys so we can avoid + // sending them the full message blast below. + d.syncerMtx.RLock() + syncerPeers := map[routing.Vertex]struct{}{} + for peerPub := range d.peerSyncers { + syncerPeers[peerPub] = struct{}{} + } + d.syncerMtx.RUnlock() + log.Infof("Broadcasting batch of %v new announcements", len(announcementBatch)) - // If we have new things to announce then broadcast - // them to all our immediately connected peers. - for _, msgChunk := range announcementBatch { - // We'll first attempt to filter out this new - // message for all peers that have active - // gossip syncers active. - d.syncerMtx.RLock() - for _, syncer := range d.peerSyncers { - syncer.FilterGossipMsgs(msgChunk) - } - d.syncerMtx.RUnlock() + // We'll first attempt to filter out this new message + // for all peers that have active gossip syncers + // active. + d.syncerMtx.RLock() + for _, syncer := range d.peerSyncers { + syncer.FilterGossipMsgs(announcementBatch...) + } + d.syncerMtx.RUnlock() + // Next, If we have new things to announce then + // broadcast them to all our immediately connected + // peers. + for _, msgChunk := range announcementBatch { // With the syncers taken care of, we'll merge // the sender map with the set of syncers, so // we don't send out duplicate messages. @@ -1052,6 +1144,67 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// TODO(roasbeef): d/c peers that send uupdates not on our chain + +// InitPeerSyncState is called by outside sub-systems when a connection is +// established to a new peer that understands how to perform channel range +// queries. We'll allocate a new gossip syncer for it, and start any goroutines +// needed to handle new queries. The recvUpdates bool indicates if we should +// continue to receive real-time updates from the remote peer once we've synced +// channel state. +func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) { + d.syncerMtx.Lock() + defer d.syncerMtx.Unlock() + + // If we already have a syncer, then we'll exit early as we don't want + // to override it. + nodeID := routing.NewVertex(peer) + if _, ok := d.peerSyncers[nodeID]; ok { + return + } + + log.Infof("Creating new gossipSyncer for peer=%x", + peer.SerializeCompressed()) + + syncer := newGossiperSyncer(gossipSyncerCfg{ + chainHash: d.cfg.ChainHash, + syncChanUpdates: recvUpdates, + channelSeries: d.cfg.ChanSeries, + encodingType: lnwire.EncodingSortedPlain, + sendToPeer: func(msgs ...lnwire.Message) error { + return d.cfg.SendToPeer(peer, msgs...) + }, + }) + copy(syncer.peerPub[:], peer.SerializeCompressed()) + d.peerSyncers[nodeID] = syncer + + syncer.Start() +} + +// PruneSyncState is called by outside sub-systems once a peer that we were +// previously connected to has been disconnected. In this case we can stop the +// existing gossipSyncer assigned to the peer and free up resources. +func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) { + d.syncerMtx.Lock() + defer d.syncerMtx.Unlock() + + log.Infof("Removing gossipSyncer for peer=%x", + peer.SerializeCompressed()) + + vertex := routing.NewVertex(peer) + + syncer, ok := d.peerSyncers[routing.NewVertex(peer)] + if !ok { + return + } + + syncer.Stop() + + delete(d.peerSyncers, vertex) + + return +} + // isRecentlyRejectedMsg returns true if we recently rejected a message, and // false otherwise, This avoids expensive reprocessing of the message. func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { @@ -2093,6 +2246,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // that the caller knows that the message will be delivered at one point. func (d *AuthenticatedGossiper) sendAnnSigReliably( msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { + // We first add this message to the database, such that in case // we do not succeed in sending it to the peer, we'll fetch it // from the DB next time we start, and retry. We use the peer ID