Merge pull request #1733 from cfromknecht/persist-network-watcher

lntest/node: persist network subscription state across restarts
This commit is contained in:
Olaoluwa Osuntokun 2018-09-28 17:16:37 -07:00 committed by GitHub
commit 79ed4e8b60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -204,6 +204,16 @@ type HarnessNode struct {
chanWatchRequests chan *chanWatchRequest chanWatchRequests chan *chanWatchRequest
// For each outpoint, we'll track an integer which denotes the number of
// edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has propagated
// through the network.
openChans map[wire.OutPoint]int
openClients map[wire.OutPoint][]chan struct{}
closedChans map[wire.OutPoint]struct{}
closeClients map[wire.OutPoint][]chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -242,6 +252,11 @@ func newNode(cfg nodeConfig) (*HarnessNode, error) {
cfg: &cfg, cfg: &cfg,
NodeID: nodeNum, NodeID: nodeNum,
chanWatchRequests: make(chan *chanWatchRequest), chanWatchRequests: make(chan *chanWatchRequest),
openChans: make(map[wire.OutPoint]int),
openClients: make(map[wire.OutPoint][]chan struct{}),
closedChans: make(map[wire.OutPoint]struct{}),
closeClients: make(map[wire.OutPoint][]chan struct{}),
}, nil }, nil
} }
@ -673,16 +688,6 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
} }
}() }()
// For each outpoint, we'll track an integer which denotes the number
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChans := make(map[wire.OutPoint]int)
openClients := make(map[wire.OutPoint][]chan struct{})
closedChans := make(map[wire.OutPoint]struct{})
closeClients := make(map[wire.OutPoint][]chan struct{})
for { for {
select { select {
@ -699,21 +704,21 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
Hash: *txid, Hash: *txid,
Index: newChan.ChanPoint.OutputIndex, Index: newChan.ChanPoint.OutputIndex,
} }
openChans[op]++ hn.openChans[op]++
// For this new channel, if the number of edges // For this new channel, if the number of edges
// seen is less than two, then the channel // seen is less than two, then the channel
// hasn't been fully announced yet. // hasn't been fully announced yet.
if numEdges := openChans[op]; numEdges < 2 { if numEdges := hn.openChans[op]; numEdges < 2 {
continue continue
} }
// Otherwise, we'll notify all the registered // Otherwise, we'll notify all the registered
// clients and remove the dispatched clients. // clients and remove the dispatched clients.
for _, eventChan := range openClients[op] { for _, eventChan := range hn.openClients[op] {
close(eventChan) close(eventChan)
} }
delete(openClients, op) delete(hn.openClients, op)
} }
// For each channel closed, we'll mark that we've // For each channel closed, we'll mark that we've
@ -726,14 +731,14 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
Hash: *txid, Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex, Index: closedChan.ChanPoint.OutputIndex,
} }
closedChans[op] = struct{}{} hn.closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify // As the channel has been closed, we'll notify
// all register clients. // all register clients.
for _, eventChan := range closeClients[op] { for _, eventChan := range hn.closeClients[op] {
close(eventChan) close(eventChan)
} }
delete(closeClients, op) delete(hn.closeClients, op)
} }
// A new watch request, has just arrived. We'll either be able // A new watch request, has just arrived. We'll either be able
@ -748,30 +753,34 @@ func (hn *HarnessNode) lightningNetworkWatcher() {
// If this is an open request, then it can be // If this is an open request, then it can be
// dispatched if the number of edges seen for // dispatched if the number of edges seen for
// the channel is at least two. // the channel is at least two.
if numEdges := openChans[targetChan]; numEdges >= 2 { if numEdges := hn.openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan) close(watchRequest.eventChan)
continue continue
} }
// Otherwise, we'll add this to the list of // Otherwise, we'll add this to the list of
// watch open clients for this out point. // watch open clients for this out point.
openClients[targetChan] = append(openClients[targetChan], hn.openClients[targetChan] = append(
watchRequest.eventChan) hn.openClients[targetChan],
watchRequest.eventChan,
)
continue continue
} }
// If this is a close request, then it can be // If this is a close request, then it can be
// immediately dispatched if we've already seen a // immediately dispatched if we've already seen a
// channel closure for this channel. // channel closure for this channel.
if _, ok := closedChans[targetChan]; ok { if _, ok := hn.closedChans[targetChan]; ok {
close(watchRequest.eventChan) close(watchRequest.eventChan)
continue continue
} }
// Otherwise, we'll add this to the list of close watch // Otherwise, we'll add this to the list of close watch
// clients for this out point. // clients for this out point.
closeClients[targetChan] = append(closeClients[targetChan], hn.closeClients[targetChan] = append(
watchRequest.eventChan) hn.closeClients[targetChan],
watchRequest.eventChan,
)
case <-hn.quit: case <-hn.quit:
return return