discovery/gossiper: formatting
This commit is contained in:
parent
0bc415c683
commit
c56082006f
@ -265,7 +265,9 @@ func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
|
||||
// containing all the messages to be sent to the target peer.
|
||||
var announceMessages []lnwire.Message
|
||||
|
||||
makeNodeAnn := func(n *channeldb.LightningNode) (*lnwire.NodeAnnouncement, error) {
|
||||
makeNodeAnn := func(n *channeldb.LightningNode) (
|
||||
*lnwire.NodeAnnouncement, error) {
|
||||
|
||||
alias, _ := lnwire.NewNodeAlias(n.Alias)
|
||||
|
||||
wireSig, err := lnwire.NewSigFromRawSignature(n.AuthSigBytes)
|
||||
@ -897,7 +899,9 @@ func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
|
||||
// gossip syncer for an inbound message so we can properly dispatch the
|
||||
// incoming message. If a gossip syncer isn't found, then one will be created
|
||||
// for the target peer.
|
||||
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (*gossipSyncer, error) {
|
||||
func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
|
||||
*gossipSyncer, error) {
|
||||
|
||||
target := routing.NewVertex(pub)
|
||||
|
||||
// First, we'll try to find an existing gossiper for this peer.
|
||||
@ -992,7 +996,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
// First, we'll now create new fully signed updates for
|
||||
// the affected channels and also update the underlying
|
||||
// graph with the new state.
|
||||
newChanUpdates, err := d.processChanPolicyUpdate(policyUpdate)
|
||||
newChanUpdates, err := d.processChanPolicyUpdate(
|
||||
policyUpdate,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to craft policy updates: %v",
|
||||
err)
|
||||
@ -1065,7 +1071,8 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
// If this message was recently rejected, then we won't
|
||||
// attempt to re-process it.
|
||||
if d.isRecentlyRejectedMsg(announcement.msg) {
|
||||
announcement.err <- fmt.Errorf("recently rejected")
|
||||
announcement.err <- fmt.Errorf("recently " +
|
||||
"rejected")
|
||||
continue
|
||||
}
|
||||
|
||||
@ -1096,23 +1103,27 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
return
|
||||
}
|
||||
|
||||
// Process the network announcement to determine if
|
||||
// this is either a new announcement from our PoV
|
||||
// or an edges to a prior vertex/edge we previously
|
||||
// proceeded.
|
||||
// Process the network announcement to
|
||||
// determine if this is either a new
|
||||
// announcement from our PoV or an edges to a
|
||||
// prior vertex/edge we previously proceeded.
|
||||
emittedAnnouncements := d.processNetworkAnnouncement(
|
||||
announcement,
|
||||
)
|
||||
|
||||
// If this message had any dependencies, then
|
||||
// we can now signal them to continue.
|
||||
validationBarrier.SignalDependants(announcement.msg)
|
||||
validationBarrier.SignalDependants(
|
||||
announcement.msg,
|
||||
)
|
||||
|
||||
// If the announcement was accepted, then add the
|
||||
// emitted announcements to our announce batch to
|
||||
// be broadcast once the trickle timer ticks gain.
|
||||
// If the announcement was accepted, then add
|
||||
// the emitted announcements to our announce
|
||||
// batch to be broadcast once the trickle timer
|
||||
// ticks gain.
|
||||
if emittedAnnouncements != nil {
|
||||
// TODO(roasbeef): exclude peer that sent
|
||||
// TODO(roasbeef): exclude peer that
|
||||
// sent.
|
||||
announcements.AddMsgs(
|
||||
emittedAnnouncements...,
|
||||
)
|
||||
@ -1138,7 +1149,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
// for this height, if so, then we process them once
|
||||
// more as normal announcements.
|
||||
d.Lock()
|
||||
numPremature := len(d.prematureAnnouncements[uint32(newBlock.Height)])
|
||||
numPremature := len(d.prematureAnnouncements[blockHeight])
|
||||
d.Unlock()
|
||||
|
||||
// Return early if no announcement to process.
|
||||
@ -1150,7 +1161,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
"for height %v", numPremature, blockHeight)
|
||||
|
||||
d.Lock()
|
||||
for _, ann := range d.prematureAnnouncements[uint32(newBlock.Height)] {
|
||||
for _, ann := range d.prematureAnnouncements[blockHeight] {
|
||||
emittedAnnouncements := d.processNetworkAnnouncement(ann)
|
||||
if emittedAnnouncements != nil {
|
||||
announcements.AddMsgs(
|
||||
@ -1216,9 +1227,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
|
||||
// The retransmission timer has ticked which indicates that we
|
||||
// should check if we need to prune or re-broadcast any of our
|
||||
// personal channels. This addresses the case of "zombie" channels and
|
||||
// channel advertisements that have been dropped, or not properly
|
||||
// propagated through the network.
|
||||
// personal channels. This addresses the case of "zombie"
|
||||
// channels and channel advertisements that have been dropped,
|
||||
// or not properly propagated through the network.
|
||||
case <-retransmitTimer.C:
|
||||
if err := d.retransmitStaleChannels(); err != nil {
|
||||
log.Errorf("unable to rebroadcast stale "+
|
||||
@ -1241,7 +1252,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
|
||||
// needed to handle new queries. The recvUpdates bool indicates if we should
|
||||
// continue to receive real-time updates from the remote peer once we've synced
|
||||
// channel state.
|
||||
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates bool) {
|
||||
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer,
|
||||
recvUpdates bool) {
|
||||
|
||||
d.syncerMtx.Lock()
|
||||
defer d.syncerMtx.Unlock()
|
||||
|
||||
@ -1493,7 +1506,8 @@ func (d *AuthenticatedGossiper) processChanPolicyUpdate(
|
||||
// situation in the case where we create a channel, but for some reason fail
|
||||
// to receive the remote peer's proof, while the remote peer is able to fully
|
||||
// assemble the proof and craft the ChannelAnnouncement.
|
||||
func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAnnouncement,
|
||||
func (d *AuthenticatedGossiper) processRejectedEdge(
|
||||
chanAnnMsg *lnwire.ChannelAnnouncement,
|
||||
proof *channeldb.ChannelAuthProof) ([]networkMsg, error) {
|
||||
|
||||
// First, we'll fetch the state of the channel as we know if from the
|
||||
@ -1576,7 +1590,9 @@ func (d *AuthenticatedGossiper) processRejectedEdge(chanAnnMsg *lnwire.ChannelAn
|
||||
// didn't affect the internal state due to either being out of date, invalid,
|
||||
// or redundant, then nil is returned. Otherwise, the set of announcements will
|
||||
// be returned which should be broadcasted to the rest of the network.
|
||||
func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []networkMsg {
|
||||
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
|
||||
nMsg *networkMsg) []networkMsg {
|
||||
|
||||
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
|
||||
// TODO(roasbeef) make height delta 6
|
||||
// * or configurable
|
||||
@ -1612,7 +1628,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
}
|
||||
}
|
||||
|
||||
features := lnwire.NewFeatureVector(msg.Features, lnwire.GlobalFeatures)
|
||||
features := lnwire.NewFeatureVector(
|
||||
msg.Features, lnwire.GlobalFeatures,
|
||||
)
|
||||
node := &channeldb.LightningNode{
|
||||
HaveNodeAnnouncement: true,
|
||||
LastUpdate: timestamp,
|
||||
@ -1675,8 +1693,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
// to be fully verified once we advance forward in the chain.
|
||||
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
|
||||
blockHeight := msg.ShortChannelID.BlockHeight
|
||||
log.Infof("Announcement for chan_id=(%v), is premature: "+
|
||||
"advertises height %v, only height %v is known",
|
||||
log.Infof("Announcement for chan_id=(%v), is "+
|
||||
"premature: advertises height %v, only "+
|
||||
"height %v is known",
|
||||
msg.ShortChannelID.ToUint64(),
|
||||
msg.ShortChannelID.BlockHeight,
|
||||
atomic.LoadUint32(&d.bestHeight))
|
||||
@ -2032,7 +2051,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
}
|
||||
|
||||
if err := d.cfg.Router.UpdateEdge(update); err != nil {
|
||||
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) {
|
||||
if routing.IsError(err, routing.ErrOutdated,
|
||||
routing.ErrIgnored) {
|
||||
log.Debug(err)
|
||||
} else {
|
||||
d.rejectMtx.Lock()
|
||||
@ -2097,7 +2117,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
// willingness of nodes involved in the funding of a channel to
|
||||
// announce this new channel to the rest of the world.
|
||||
case *lnwire.AnnounceSignatures:
|
||||
needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta
|
||||
needBlockHeight := msg.ShortChannelID.BlockHeight +
|
||||
d.cfg.ProofMatureDelta
|
||||
shortChanID := msg.ShortChannelID.ToUint64()
|
||||
|
||||
prefix := "local"
|
||||
@ -2136,6 +2157,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
// not change before we call AddProof() later.
|
||||
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
|
||||
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
|
||||
|
||||
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
|
||||
msg.ShortChannelID)
|
||||
if err != nil {
|
||||
@ -2187,7 +2209,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
// Since the remote peer might not be online
|
||||
// we'll call a method that will attempt to
|
||||
// deliver the proof when it comes online.
|
||||
if err := d.sendAnnSigReliably(msg, remotePeer); err != nil {
|
||||
err := d.sendAnnSigReliably(msg, remotePeer)
|
||||
if err != nil {
|
||||
err := errors.Errorf("unable to send reliably "+
|
||||
"to remote for short_chan_id=%v: %v",
|
||||
shortChanID, err)
|
||||
@ -2219,13 +2242,17 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
peerID)
|
||||
|
||||
chanAnn, _, _, err := CreateChanAnnouncement(
|
||||
chanInfo.AuthProof, chanInfo, e1, e2,
|
||||
chanInfo.AuthProof, chanInfo,
|
||||
e1, e2,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("unable to gen ann: %v", err)
|
||||
log.Errorf("unable to gen "+
|
||||
"ann: %v", err)
|
||||
return
|
||||
}
|
||||
err = nMsg.peer.SendMessage(false, chanAnn)
|
||||
err = nMsg.peer.SendMessage(
|
||||
false, chanAnn,
|
||||
)
|
||||
if err != nil {
|
||||
log.Errorf("Failed sending "+
|
||||
"full proof to "+
|
||||
@ -2234,7 +2261,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
return
|
||||
}
|
||||
log.Debugf("Full proof sent to peer=%x"+
|
||||
" for chanID=%v", peerID, msg.ChannelID)
|
||||
" for chanID=%v", peerID,
|
||||
msg.ChannelID)
|
||||
}()
|
||||
}
|
||||
|
||||
@ -2293,7 +2321,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
|
||||
dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes()
|
||||
}
|
||||
chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(&dbProof, chanInfo, e1, e2)
|
||||
chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(
|
||||
&dbProof, chanInfo, e1, e2,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
@ -2328,9 +2358,11 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil {
|
||||
err = d.waitingProofs.Remove(proof.OppositeKey())
|
||||
if err != nil {
|
||||
err := errors.Errorf("unable remove opposite proof "+
|
||||
"for the channel with chanID=%v: %v", msg.ChannelID, err)
|
||||
"for the channel with chanID=%v: %v",
|
||||
msg.ChannelID, err)
|
||||
log.Error(err)
|
||||
nMsg.err <- err
|
||||
return nil
|
||||
@ -2443,8 +2475,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
|
||||
remotePeer.SerializeCompressed())
|
||||
|
||||
case <-d.quit:
|
||||
log.Infof("Gossiper shutting down, did not send" +
|
||||
" AnnounceSignatures.")
|
||||
log.Infof("Gossiper shutting down, did not " +
|
||||
"send AnnounceSignatures.")
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -2463,7 +2495,8 @@ func (d *AuthenticatedGossiper) sendAnnSigReliably(
|
||||
// updateChannel creates a new fully signed update for the channel, and updates
|
||||
// the underlying graph with the new state.
|
||||
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
|
||||
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement, *lnwire.ChannelUpdate, error) {
|
||||
edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
|
||||
*lnwire.ChannelUpdate, error) {
|
||||
|
||||
var err error
|
||||
|
||||
@ -2564,7 +2597,9 @@ func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
|
||||
// maybeRequestChanAnn will attempt to request the full channel announcement
|
||||
// for a particular short chan ID. We do this in the case that we get a channel
|
||||
// update, yet don't already have a channel announcement for it.
|
||||
func (d *AuthenticatedGossiper) maybeRequestChanAnn(cid lnwire.ShortChannelID) error {
|
||||
func (d *AuthenticatedGossiper) maybeRequestChanAnn(
|
||||
cid lnwire.ShortChannelID) error {
|
||||
|
||||
d.syncerMtx.Lock()
|
||||
defer d.syncerMtx.Unlock()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user