itest: replace WaitForTxBroadcast with WaitForTxInMempool

This commit is contained in:
Oliver Gugger 2020-10-26 14:21:08 +01:00
parent 1714394add
commit d1b46211d8
No known key found for this signature in database
GPG Key ID: 8E4256593F177720
2 changed files with 36 additions and 107 deletions

@ -57,9 +57,6 @@ type NetworkHarness struct {
Alice *HarnessNode
Bob *HarnessNode
seenTxns chan *chainhash.Hash
bitcoinWatchRequests chan *txWatchRequest
// Channel for transmitting stderr output from failed lightning node
// to main process.
lndErrorChan chan error
@ -85,8 +82,6 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string) (
n := NetworkHarness{
activeNodes: make(map[int]*HarnessNode),
nodesByPub: make(map[string]*HarnessNode),
seenTxns: make(chan *chainhash.Hash),
bitcoinWatchRequests: make(chan *txWatchRequest),
lndErrorChan: make(chan error),
netParams: r.ActiveNet,
Miner: r,
@ -95,7 +90,6 @@ func NewNetworkHarness(r *rpctest.Harness, b BackendConfig, lndBinary string) (
quit: make(chan struct{}),
lndBinary: lndBinary,
}
go n.networkWatcher()
return &n, nil
}
@ -746,81 +740,12 @@ func saveProfilesPage(node *HarnessNode) error {
return nil
}
// TODO(roasbeef): add a WithChannel higher-order function?
// * python-like context manager w.r.t using a channel within a test
// * possibly adds more funds to the target wallet if the funds are not
// enough
// txWatchRequest encapsulates a request to the harness' Bitcoin network
// watcher to dispatch a notification once a transaction with the target txid
// is seen within the test network.
type txWatchRequest struct {
txid chainhash.Hash
eventChan chan struct{}
}
// networkWatcher is a goroutine which accepts async notification
// requests for the broadcast of a target transaction, and then dispatches the
// transaction once its seen on the Bitcoin network.
func (n *NetworkHarness) networkWatcher() {
seenTxns := make(map[chainhash.Hash]struct{})
clients := make(map[chainhash.Hash][]chan struct{})
for {
select {
case <-n.quit:
return
case req := <-n.bitcoinWatchRequests:
// If we've already seen this transaction, then
// immediately dispatch the request. Otherwise, append
// to the list of clients who are watching for the
// broadcast of this transaction.
if _, ok := seenTxns[req.txid]; ok {
close(req.eventChan)
} else {
clients[req.txid] = append(clients[req.txid], req.eventChan)
}
case txid := <-n.seenTxns:
// Add this txid to our set of "seen" transactions. So
// we're able to dispatch any notifications for this
// txid which arrive *after* it's seen within the
// network.
seenTxns[*txid] = struct{}{}
// If there isn't a registered notification for this
// transaction then ignore it.
txClients, ok := clients[*txid]
if !ok {
continue
}
// Otherwise, dispatch the notification to all clients,
// cleaning up the now un-needed state.
for _, client := range txClients {
close(client)
}
delete(clients, *txid)
}
}
}
// OnTxAccepted is a callback to be called each time a new transaction has been
// broadcast on the network.
func (n *NetworkHarness) OnTxAccepted(hash *chainhash.Hash) {
select {
case n.seenTxns <- hash:
case <-n.quit:
return
}
}
// WaitForTxBroadcast blocks until the target txid is seen on the network. If
// WaitForTxInMempool blocks until the target txid is seen in the mempool. If
// the transaction isn't seen within the network before the passed timeout,
// then an error is returned.
// TODO(roasbeef): add another method which creates queue of all seen transactions
func (n *NetworkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.Hash) error {
func (n *NetworkHarness) WaitForTxInMempool(ctx context.Context,
txid chainhash.Hash) error {
// Return immediately if harness has been torn down.
select {
case <-n.quit:
@ -828,20 +753,29 @@ func (n *NetworkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash.
default:
}
eventChan := make(chan struct{})
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
n.bitcoinWatchRequests <- &txWatchRequest{
txid: txid,
eventChan: eventChan,
var mempool []*chainhash.Hash
for {
select {
case <-ctx.Done():
return fmt.Errorf("wanted %v, found %v txs "+
"in mempool: %v", txid, len(mempool), mempool)
case <-ticker.C:
var err error
mempool, err = n.Miner.Node.GetRawMempool()
if err != nil {
return err
}
select {
case <-eventChan:
for _, mempoolTx := range mempool {
if *mempoolTx == txid {
return nil
case <-n.quit:
return fmt.Errorf("NetworkHarness has been torn down")
case <-ctx.Done():
return fmt.Errorf("tx not seen before context timeout")
}
}
}
}
}
@ -1163,7 +1097,7 @@ func (n *NetworkHarness) CloseChannel(ctx context.Context,
"%v", err)
return
}
if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil {
if err := n.WaitForTxInMempool(ctx, *closeTxid); err != nil {
errChan <- fmt.Errorf("error while waiting for "+
"broadcast tx: %v", err)
return

@ -14079,14 +14079,9 @@ func TestLightningNetworkDaemon(t *testing.T) {
//
// We will also connect it to our chain backend.
minerLogDir := "./.minerlogs"
handlers := &rpcclient.NotificationHandlers{
OnTxAccepted: func(hash *chainhash.Hash, amt btcutil.Amount) {
lndHarness.OnTxAccepted(hash)
},
}
miner, minerCleanUp, err := lntest.NewMiner(
minerLogDir, "output_btcd_miner.log",
harnessNetParams, handlers,
harnessNetParams, &rpcclient.NotificationHandlers{},
)
require.NoError(t, err, "failed to create new miner")
defer func() {