From c56082006fe60df167639d7b12360fd03c627dd0 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 20 Aug 2018 14:28:10 +0200 Subject: [PATCH] discovery/gossiper: formatting --- discovery/gossiper.go | 111 +++++++++++++++++++++++++++--------------- 1 file changed, 73 insertions(+), 38 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 7f776f79..75daea1a 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -265,7 +265,9 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error { // containing all the messages to be sent to the target peer. var announceMessages []lnwire.Message - makeNodeAnn := func(n *channeldb.LightningNode) (*lnwire.NodeAnnouncement, error) { + makeNodeAnn := func(n *channeldb.LightningNode) ( + *lnwire.NodeAnnouncement, error) { + alias, _ := lnwire.NewNodeAlias(n.Alias) wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes) @@ -897,7 +899,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error { // gossip syncer for an inbound message so we can properly dispatch the // incoming message. If a gossip syncer isn't found, then one will be created // for the target peer. -func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) { +func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) ( + *gossipSyncer, error) { + target := routing.NewVertex(pub) // First, we'll try to find an existing gossiper for this peer. @@ -992,7 +996,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // First, we'll now create new fully signed updates for // the affected channels and also update the underlying // graph with the new state. - newChanUpdates, err := d.processChanPolicyUpdate(policyUpdate) + newChanUpdates, err := d.processChanPolicyUpdate( + policyUpdate, + ) if err != nil { log.Errorf("Unable to craft policy updates: %v", err) @@ -1065,7 +1071,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // If this message was recently rejected, then we won't // attempt to re-process it. if d.isRecentlyRejectedMsg(announcement.msg) { - announcement.err <- fmt.Errorf("recently rejected") + announcement.err <- fmt.Errorf("recently " + + "rejected") continue } @@ -1096,23 +1103,27 @@ func (d *AuthenticatedGossiper) networkHandler() { return } - // Process the network announcement to determine if - // this is either a new announcement from our PoV - // or an edges to a prior vertex/edge we previously - // proceeded. + // Process the network announcement to + // determine if this is either a new + // announcement from our PoV or an edges to a + // prior vertex/edge we previously proceeded. emittedAnnouncements := d.processNetworkAnnouncement( announcement, ) // If this message had any dependencies, then // we can now signal them to continue. - validationBarrier.SignalDependants(announcement.msg) + validationBarrier.SignalDependants( + announcement.msg, + ) - // If the announcement was accepted, then add the - // emitted announcements to our announce batch to - // be broadcast once the trickle timer ticks gain. + // If the announcement was accepted, then add + // the emitted announcements to our announce + // batch to be broadcast once the trickle timer + // ticks gain. if emittedAnnouncements != nil { - // TODO(roasbeef): exclude peer that sent + // TODO(roasbeef): exclude peer that + // sent. announcements.AddMsgs( emittedAnnouncements..., ) @@ -1138,7 +1149,7 @@ func (d *AuthenticatedGossiper) networkHandler() { // for this height, if so, then we process them once // more as normal announcements. d.Lock() - numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)]) + numPremature := len(d.prematureAnnouncements[blockHeight]) d.Unlock() // Return early if no announcement to process. @@ -1150,7 +1161,7 @@ func (d *AuthenticatedGossiper) networkHandler() { "for height %v", numPremature, blockHeight) d.Lock() - for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] { + for _, ann := range d.prematureAnnouncements[blockHeight] { emittedAnnouncements := d.processNetworkAnnouncement(ann) if emittedAnnouncements != nil { announcements.AddMsgs( @@ -1216,9 +1227,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // The retransmission timer has ticked which indicates that we // should check if we need to prune or re-broadcast any of our - // personal channels. This addresses the case of "zombie" channels and - // channel advertisements that have been dropped, or not properly - // propagated through the network. + // personal channels. This addresses the case of "zombie" + // channels and channel advertisements that have been dropped, + // or not properly propagated through the network. case <-retransmitTimer.C: if err := d.retransmitStaleChannels(); err != nil { log.Errorf("unable to rebroadcast stale "+ @@ -1241,7 +1252,9 @@ func (d *AuthenticatedGossiper) networkHandler() { // needed to handle new queries. The recvUpdates bool indicates if we should // continue to receive real-time updates from the remote peer once we've synced // channel state. -func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) { +func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, + recvUpdates bool) { + d.syncerMtx.Lock() defer d.syncerMtx.Unlock() @@ -1493,7 +1506,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate( // situation in the case where we create a channel, but for some reason fail // to receive the remote peer's proof, while the remote peer is able to fully // assemble the proof and craft the ChannelAnnouncement. -func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAnnouncement, +func (d *AuthenticatedGossiper) processRejectedEdge( + chanAnnMsg *lnwire.ChannelAnnouncement, proof *channeldb.ChannelAuthProof) ([]networkMsg, error) { // First, we'll fetch the state of the channel as we know if from the @@ -1576,7 +1590,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn // didn't affect the internal state due to either being out of date, invalid, // or redundant, then nil is returned. Otherwise, the set of announcements will // be returned which should be broadcasted to the rest of the network. -func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg { +func (d *AuthenticatedGossiper) processNetworkAnnouncement( + nMsg *networkMsg) []networkMsg { + isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool { // TODO(roasbeef) make height delta 6 // * or configurable @@ -1612,7 +1628,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n } } - features := lnwire.NewFeatureVector(msg.Features, lnwire.GlobalFeatures) + features := lnwire.NewFeatureVector( + msg.Features, lnwire.GlobalFeatures, + ) node := &channeldb.LightningNode{ HaveNodeAnnouncement: true, LastUpdate: timestamp, @@ -1675,8 +1693,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // to be fully verified once we advance forward in the chain. if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) { blockHeight := msg.ShortChannelID.BlockHeight - log.Infof("Announcement for chan_id=(%v), is premature: "+ - "advertises height %v, only height %v is known", + log.Infof("Announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only "+ + "height %v is known", msg.ShortChannelID.ToUint64(), msg.ShortChannelID.BlockHeight, atomic.LoadUint32(&d.bestHeight)) @@ -2032,7 +2051,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n } if err := d.cfg.Router.UpdateEdge(update); err != nil { - if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) { + if routing.IsError(err, routing.ErrOutdated, + routing.ErrIgnored) { log.Debug(err) } else { d.rejectMtx.Lock() @@ -2097,7 +2117,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // willingness of nodes involved in the funding of a channel to // announce this new channel to the rest of the world. case *lnwire.AnnounceSignatures: - needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta + needBlockHeight := msg.ShortChannelID.BlockHeight + + d.cfg.ProofMatureDelta shortChanID := msg.ShortChannelID.ToUint64() prefix := "local" @@ -2136,6 +2157,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // not change before we call AddProof() later. d.channelMtx.Lock(msg.ShortChannelID.ToUint64()) defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64()) + chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( msg.ShortChannelID) if err != nil { @@ -2187,7 +2209,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n // Since the remote peer might not be online // we'll call a method that will attempt to // deliver the proof when it comes online. - if err := d.sendAnnSigReliably(msg, remotePeer); err != nil { + err := d.sendAnnSigReliably(msg, remotePeer) + if err != nil { err := errors.Errorf("unable to send reliably "+ "to remote for short_chan_id=%v: %v", shortChanID, err) @@ -2219,13 +2242,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n peerID) chanAnn, _, _, err := CreateChanAnnouncement( - chanInfo.AuthProof, chanInfo, e1, e2, + chanInfo.AuthProof, chanInfo, + e1, e2, ) if err != nil { - log.Errorf("unable to gen ann: %v", err) + log.Errorf("unable to gen "+ + "ann: %v", err) return } - err = nMsg.peer.SendMessage(false, chanAnn) + err = nMsg.peer.SendMessage( + false, chanAnn, + ) if err != nil { log.Errorf("Failed sending "+ "full proof to "+ @@ -2234,7 +2261,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return } log.Debugf("Full proof sent to peer=%x"+ - " for chanID=%v", peerID, msg.ChannelID) + " for chanID=%v", peerID, + msg.ChannelID) }() } @@ -2293,7 +2321,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes() dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes() } - chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(&dbProof, chanInfo, e1, e2) + chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement( + &dbProof, chanInfo, e1, e2, + ) if err != nil { log.Error(err) nMsg.err <- err @@ -2328,9 +2358,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n return nil } - if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil { + err = d.waitingProofs.Remove(proof.OppositeKey()) + if err != nil { err := errors.Errorf("unable remove opposite proof "+ - "for the channel with chanID=%v: %v", msg.ChannelID, err) + "for the channel with chanID=%v: %v", + msg.ChannelID, err) log.Error(err) nMsg.err <- err return nil @@ -2443,8 +2475,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( remotePeer.SerializeCompressed()) case <-d.quit: - log.Infof("Gossiper shutting down, did not send" + - " AnnounceSignatures.") + log.Infof("Gossiper shutting down, did not " + + "send AnnounceSignatures.") return } } @@ -2463,7 +2495,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably( // updateChannel creates a new fully signed update for the channel, and updates // the underlying graph with the new state. func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, - edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) { + edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, + *lnwire.ChannelUpdate, error) { var err error @@ -2564,7 +2597,9 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, // maybeRequestChanAnn will attempt to request the full channel announcement // for a particular short chan ID. We do this in the case that we get a channel // update, yet don't already have a channel announcement for it. -func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) error { +func (d *AuthenticatedGossiper) maybeRequestChanAnn( + cid lnwire.ShortChannelID) error { + d.syncerMtx.Lock() defer d.syncerMtx.Unlock()