test: add ability to register for LN channel notification to framework

This commit adds the ability for test authors using the integration
testing framework to hook into real-time notifications for
network-level announcements concerning channel openings, closings, and
updates. With this commit we should be able to eliminate a number of
the sleeps within the test framework with synchronous calls (time outs)
to the new methods added in this PR.
This commit is contained in:
Olaoluwa Osuntokun 2017-03-13 22:15:41 -07:00
parent 24a69c1164
commit 19f33d4faf
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

@ -3,6 +3,7 @@ package main
import (
"encoding/hex"
"fmt"
"io"
"io/ioutil"
"log"
"net"
@ -20,6 +21,8 @@ import (
"bytes"
"os/exec"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/roasbeef/btcd/chaincfg"
@ -29,7 +32,6 @@ import (
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcrpcclient"
"github.com/roasbeef/btcutil"
"os/exec"
)
var (
@ -317,8 +319,10 @@ type networkHarness struct {
Alice *lightningNode
Bob *lightningNode
seenTxns chan chainhash.Hash
watchRequests chan *watchRequest
seenTxns chan chainhash.Hash
bitcoinWatchRequests chan *txWatchRequest
chanWatchRequests chan *chanWatchRequest
// Channel for transmitting stderr output from failed lightning node
// to main process.
@ -333,10 +337,11 @@ type networkHarness struct {
// within the repo each time before changes
func newNetworkHarness() (*networkHarness, error) {
return &networkHarness{
activeNodes: make(map[int]*lightningNode),
seenTxns: make(chan chainhash.Hash),
watchRequests: make(chan *watchRequest),
lndErrorChan: make(chan error),
activeNodes: make(map[int]*lightningNode),
seenTxns: make(chan chainhash.Hash),
bitcoinWatchRequests: make(chan *txWatchRequest),
chanWatchRequests: make(chan *chanWatchRequest),
lndErrorChan: make(chan error),
}, nil
}
@ -566,17 +571,17 @@ func (n *networkHarness) RestartNode(node *lightningNode, callback func() error)
// * possibly adds more funds to the target wallet if the funds are not
// enough
// watchRequest encapsulates a request to the harness' network watcher to
// dispatch a notification once a transaction with the target txid is seen
// within the test network.
type watchRequest struct {
// txWatchRequest encapsulates a request to the harness' Bitcoin network
// watcher to dispatch a notification once a transaction with the target txid
// is seen within the test network.
type txWatchRequest struct {
txid chainhash.Hash
eventChan chan struct{}
}
// networkWatcher is a goroutine which accepts async notification requests for
// the broadcast of a target transaction, and then dispatches the transaction
// once its seen on the network.
// bitcoinNetworkWatcher is a goroutine which accepts async notification
// requests for the broadcast of a target transaction, and then dispatches the
// transaction once its seen on the Bitcoin network.
func (n *networkHarness) networkWatcher() {
seenTxns := make(map[chainhash.Hash]struct{})
clients := make(map[chainhash.Hash][]chan struct{})
@ -584,7 +589,7 @@ func (n *networkHarness) networkWatcher() {
for {
select {
case req := <-n.watchRequests:
case req := <-n.bitcoinWatchRequests:
// If we've already seen this transaction, then
// immediately dispatch the request. Otherwise, append
// to the list of clients who are watching for the
@ -633,7 +638,10 @@ func (n *networkHarness) OnTxAccepted(hash *chainhash.Hash, amt btcutil.Amount)
func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error {
eventChan := make(chan struct{})
n.watchRequests <- &watchRequest{txid, eventChan}
n.bitcoinWatchRequests <- &txWatchRequest{
txid: txid,
eventChan: eventChan,
}
select {
case <-eventChan:
@ -643,6 +651,186 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.
}
}
// closeChanWatchRequest is a request to the lightningNetworkWatcher to be
// notified once it's detected within the test Lightning Network, that a
// channel has either been added or closed.
type chanWatchRequest struct {
chanPoint wire.OutPoint
chanOpen bool
eventChan chan struct{}
}
// lightningNetworkWatcher is a goroutine which is able to dispatch
// notifications once it has been observed that a target channel has been
// closed or opened within the network. In order to dispatch these
// notifications, the GraphTopologySubscription client exposed as part of the
// gRPC interface is used.
//
// TODO(roasbeef): allow caller to select target nodes to recv ntfn from?
func (n *networkHarness) lightningNetworkWatcher() {
graphUpdates := make(chan *lnrpc.GraphTopologyUpdate)
go func() {
ctxb := context.Background()
req := &lnrpc.GraphTopologySubscription{}
topologyClient, err := n.Alice.SubscribeChannelGraph(ctxb, req)
if err != nil {
// We panic here in case of an error as failure to
// create the topology client will cause all subsequent
// tests to fail.
panic(fmt.Errorf("unable to create topology "+
"client: %v", err))
}
graphUpdate, err := topologyClient.Recv()
if err == io.EOF {
return
} else if err != nil {
}
graphUpdates <- graphUpdate
}()
// For each outpoint, we'll track an integer which denotes the number
// of edges seen for that channel within the network. When this number
// reaches 2, then it means that both edge advertisements has
// propagated through the network.
openChans := make(map[wire.OutPoint]int)
openClients := make(map[wire.OutPoint][]chan struct{})
closedChans := make(map[wire.OutPoint]struct{})
closeClients := make(map[wire.OutPoint][]chan struct{})
for {
select {
// A new graph update has just been received, so we'll examine
// the current set of registered clients to see if we can
// dispatch any requests.
case graphUpdate := <-graphUpdates:
// For each new channel, we'll increment the number of
// edges seen by one.
for _, newChan := range graphUpdate.ChannelUpdates {
txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: newChan.ChanPoint.OutputIndex,
}
openChans[op] += 1
}
// For each channel closed, we'll mark that we've
// detected a channel closure while lnd was pruning the
// channel graph.
for _, closedChan := range graphUpdate.ClosedChans {
txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid)
op := wire.OutPoint{
Hash: *txid,
Index: closedChan.ChanPoint.OutputIndex,
}
closedChans[op] = struct{}{}
}
// A new watch request, has just arrived. We'll either be able
// to dispatch immediately, or need to add the client for
// processing later.
case watchRequest := <-n.chanWatchRequests:
targetChan := watchRequest.chanPoint
if watchRequest.chanOpen {
// If this is a open request, then it can be
// dispatched if the number of edges seen for
// the channel is at least two.
if numEdges, _ := openChans[targetChan]; numEdges >= 2 {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of
// watch open clients for this out point.
openClients[targetChan] = append(openClients[targetChan],
watchRequest.eventChan)
}
// If this is a close request, then it can be
// immediately dispatched if we've already seen a
// channel closure for this channel.
if _, ok := closedChans[targetChan]; ok {
close(watchRequest.eventChan)
continue
}
// Otherwise, we'll add this to the list of close watch
// clients for this out point.
closeClients[targetChan] = append(closeClients[targetChan],
watchRequest.eventChan)
}
}
}
// WaitForNetworkChannelOpen will block until a channel with the target
// outpoint is seen as being fully advertised within the network. A channel is
// considered "fully advertised" once both of its directional edges has been
// advertised within the test Lightning Network.
func (n *networkHarness) WaitForNetworkChannelOpen(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
n.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
chanOpen: true,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not opened before timeout")
}
}
// WaitForNetworkChannelClose will block until a channel with the target
// outpoint is seen as closed within the network. A channel is considered
// closed once a transaction spending the funding outpoint is seen within a
// confirmed block.
func (n *networkHarness) WaitForNetworkChannelClose(ctx context.Context,
op *lnrpc.ChannelPoint) error {
eventChan := make(chan struct{})
txid, err := chainhash.NewHash(op.FundingTxid)
if err != nil {
return err
}
n.chanWatchRequests <- &chanWatchRequest{
chanPoint: wire.OutPoint{
Hash: *txid,
Index: op.OutputIndex,
},
chanOpen: false,
}
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("channel not closed before timeout")
}
}
// OpenChannel attempts to open a channel between srcNode and destNode with the
// passed channel funding parameters. If the passed context has a timeout, then
// if the timeout is reached before the channel pending notification is