diff --git a/routing/router.go b/routing/router.go index 2cdd5db3..3db9f083 100644 --- a/routing/router.go +++ b/routing/router.go @@ -3,6 +3,7 @@ package routing import ( "bytes" "fmt" + "runtime" "sort" "sync" "sync/atomic" @@ -496,35 +497,55 @@ func (r *ChannelRouter) networkHandler() { graphPruneTicker := time.NewTicker(r.cfg.GraphPruneInterval) defer graphPruneTicker.Stop() + // We'll use this validation barrier to ensure that we process all jobs + // in the proper order during parallel validation. + validationBarrier := NewValidationBarrier(runtime.NumCPU()*10, r.quit) + for { select { // A new fully validated network update has just arrived. As a // result we'll modify the channel graph accordingly depending // on the exact type of the message. case updateMsg := <-r.networkUpdates: - // Process the routing update to determine if this is - // either a new update from our PoV or an update to a - // prior vertex/edge we previously accepted. - err := r.processUpdate(updateMsg.msg) - updateMsg.err <- err - if err != nil { - continue - } + // We'll set up any dependants, and wait until a free + // slot for this job opens up, this allow us to not + // have thousands of goroutines active. + validationBarrier.InitJobDependancies(updateMsg.msg) - // Send off a new notification for the newly accepted - // update. - topChange := &TopologyChange{} - err = addToTopologyChange(r.cfg.Graph, topChange, - updateMsg.msg) - if err != nil { - log.Errorf("unable to update topology "+ - "change notification: %v", err) - continue - } + go func() { + defer validationBarrier.CompleteJob() - if !topChange.isEmpty() { - r.notifyTopologyChange(topChange) - } + // If this message has an existing dependency, + // then we'll wait until that has been fully + // validated before we proceed. + validationBarrier.WaitForDependants(updateMsg.msg) + + // Process the routing update to determine if + // this is either a new update from our PoV or + // an update to a prior vertex/edge we + // previously accepted. + err := r.processUpdate(updateMsg.msg) + updateMsg.err <- err + + // If this message had any dependencies, then + // we can now signal them to continue. + validationBarrier.SignalDependants(updateMsg.msg) + + // Send off a new notification for the newly + // accepted update. + topChange := &TopologyChange{} + err = addToTopologyChange(r.cfg.Graph, topChange, + updateMsg.msg) + if err != nil { + log.Errorf("unable to update topology "+ + "change notification: %v", err) + return + } + + if !topChange.isEmpty() { + r.notifyTopologyChange(topChange) + } + }() // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding @@ -631,8 +652,13 @@ func (r *ChannelRouter) networkHandler() { clientID := ntfnUpdate.clientID if ntfnUpdate.cancel { - if client, ok := r.topologyClients[ntfnUpdate.clientID]; ok { + r.RLock() + client, ok := r.topologyClients[ntfnUpdate.clientID] + r.RUnlock() + if ok { + r.Lock() delete(r.topologyClients, clientID) + r.Unlock() close(client.exit) client.wg.Wait() @@ -643,10 +669,12 @@ func (r *ChannelRouter) networkHandler() { continue } + r.Lock() r.topologyClients[ntfnUpdate.clientID] = &topologyClient{ ntfnChan: ntfnUpdate.ntfnChan, exit: make(chan struct{}), } + r.Unlock() // The graph prune ticker has ticked, so we'll examine the // state of the known graph to filter out any zombie channels diff --git a/routing/router_test.go b/routing/router_test.go index e05c1cd9..63632852 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -62,6 +62,14 @@ func (c *testCtx) RestartRouter() error { return nil } +func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey { + return &btcec.PublicKey{ + Curve: btcec.S256(), + X: pub.X, + Y: pub.Y, + } +} + func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func(), error) { var ( graph *channeldb.ChannelGraph @@ -439,10 +447,10 @@ func TestAddProof(t *testing.T) { // After utxo was recreated adding the edge without the proof. edge := &channeldb.ChannelEdgeInfo{ ChannelID: chanID.ToUint64(), - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: nil, } @@ -480,7 +488,7 @@ func TestIgnoreNodeAnnouncement(t *testing.T) { HaveNodeAnnouncement: true, LastUpdate: time.Unix(123, 0), Addresses: testAddrs, - PubKey: priv1.PubKey(), + PubKey: copyPubKey(priv1.PubKey()), Color: color.RGBA{1, 2, 3, 0}, Alias: "node11", AuthSig: testSig, @@ -540,10 +548,10 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { edge := &channeldb.ChannelEdgeInfo{ ChannelID: chanID.ToUint64(), - NodeKey1: priv1.PubKey(), - NodeKey2: priv2.PubKey(), - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(priv1.PubKey()), + NodeKey2: copyPubKey(priv2.PubKey()), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: nil, } if err := ctx.router.AddEdge(edge); err != nil { @@ -697,7 +705,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { HaveNodeAnnouncement: true, LastUpdate: time.Unix(123, 0), Addresses: testAddrs, - PubKey: priv1.PubKey(), + PubKey: copyPubKey(priv1.PubKey()), Color: color.RGBA{1, 2, 3, 0}, Alias: "node11", AuthSig: testSig, @@ -712,7 +720,7 @@ func TestAddEdgeUnknownVertexes(t *testing.T) { HaveNodeAnnouncement: true, LastUpdate: time.Unix(123, 0), Addresses: testAddrs, - PubKey: priv2.PubKey(), + PubKey: copyPubKey(priv2.PubKey()), Color: color.RGBA{1, 2, 3, 0}, Alias: "node22", AuthSig: testSig, @@ -847,10 +855,10 @@ func TestWakeUpOnStaleBranch(t *testing.T) { edge1 := &channeldb.ChannelEdgeInfo{ ChannelID: chanID1, - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: &channeldb.ChannelAuthProof{ NodeSig1: testSig, NodeSig2: testSig, @@ -865,10 +873,10 @@ func TestWakeUpOnStaleBranch(t *testing.T) { edge2 := &channeldb.ChannelEdgeInfo{ ChannelID: chanID2, - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: &channeldb.ChannelAuthProof{ NodeSig1: testSig, NodeSig2: testSig, @@ -1049,10 +1057,10 @@ func TestDisconnectedBlocks(t *testing.T) { edge1 := &channeldb.ChannelEdgeInfo{ ChannelID: chanID1, - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: &channeldb.ChannelAuthProof{ NodeSig1: testSig, NodeSig2: testSig, @@ -1067,10 +1075,10 @@ func TestDisconnectedBlocks(t *testing.T) { edge2 := &channeldb.ChannelEdgeInfo{ ChannelID: chanID2, - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: &channeldb.ChannelAuthProof{ NodeSig1: testSig, NodeSig2: testSig, @@ -1189,10 +1197,10 @@ func TestRouterChansClosedOfflinePruneGraph(t *testing.T) { } edge1 := &channeldb.ChannelEdgeInfo{ ChannelID: chanID1.ToUint64(), - NodeKey1: node1.PubKey, - NodeKey2: node2.PubKey, - BitcoinKey1: bitcoinKey1, - BitcoinKey2: bitcoinKey2, + NodeKey1: copyPubKey(node1.PubKey), + NodeKey2: copyPubKey(node2.PubKey), + BitcoinKey1: copyPubKey(bitcoinKey1), + BitcoinKey2: copyPubKey(bitcoinKey2), AuthProof: &channeldb.ChannelAuthProof{ NodeSig1: testSig, NodeSig2: testSig,