Merge pull request #1841 from cfromknecht/sync-gq-start

peer: Synchronous registration of gossip queries
This commit is contained in:
Olaoluwa Osuntokun 2018-09-05 16:37:03 -07:00 committed by GitHub
commit d1a80e35ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 76 deletions

View File

@ -35,6 +35,11 @@ var (
// ErrGossiperShuttingDown is an error that is returned if the gossiper
// is in the process of being shut down.
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
// ErrGossipSyncerNotFound signals that we were unable to find an active
// gossip syncer corresponding to a gossip query message received from
// the remote peer.
ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
)
// networkMsg couples a routing related wire message with the peer that
@ -919,45 +924,7 @@ func (d *AuthenticatedGossiper) findGossipSyncer(pub *btcec.PublicKey) (
return syncer, nil
}
// A known gossip syncer doesn't exist, so we may have to create one
// from scratch. To do so, we'll query for a reference directly to the
// active peer.
syncPeer, err := d.cfg.FindPeer(pub)
if err != nil {
log.Debugf("unable to find gossip peer %v: %v",
pub.SerializeCompressed(), err)
return nil, err
}
// Finally, we'll obtain the exclusive mutex, then check again if a
// gossiper was added after we dropped the read mutex.
d.syncerMtx.Lock()
syncer, ok = d.peerSyncers[target]
if ok {
d.syncerMtx.Unlock()
return syncer, nil
}
// At this point, a syncer doesn't yet exist, so we'll create a new one
// for the peer and return it to the caller.
encoding := lnwire.EncodingSortedPlain
syncer = newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,
syncChanUpdates: true,
channelSeries: d.cfg.ChanSeries,
encodingType: encoding,
chunkSize: encodingTypeToChunkSize[encoding],
sendToPeer: func(msgs ...lnwire.Message) error {
return syncPeer.SendMessage(false, msgs...)
},
})
copy(syncer.peerPub[:], pub.SerializeCompressed())
d.peerSyncers[target] = syncer
syncer.Start()
d.syncerMtx.Unlock()
return syncer, nil
return nil, ErrGossipSyncerNotFound
}
// networkHandler is the primary goroutine that drives this service. The roles
@ -1041,6 +1008,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.peer.PubKey(), err)
continue
}
@ -1066,6 +1036,9 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcement.source,
)
if err != nil {
log.Warnf("Unable to find gossip "+
"syncer for peer=%x: %v",
announcement.source, err)
continue
}

64
peer.go
View File

@ -3,6 +3,7 @@ package main
import (
"bytes"
"container/list"
"errors"
"fmt"
"net"
"sync"
@ -15,7 +16,6 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@ -32,7 +32,7 @@ var (
numNodes int32
// ErrPeerExiting signals that the peer received a disconnect request.
ErrPeerExiting = errors.Errorf("peer exiting")
ErrPeerExiting = fmt.Errorf("peer exiting")
)
const (
@ -113,7 +113,7 @@ type peer struct {
pubKeyBytes [33]byte
// startTime is the time this peer connection was successfully
// established. It will be zero for peers that did not successfuly
// established. It will be zero for peers that did not successfully
// Start().
startTime time.Time
@ -239,7 +239,7 @@ func (p *peer) Start() error {
return nil
}
peerLog.Tracef("peer %v starting", p)
peerLog.Tracef("Peer %v starting", p)
// Exchange local and global features, the init message should be very
// first between two nodes.
@ -320,6 +320,43 @@ func (p *peer) Start() error {
return nil
}
// initGossipSync initializes either a gossip syncer or an initial routing
// dump, depending on the synchronization method negotiated via feature bits.
func (p *peer) initGossipSync() {
	// If the remote peer understands the gossip queries feature, we'll
	// register a gossipSyncer for it with the AuthenticatedGossiper.
	if p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional) {
		srvrLog.Infof("Negotiated chan series queries with %x",
			p.pubKeyBytes[:])

		// We'll only request channel updates from the remote peer if
		// it's enabled in the config.
		//
		// TODO(roasbeef): craft s.t. we only get updates from a few
		// peers
		recvUpdates := !cfg.NoChanUpdates

		// Register this peer's gossip syncer with the gossiper. This
		// call blocks synchronously to ensure the syncer is in place
		// before we begin reading messages from the remote peer.
		p.server.authGossiper.InitSyncState(p, recvUpdates)
		return
	}

	// Otherwise, if the remote peer set the initial routing sync feature
	// bit, begin the legacy synchronization protocol to exchange
	// authenticated channel graph edges/vertexes. This path is only taken
	// when the peer doesn't know of the newer gossip queries.
	if p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync) {
		srvrLog.Infof("Requesting full table sync with %x",
			p.pubKeyBytes[:])

		go p.server.authGossiper.SynchronizeNode(p)
	}
}
// QuitSignal is a method that should return a channel which will be sent upon
// or closed once the backing peer exits. This allows callers using the
// interface to cancel any processing in the event the backing implementation
@ -569,7 +606,7 @@ func (p *peer) Disconnect(reason error) {
return
}
peerLog.Debugf("Disconnecting %s, reason: %v", p, reason)
peerLog.Infof("Disconnecting %s, reason: %v", p, reason)
// Ensure that the TCP connection is properly closed before continuing.
p.conn.Close()
@ -877,6 +914,14 @@ func (p *peer) readHandler() {
p.Disconnect(err)
})
// Initialize our negotiated gossip sync method before reading
// messages off the wire. When using gossip queries, this ensures
// a gossip syncer is active by the time query messages arrive.
//
// TODO(conner): have peer store gossip syncer directly and bypass
// gossiper?
p.initGossipSync()
discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
@ -1316,7 +1361,8 @@ out:
}
if err != nil {
exitErr = errors.Errorf("unable to write message: %v", err)
exitErr = fmt.Errorf("unable to write "+
"message: %v", err)
break out
}
@ -2048,17 +2094,15 @@ func (p *peer) handleInitMsg(msg *lnwire.Init) error {
unknownLocalFeatures := p.remoteLocalFeatures.UnknownRequiredFeatures()
if len(unknownLocalFeatures) > 0 {
err := errors.Errorf("Peer set unknown local feature bits: %v",
err := fmt.Errorf("Peer set unknown local feature bits: %v",
unknownLocalFeatures)
peerLog.Error(err)
return err
}
unknownGlobalFeatures := p.remoteGlobalFeatures.UnknownRequiredFeatures()
if len(unknownGlobalFeatures) > 0 {
err := errors.Errorf("Peer set unknown global feature bits: %v",
err := fmt.Errorf("Peer set unknown global feature bits: %v",
unknownGlobalFeatures)
peerLog.Error(err)
return err
}

View File

@ -2356,33 +2356,6 @@ func (s *server) peerInitializer(p *peer) {
// was successful, and to begin watching the peer's wait group.
close(ready)
switch {
// If the remote peer knows of the new gossip queries feature, then
// we'll create a new gossipSyncer in the AuthenticatedGossiper for it.
case p.remoteLocalFeatures.HasFeature(lnwire.GossipQueriesOptional):
srvrLog.Infof("Negotiated chan series queries with %x",
p.pubKeyBytes[:])
// We'll only request channel updates from the remote peer if
// its enabled in the config, or we're already getting updates
// from enough peers.
//
// TODO(roasbeef): craft s.t. we only get updates from a few
// peers
recvUpdates := !cfg.NoChanUpdates
go s.authGossiper.InitSyncState(p, recvUpdates)
// If the remote peer has the initial sync feature bit set, then we'll
// being the synchronization protocol to exchange authenticated channel
// graph edges/vertexes, but only if they don't know of the new gossip
// queries.
case p.remoteLocalFeatures.HasFeature(lnwire.InitialRoutingSync):
srvrLog.Infof("Requesting full table sync with %x",
p.pubKeyBytes[:])
go s.authGossiper.SynchronizeNode(p)
}
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
s.mu.Lock()