test: modify new network announcement hook to be in node level

This commit modifies the two newly added network announcement hook stop
be at the lightningNode level rather than on the level of the entire
test framework. With this, callers are now able to better utilize the
newly added RPC’s since they can target particular peers and wait for
network messages to be processed rather then depending on a single node
(Alice) for information about the announcements propagated within the
network.
This commit is contained in:
Olaoluwa Osuntokun 2017-03-14 15:38:04 -07:00
parent 5623df6d7e
commit a179a3adbb
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
3 changed files with 256 additions and 211 deletions

@ -1991,7 +1991,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) {
closedChan := graphUpdate.ClosedChans[0]
if closedChan.ClosedHeight != uint32(blockHeight+1) {
t.Fatalf("close heights of channel mismatch: expected "+
"%v, got v", blockHeight+1, closedChan.ClosedHeight)
"%v, got %v", blockHeight+1, closedChan.ClosedHeight)
}
if !bytes.Equal(closedChan.ChanPoint.FundingTxid,
chanPoint.FundingTxid) {

@ -1,6 +1,7 @@
package main
import (
"bytes"
"encoding/hex"
"fmt"
"io"
@ -19,8 +20,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"bytes"
"os/exec"
"github.com/go-errors/errors"
@ -100,6 +99,11 @@ type lightningNode struct {
extraArgs []string
chanWatchRequests chan *chanWatchRequest
quit chan struct{}
wg sync.WaitGroup
lnrpc.LightningClient
}
@ -129,13 +133,15 @@ func newLightningNode(rpcConfig *btcrpcclient.ConnConfig, lndArgs []string) (*li
numActiveNodes++
return &lightningNode{
cfg: cfg,
p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)),
rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)),
rpcCert: rpcConfig.Certificates,
nodeID: nodeNum,
processExit: make(chan struct{}),
extraArgs: lndArgs,
cfg: cfg,
p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)),
rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)),
rpcCert: rpcConfig.Certificates,
nodeID: nodeNum,
chanWatchRequests: make(chan *chanWatchRequest),
processExit: make(chan struct{}),
quit: make(chan struct{}),
extraArgs: lndArgs,
}, nil
}
@ -162,10 +168,10 @@ func (l *lightningNode) genArgs() []string {
return args
}
// start launches a new process running lnd. Additionally, the PID of the
// Start launches a new process running lnd. Additionally, the PID of the
// launched process is saved in order to possibly kill the process forcibly
// later.
func (l *lightningNode) start(lndError chan error) error {
func (l *lightningNode) Start(lndError chan error) error {
args := l.genArgs()
l.cmd = exec.Command("lnd", args...)
@ -231,6 +237,11 @@ func (l *lightningNode) start(lndError chan error) error {
}
copy(l.PubKey[:], pubkey)
// Launch the watcher that'll hook into graph related topology change
// from the PoV of this node.
l.wg.Add(1)
go l.lightningNetworkWatcher()
return nil
}
@ -250,8 +261,8 @@ func (l *lightningNode) cleanup() error {
return err
}
// stop attempts to stop the active lnd process.
func (l *lightningNode) stop() error {
// Stop attempts to stop the active lnd process.
func (l *lightningNode) Stop() error {
// We should skip node stop in case:
// - start of the node wasn't initiated
// - process wasn't spawned
@ -261,6 +272,9 @@ func (l *lightningNode) stop() error {
case <-l.processExit:
return nil
default:
close(l.quit)
l.wg.Wait()
if runtime.GOOS == "windows" {
return l.cmd.Process.Signal(os.Kill)
}
@ -268,20 +282,22 @@ func (l *lightningNode) stop() error {
}
}
// restart attempts to restart a lightning node by shutting it down cleanly,
// Restart attempts to restart a lightning node by shutting it down cleanly,
// then restarting the process. This function is fully blocking. Upon restart,
// the RPC connection to the node will be re-attempted, continuing iff the
// connection attempt is successful. Additionally, if a callback is passed, the
// closure will be executed after the node has been shutdown, but before the
// process has been started up again.
func (l *lightningNode) restart(errChan chan error, callback func() error) error {
if err := l.stop(); err != nil {
func (l *lightningNode) Restart(errChan chan error, callback func() error) error {
if err := l.Stop(); err != nil {
return nil
}
<-l.processExit
l.processExit = make(chan struct{})
l.quit = make(chan struct{})
l.wg = sync.WaitGroup{}
if callback != nil {
if err := callback(); err != nil {
@ -289,13 +305,13 @@ func (l *lightningNode) restart(errChan chan error, callback func() error) error
}
}
return l.start(errChan)
return l.Start(errChan)
}
// shutdown stops the active lnd process and clean up any temporary directories
// Shutdown stops the active lnd process and clean up any temporary directories
// created along the way.
func (l *lightningNode) shutdown() error {
if err := l.stop(); err != nil {
func (l *lightningNode) Shutdown() error {
if err := l.Stop(); err != nil {
return err
}
if err := l.cleanup(); err != nil {
@ -304,6 +320,220 @@ func (l *lightningNode) shutdown() error {
return nil
}
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
// notified once it's detected within the test Lightning Network, that a
// channel has either been added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanOpen bool
eventChan chan struct{}
}
// lightningNetworkWatcher is a goroutine which is able to dispatch
// notifications once it has been observed that a target channel has been
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
func (l *lightningNode) lightningNetworkWatcher() {
defer l.wg.Done()
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
go func() {
ctxb := context.Background()
req := &lnrpc.GraphTopologySubscription{}
topologyClient, err := l.SubscribeChannelGraph(ctxb, req)
if err != nil {
// We panic here in case of an error as failure to
// create the topology client will cause all subsequent
// tests to fail.
panic(fmt.Errorf("unable to create topology "+
"client: %v", err))
}
for {
update, err := topologyClient.Recv()
if err == io.EOF {
return
} else if err != nil {
// Similar to the case above, we also panic
// here (and end the tests) as these
// notifications are critical to the success of
// many tests.
panic(fmt.Errorf("unable read update ntfn: %v", err))
}
graphUpdates <- update
}
}()
// For each outpoint, we'll track an integer which denotes the number
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChans := make(map[wire.OutPoint]int)
openClients := make(map[wire.OutPoint][]chan struct{})
closedChans := make(map[wire.OutPoint]struct{})
closeClients := make(map[wire.OutPoint][]chan struct{})
for {
select {
// A new graph update has just been received, so we'll examine
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range graphUpdate.ChannelUpdates {
txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: newChan.ChanPoint.OutputIndex,
}
openChans[op]++
// For this new channel, if the number of edges
// seen is less than two, then the channel
// hasn't been fully announced yet.
if numEdges := openChans[op]; numEdges < 2 {
continue
}
// Otherwise, we'll notify all the registered
// clients and remove the dispatched clients.
for _, eventChan := range openClients[op] {
close(eventChan)
}
delete(openClients, op)
}
// For each channel closed, we'll mark that we've
// detected a channel closure while lnd was pruning the
// channel graph.
for _, closedChan := range graphUpdate.ClosedChans {
txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex,
}
closedChans[op] = struct{}{}
// As the channel has been closed, we'll notify
// all register clients.
for _, eventChan := range closeClients[op] {
close(eventChan)
}
delete(closeClients, op)
}
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-l.chanWatchRequests:
targetChan := watchRequest.chanPoint
// TODO(roasbeef): add update type also, checks for
// multiple of 2
if watchRequest.chanOpen {
// If this is a open request, then it can be
// dispatched if the number of edges seen for
// the channel is at least two.
if numEdges, _ := openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of
// watch open clients for this out point.
openClients[targetChan] = append(openClients[targetChan],
watchRequest.eventChan)
continue
}
// If this is a close request, then it can be
// immediately dispatched if we've already seen a
// channel closure for this channel.
if _, ok := closedChans[targetChan]; ok {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of close watch
// clients for this out point.
closeClients[targetChan] = append(closeClients[targetChan],
watchRequest.eventChan)
case <-l.quit:
return
}
}
}
// WaitForNetworkChannelOpen will block until a channel with the target
// outpoint is seen as being fully advertised within the network. A channel is
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
l.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan,
chanOpen: true,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not opened before timeout")
}
}
// WaitForNetworkChannelClose will block until a channel with the target
// outpoint is seen as closed within the network. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
l.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
eventChan: eventChan,
chanOpen: false,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not closed before timeout")
}
}
// networkHarness is an integration testing harness for the lightning network.
// The harness by default is created with two active nodes on the network:
// Alice and Bob.
@ -322,8 +552,6 @@ type networkHarness struct {
seenTxns chan chainhash.Hash
bitcoinWatchRequests chan *txWatchRequest
chanWatchRequests chan *chanWatchRequest
// Channel for transmitting stderr output from failed lightning node
// to main process.
lndErrorChan chan error
@ -340,7 +568,6 @@ func newNetworkHarness() (*networkHarness, error) {
activeNodes: make(map[int]*lightningNode),
seenTxns: make(chan chainhash.Hash),
bitcoinWatchRequests: make(chan *txWatchRequest),
chanWatchRequests: make(chan *chanWatchRequest),
lndErrorChan: make(chan error),
}, nil
}
@ -406,7 +633,7 @@ func (n *networkHarness) SetUp() error {
go func() {
var err error
defer wg.Done()
if err = n.Alice.start(n.lndErrorChan); err != nil {
if err = n.Alice.Start(n.lndErrorChan); err != nil {
errChan <- err
return
}
@ -414,7 +641,7 @@ func (n *networkHarness) SetUp() error {
go func() {
var err error
defer wg.Done()
if err = n.Bob.start(n.lndErrorChan); err != nil {
if err = n.Bob.Start(n.lndErrorChan); err != nil {
errChan <- err
return
}
@ -505,7 +732,7 @@ out:
// TearDownAll tears down all active nodes within the test lightning network.
func (n *networkHarness) TearDownAll() error {
for _, node := range n.activeNodes {
if err := node.shutdown(); err != nil {
if err := node.Shutdown(); err != nil {
return err
}
}
@ -525,7 +752,7 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) {
return nil, err
}
if err := node.start(n.lndErrorChan); err != nil {
if err := node.Start(n.lndErrorChan); err != nil {
return nil, err
}
@ -563,7 +790,7 @@ func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode)
// and invalidated prior state, or persistent state recovery, simulating node
// crashes, etc.
func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error {
return node.restart(n.lndErrorChan, callback)
return node.Restart(n.lndErrorChan, callback)
}
// TODO(roasbeef): add a WithChannel higher-order function?
@ -651,186 +878,6 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.
}
}
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
// notified once it's detected within the test Lightning Network, that a
// channel has either been added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanOpen bool
eventChan chan struct{}
}
// lightningNetworkWatcher is a goroutine which is able to dispatch
// notifications once it has been observed that a target channel has been
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
//
// TODO(roasbeef): allow caller to select target nodes to recv ntfn from?
func (n *networkHarness) lightningNetworkWatcher() {
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
go func() {
ctxb := context.Background()
req := &lnrpc.GraphTopologySubscription{}
topologyClient, err := n.Alice.SubscribeChannelGraph(ctxb, req)
if err != nil {
// We panic here in case of an error as failure to
// create the topology client will cause all subsequent
// tests to fail.
panic(fmt.Errorf("unable to create topology "+
"client: %v", err))
}
graphUpdate, err := topologyClient.Recv()
if err == io.EOF {
return
} else if err != nil {
}
graphUpdates <- graphUpdate
}()
// For each outpoint, we'll track an integer which denotes the number
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChans := make(map[wire.OutPoint]int)
openClients := make(map[wire.OutPoint][]chan struct{})
closedChans := make(map[wire.OutPoint]struct{})
closeClients := make(map[wire.OutPoint][]chan struct{})
for {
select {
// A new graph update has just been received, so we'll examine
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range graphUpdate.ChannelUpdates {
txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: newChan.ChanPoint.OutputIndex,
}
openChans[op] += 1
}
// For each channel closed, we'll mark that we've
// detected a channel closure while lnd was pruning the
// channel graph.
for _, closedChan := range graphUpdate.ClosedChans {
txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex,
}
closedChans[op] = struct{}{}
}
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-n.chanWatchRequests:
targetChan := watchRequest.chanPoint
if watchRequest.chanOpen {
// If this is a open request, then it can be
// dispatched if the number of edges seen for
// the channel is at least two.
if numEdges, _ := openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of
// watch open clients for this out point.
openClients[targetChan] = append(openClients[targetChan],
watchRequest.eventChan)
}
// If this is a close request, then it can be
// immediately dispatched if we've already seen a
// channel closure for this channel.
if _, ok := closedChans[targetChan]; ok {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of close watch
// clients for this out point.
closeClients[targetChan] = append(closeClients[targetChan],
watchRequest.eventChan)
}
}
}
// WaitForNetworkChannelOpen will block until a channel with the target
// outpoint is seen as being fully advertised within the network. A channel is
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (n *networkHarness) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
n.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
chanOpen: true,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not opened before timeout")
}
}
// WaitForNetworkChannelClose will block until a channel with the target
// outpoint is seen as closed within the network. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (n *networkHarness) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
n.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
chanOpen: false,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not closed before timeout")
}
}
// OpenChannel attempts to open a channel between srcNode and destNode with the
// passed channel funding parameters. If the passed context has a timeout, then
// if the timeout is reached before the channel pending notification is

@ -1722,8 +1722,6 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
return nil
}
}
return nil
}
// marshallTopologyChange performs a mapping from the topology change sturct