discovery/gossiper: isolate sync with node from query

This commit refactors the SynchronizeNode logic such that
  it can be called without interacting with the gossiper's
  main execution loop. This method does not require access
  to any of the gossiper's internal state, making the change
  fairly straightforward. The primary motivation behind
  this change is to minimize the possibility of introducing
  deadlock scenarios between the gossiper and server.
This commit is contained in:
Conner Fromknecht 2017-10-23 16:09:24 -07:00 committed by Olaoluwa Osuntokun
parent 0e07699550
commit 3b2e4caa93

@ -30,12 +30,6 @@ type networkMsg struct {
err chan error err chan error
} }
// syncRequest represents a request from an outside subsystem to the wallet to
// sync a new node to the latest graph state.
type syncRequest struct {
node *btcec.PublicKey
}
// feeUpdateRequest is a request that is sent to the server when a caller // feeUpdateRequest is a request that is sent to the server when a caller
// wishes to update the fees for a particular set of channels. New UpdateFee // wishes to update the fees for a particular set of channels. New UpdateFee
// messages will be crafted to be sent out during the next broadcast epoch and // messages will be crafted to be sent out during the next broadcast epoch and
@ -156,11 +150,6 @@ type AuthenticatedGossiper struct {
// networkHandler. // networkHandler.
networkMsgs chan *networkMsg networkMsgs chan *networkMsg
// syncRequests is a channel that carries requests to synchronize newly
// connected peers to the state of the lightning network topology from
// our PoV.
syncRequests chan *syncRequest
// feeUpdates is a channel that requests to update the fee schedule of // feeUpdates is a channel that requests to update the fee schedule of
// a set of channels is sent over. // a set of channels is sent over.
feeUpdates chan *feeUpdateRequest feeUpdates chan *feeUpdateRequest
@ -186,7 +175,6 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
cfg: &cfg, cfg: &cfg,
networkMsgs: make(chan *networkMsg), networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}), quit: make(chan struct{}),
syncRequests: make(chan *syncRequest),
feeUpdates: make(chan *feeUpdateRequest), feeUpdates: make(chan *feeUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg), prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: storage, waitingProofs: storage,
@ -196,15 +184,87 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
// SynchronizeNode sends a message to the service indicating it should // SynchronizeNode sends a message to the service indicating it should
// synchronize lightning topology state with the target node. This method is to // synchronize lightning topology state with the target node. This method is to
// be utilized when a node connections for the first time to provide it with // be utilized when a node connections for the first time to provide it with
// the latest topology update state. // the latest topology update state. In order to accomplish this, (currently)
func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) { // the entire network graph is read from disk, then serialized to the format
select { // defined within the current wire protocol. This cache of graph data is then
case d.syncRequests <- &syncRequest{ // sent directly to the target node.
node: pub, func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) error {
}: // TODO(roasbeef): need to also store sig data in db
case <-d.quit: // * will be nice when we switch to pairing sigs would only need one ^_^
return
// We'll collate all the gathered routing messages into a single slice
// containing all the messages to be sent to the target peer.
var announceMessages []lnwire.Message
// As peers are expecting channel announcements before node
// announcements, we first retrieve the initial announcement, as well as
// the latest channel update announcement for both of the directed edges
// that make up each channel, and queue these to be sent to the peer.
var numEdges uint32
if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
// First, using the parameters of the channel, along with the
// channel authentication proof, we'll create re-create the
// original authenticated channel announcement.
if chanInfo.AuthProof != nil {
chanAnn, e1Ann, e2Ann := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
announceMessages = append(announceMessages, chanAnn)
if e1Ann != nil {
announceMessages = append(announceMessages, e1Ann)
} }
if e2Ann != nil {
announceMessages = append(announceMessages, e2Ann)
}
numEdges++
}
return nil
}); err != nil && err != channeldb.ErrGraphNoEdgesFound {
log.Errorf("unable to sync infos with peer: %v", err)
return err
}
// Run through all the vertexes in the graph, retrieving the data for
// the node announcements we originally retrieved.
var numNodes uint32
if err := d.cfg.Router.ForEachNode(func(node *channeldb.LightningNode) error {
// If this is a node we never received a node announcement for,
// we skip it.
if !node.HaveNodeAnnouncement {
return nil
}
alias, err := lnwire.NewNodeAlias(node.Alias)
if err != nil {
return err
}
ann := &lnwire.NodeAnnouncement{
Signature: node.AuthSig,
Timestamp: uint32(node.LastUpdate.Unix()),
Addresses: node.Addresses,
NodeID: node.PubKey,
Alias: alias,
Features: node.Features.RawFeatureVector,
}
announceMessages = append(announceMessages, ann)
numNodes++
return nil
}); err != nil {
return err
}
log.Infof("Syncing channel graph state with %x, sending %v "+
"vertexes and %v edges", pub.SerializeCompressed(),
numNodes, numEdges)
// With all the announcement messages gathered, send them all in a
// single batch to the target peer.
return d.cfg.SendToPeer(pub, announceMessages...)
} }
// PropagateFeeUpdate signals the AuthenticatedGossiper to update the fee // PropagateFeeUpdate signals the AuthenticatedGossiper to update the fee
@ -470,18 +530,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
"channels: %v", err) "channels: %v", err)
} }
// We've just received a new request to synchronize a peer with
// our latest lightning network topology state. This indicates
// that a peer has just connected for the first time, so for
// now we dump our entire network graph and allow them to sift
// through the (subjectively) new information on their own.
case syncReq := <-d.syncRequests:
nodePub := syncReq.node.SerializeCompressed()
if err := d.synchronizeWithNode(syncReq); err != nil {
log.Errorf("unable to sync graph state with %x: %v",
nodePub, err)
}
// The gossiper has been signalled to exit, to we exit our // The gossiper has been signalled to exit, to we exit our
// main loop so the wait group can be decremented. // main loop so the wait group can be decremented.
case <-d.quit: case <-d.quit:
@ -1113,92 +1161,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
} }
} }
// synchronizeWithNode attempts to synchronize the target node in the syncReq
// to the latest channel graph state. In order to accomplish this, (currently)
// the entire network graph is read from disk, then serialized to the format
// defined within the current wire protocol. This cache of graph data is then
// sent directly to the target node.
func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error {
targetNode := syncReq.node
// TODO(roasbeef): need to also store sig data in db
// * will be nice when we switch to pairing sigs would only need one ^_^
// We'll collate all the gathered routing messages into a single slice
// containing all the messages to be sent to the target peer.
var announceMessages []lnwire.Message
// As peers are expecting channel announcements before node
// announcements, we first retrieve the initial announcement, as well as
// the latest channel update announcement for both of the directed edges
// that make up each channel, and queue these to be sent to the peer.
var numEdges uint32
if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
// First, using the parameters of the channel, along with the
// channel authentication proof, we'll create re-create the
// original authenticated channel announcement.
if chanInfo.AuthProof != nil {
chanAnn, e1Ann, e2Ann := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
announceMessages = append(announceMessages, chanAnn)
if e1Ann != nil {
announceMessages = append(announceMessages, e1Ann)
}
if e2Ann != nil {
announceMessages = append(announceMessages, e2Ann)
}
numEdges++
}
return nil
}); err != nil && err != channeldb.ErrGraphNoEdgesFound {
log.Errorf("unable to sync infos with peer: %v", err)
return err
}
// Run through all the vertexes in the graph, retrieving the data for
// the node announcements we originally retrieved.
var numNodes uint32
if err := d.cfg.Router.ForEachNode(func(node *channeldb.LightningNode) error {
// If this is a node we never received a node announcement for,
// we skip it.
if !node.HaveNodeAnnouncement {
return nil
}
alias, err := lnwire.NewNodeAlias(node.Alias)
if err != nil {
return err
}
ann := &lnwire.NodeAnnouncement{
Signature: node.AuthSig,
Timestamp: uint32(node.LastUpdate.Unix()),
Addresses: node.Addresses,
NodeID: node.PubKey,
Alias: alias,
Features: node.Features.RawFeatureVector,
}
announceMessages = append(announceMessages, ann)
numNodes++
return nil
}); err != nil {
return err
}
log.Infof("Syncing channel graph state with %x, sending %v "+
"vertexes and %v edges", targetNode.SerializeCompressed(),
numNodes, numEdges)
// With all the announcement messages gathered, send them all in a
// single batch to the target peer.
return d.cfg.SendToPeer(targetNode, announceMessages...)
}
// updateChannel creates a new fully signed update for the channel, and updates // updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state. // the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo, func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,