diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ce51643f..f03cb3e4 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -72,6 +72,12 @@ type Config struct { // order to be included in the LN graph. Router routing.ChannelGraphSource + // ChanSeries is an interfaces that provides access to a time series + // view of the current known channel graph. Each gossipSyncer enabled + // peer will utilize this in order to create and respond to channel + // graph time series queries. + ChanSeries ChannelGraphTimeSeries + // Notifier is used for receiving notifications of incoming blocks. // With each new incoming block found we process previously premature // announcements. @@ -196,6 +202,14 @@ type AuthenticatedGossiper struct { rejectMtx sync.RWMutex recentRejects map[uint64]struct{} + // peerSyncers keeps track of all the gossip syncers we're maintain for + // peers that understand this mode of operation. When we go to send out + // new updates, for all peers in the map, we'll send the messages + // directly to their gossiper, rather than broadcasting them. With this + // change, we ensure we filter out all updates properly. + syncerMtx sync.RWMutex + peerSyncers map[routing.Vertex]*gossipSyncer + sync.Mutex } @@ -218,6 +232,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { waitingProofs: storage, channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), + peerSyncers: make(map[routing.Vertex]*gossipSyncer), }, nil } @@ -400,6 +415,12 @@ func (d *AuthenticatedGossiper) Stop() { log.Info("Authenticated Gossiper is stopping") + d.syncerMtx.RLock() + for _, syncer := range d.peerSyncers { + syncer.Stop() + } + d.syncerMtx.RUnlock() + close(d.quit) d.wg.Wait() } @@ -480,6 +501,16 @@ type msgWithSenders struct { senders map[routing.Vertex]struct{} } +// mergeSyncerMap is used to merge the set of senders of a particular message +// with peers that we have an active gossipSyncer with. We do this to ensure +// that we don't broadcast messages to any peers that we have active gossip +// syncers for. +func (m *msgWithSenders) mergeSyncerMap(syncers map[routing.Vertex]struct{}) { + for peerPub := range syncers { + m.senders[peerPub] = struct{}{} + } +} + // deDupedAnnouncements de-duplicates announcements that have been added to the // batch. Internally, announcements are stored in three maps // (one each for channel announcements, channel updates, and node @@ -693,12 +724,11 @@ func (d *deDupedAnnouncements) Emit() []msgWithSenders { return msgs } -// resendAnnounceSignatures will inspect the messageStore database -// bucket for AnnounceSignatures messages that we recently tried -// to send to a peer. If the associated channels still not have the -// full channel proofs assembled, we will try to resend them. If -// we have the full proof, we can safely delete the message from -// the messageStore. +// resendAnnounceSignatures will inspect the messageStore database bucket for +// AnnounceSignatures messages that we recently tried to send to a peer. If the +// associated channels still not have the full channel proofs assembled, we +// will try to resend them. If we have the full proof, we can safely delete the +// message from the messageStore. func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { type msgTuple struct { peer *btcec.PublicKey @@ -706,8 +736,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { dbKey []byte } - // Fetch all the AnnounceSignatures messages that was added - // to the database. + // Fetch all the AnnounceSignatures messages that was added to the + // database. + // // TODO(halseth): database access should be abstracted // behind interface. var msgsResend []msgTuple @@ -717,7 +748,6 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } - // Iterate over each message added to the database. if err := bucket.ForEach(func(k, v []byte) error { // The database value represents the encoded // AnnounceSignatures message. @@ -727,17 +757,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return err } - // The first 33 bytes of the database key is - // the peer's public key. + // The first 33 bytes of the database key is the peer's + // public key. peer, err := btcec.ParsePubKey(k[:33], btcec.S256()) if err != nil { return err } t := msgTuple{peer, msg, k} - // Add the message to the slice, such that we - // can resend it after the database transaction - // is over. + // Add the message to the slice, such that we can + // resend it after the database transaction is over. msgsResend = append(msgsResend, t) return nil }); err != nil { @@ -748,8 +777,8 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return err } - // deleteMsg removes the message associated with the passed - // msgTuple from the messageStore. + // deleteMsg removes the message associated with the passed msgTuple + // from the messageStore. deleteMsg := func(t msgTuple) error { log.Debugf("Deleting message for chanID=%v from "+ "messageStore", t.msg.ChannelID) @@ -768,16 +797,16 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } - // We now iterate over these messages, resending those that we - // don't have the full proof for, deleting the rest. + // We now iterate over these messages, resending those that we don't + // have the full proof for, deleting the rest. for _, t := range msgsResend { // Check if the full channel proof exists in our graph. chanInfo, _, _, err := d.cfg.Router.GetChannelByID( t.msg.ShortChannelID) if err != nil { - // If the channel cannot be found, it is most likely - // a leftover message for a channel that was closed. - // In this case we delete it from the message store. + // If the channel cannot be found, it is most likely a + // leftover message for a channel that was closed. In + // this case we delete it from the message store. log.Warnf("unable to fetch channel info for "+ "chanID=%v from graph: %v. Will delete local"+ "proof from database", @@ -788,13 +817,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { continue } - // 1. If the full proof does not exist in the graph, - // it means that we haven't received the remote proof - // yet (or that we crashed before able to assemble the - // full proof). Since the remote node might think they - // have delivered their proof to us, we will resend - // _our_ proof to trigger a resend on their part: - // they will then be able to assemble and send us the + // 1. If the full proof does not exist in the graph, it means + // that we haven't received the remote proof yet (or that we + // crashed before able to assemble the full proof). Since the + // remote node might think they have delivered their proof to + // us, we will resend _our_ proof to trigger a resend on their + // part: they will then be able to assemble and send us the // full proof. if chanInfo.AuthProof == nil { err := d.sendAnnSigReliably(t.msg, t.peer) @@ -805,13 +833,12 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { } // 2. If the proof does exist in the graph, we have - // successfully received the remote proof and assembled - // the full proof. In this case we can safely delete the - // local proof from the database. In case the remote - // hasn't been able to assemble the full proof yet - // (maybe because of a crash), we will send them the full - // proof if we notice that they retry sending their half - // proof. + // successfully received the remote proof and assembled the + // full proof. In this case we can safely delete the local + // proof from the database. In case the remote hasn't been able + // to assemble the full proof yet (maybe because of a crash), + // we will send them the full proof if we notice that they + // retry sending their half proof. if chanInfo.AuthProof != nil { log.Debugf("Deleting message for chanID=%v from "+ "messageStore", t.msg.ChannelID) @@ -823,6 +850,22 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { return nil } +// findGossipSyncer is a utility method used by the gossiper to locate the +// gossip syncer for an inbound message so we can properly dispatch the +// incoming message. +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { + target := routing.NewVertex(pub) + d.syncerMtx.RLock() + syncer, ok := d.peerSyncers[target] + d.syncerMtx.RUnlock() + if !ok { + return nil, fmt.Errorf("received chan time series message for "+ + "unknown peer: %x", target[:]) + } + + return syncer, nil +} + // networkHandler is the primary goroutine that drives this service. The roles // of this goroutine includes answering queries related to the state of the // network, syncing up newly connected peers, and also periodically @@ -880,9 +923,10 @@ func (d *AuthenticatedGossiper) networkHandler() { policyUpdate.errResp <- nil case announcement := <-d.networkMsgs: - // Channel announcement signatures are the only message - // that we'll process serially. - if _, ok := announcement.msg.(*lnwire.AnnounceSignatures); ok { + switch msg := announcement.msg.(type) { + // Channel announcement signatures are amongst the only + // messages that we'll process serially. + case *lnwire.AnnounceSignatures: emittedAnnouncements := d.processNetworkAnnouncement( announcement, ) @@ -892,6 +936,43 @@ func (d *AuthenticatedGossiper) networkHandler() { ) } continue + + // If a peer is updating its current update horizon, + // then we'll dispatch that directly to the proper + // gossipSyncer. + case *lnwire.GossipTimestampRange: + syncer, err := d.findGossipSyncer(announcement.peer) + if err != nil { + log.Error(err) + continue + } + + // If we've found the message target, then + // we'll dispatch the message directly to it. + err = syncer.ApplyGossipFilter(msg) + if err != nil { + log.Warnf("unable to apply gossip "+ + "filter for peer=%x: %v", + announcement.peer.SerializeCompressed(), err) + } + continue + + // For messages in the known set of channel series + // queries, we'll dispatch the message directly to the + // peer, and skip the main processing loop. + case *lnwire.QueryShortChanIDs, + *lnwire.QueryChannelRange, + *lnwire.ReplyChannelRange, + *lnwire.ReplyShortChanIDsEnd: + + syncer, err := d.findGossipSyncer(announcement.peer) + if err != nil { + log.Error(err) + continue + } + + syncer.ProcessQueryMsg(announcement.msg) + continue } // If this message was recently rejected, then we won't @@ -1003,21 +1084,32 @@ func (d *AuthenticatedGossiper) networkHandler() { continue } + // For the set of peers that have an active gossip + // syncers, we'll collect their pubkeys so we can avoid + // sending them the full message blast below. + d.syncerMtx.RLock() + syncerPeers := map[routing.Vertex]struct{}{} + for peerPub := range d.peerSyncers { + syncerPeers[peerPub] = struct{}{} + } + d.syncerMtx.RUnlock() + log.Infof("Broadcasting batch of %v new announcements", len(announcementBatch)) - // If we have new things to announce then broadcast - // them to all our immediately connected peers. - for _, msgChunk := range announcementBatch { - // We'll first attempt to filter out this new - // message for all peers that have active - // gossip syncers active. - d.syncerMtx.RLock() - for _, syncer := range d.peerSyncers { - syncer.FilterGossipMsgs(msgChunk) - } - d.syncerMtx.RUnlock() + // We'll first attempt to filter out this new message + // for all peers that have active gossip syncers + // active. + d.syncerMtx.RLock() + for _, syncer := range d.peerSyncers { + syncer.FilterGossipMsgs(announcementBatch...) + } + d.syncerMtx.RUnlock() + // Next, If we have new things to announce then + // broadcast them to all our immediately connected + // peers. + for _, msgChunk := range announcementBatch { // With the syncers taken care of, we'll merge // the sender map with the set of syncers, so // we don't send out duplicate messages. @@ -1052,6 +1144,67 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// TODO(roasbeef): d/c peers that send uupdates not on our chain + +// InitPeerSyncState is called by outside sub-systems when a connection is +// established to a new peer that understands how to perform channel range +// queries. We'll allocate a new gossip syncer for it, and start any goroutines +// needed to handle new queries. The recvUpdates bool indicates if we should +// continue to receive real-time updates from the remote peer once we've synced +// channel state. +func (d *AuthenticatedGossiper) InitSyncState(peer *btcec.PublicKey, recvUpdates bool) { + d.syncerMtx.Lock() + defer d.syncerMtx.Unlock() + + // If we already have a syncer, then we'll exit early as we don't want + // to override it. + nodeID := routing.NewVertex(peer) + if _, ok := d.peerSyncers[nodeID]; ok { + return + } + + log.Infof("Creating new gossipSyncer for peer=%x", + peer.SerializeCompressed()) + + syncer := newGossiperSyncer(gossipSyncerCfg{ + chainHash: d.cfg.ChainHash, + syncChanUpdates: recvUpdates, + channelSeries: d.cfg.ChanSeries, + encodingType: lnwire.EncodingSortedPlain, + sendToPeer: func(msgs ...lnwire.Message) error { + return d.cfg.SendToPeer(peer, msgs...) + }, + }) + copy(syncer.peerPub[:], peer.SerializeCompressed()) + d.peerSyncers[nodeID] = syncer + + syncer.Start() +} + +// PruneSyncState is called by outside sub-systems once a peer that we were +// previously connected to has been disconnected. In this case we can stop the +// existing gossipSyncer assigned to the peer and free up resources. +func (d *AuthenticatedGossiper) PruneSyncState(peer *btcec.PublicKey) { + d.syncerMtx.Lock() + defer d.syncerMtx.Unlock() + + log.Infof("Removing gossipSyncer for peer=%x", + peer.SerializeCompressed()) + + vertex := routing.NewVertex(peer) + + syncer, ok := d.peerSyncers[routing.NewVertex(peer)] + if !ok { + return + } + + syncer.Stop() + + delete(d.peerSyncers, vertex) + + return +} + // isRecentlyRejectedMsg returns true if we recently rejected a message, and // false otherwise, This avoids expensive reprocessing of the message. func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool { @@ -2093,6 +2246,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // that the caller knows that the message will be delivered at one point. func (d *AuthenticatedGossiper) sendAnnSigReliably( msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error { + // We first add this message to the database, such that in case // we do not succeed in sending it to the peer, we'll fetch it // from the DB next time we start, and retry. We use the peer ID