Merge pull request #5281 from guggero/router-chain-sync

itest flakes: fix multiple issues around router subsystem being out of sync
This commit is contained in:
Olaoluwa Osuntokun 2021-05-10 13:11:37 -07:00 committed by GitHub
commit 0d3253c410
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 5 deletions

@ -177,9 +177,16 @@ func (n *NetworkHarness) SetUp(testCase string, lndArgs []string) error {
default: default:
} }
// First, make a connection between the two nodes. This will wait until
// both nodes are fully started since the Connect RPC is guarded behind
// the server.Started() flag that waits for all subsystems to be ready.
ctxb := context.Background()
if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil {
return err
}
// Load up the wallets of the seeder nodes with 10 outputs of 1 BTC // Load up the wallets of the seeder nodes with 10 outputs of 1 BTC
// each. // each.
ctxb := context.Background()
addrReq := &lnrpc.NewAddressRequest{ addrReq := &lnrpc.NewAddressRequest{
Type: lnrpc.AddressType_WITNESS_PUBKEY_HASH, Type: lnrpc.AddressType_WITNESS_PUBKEY_HASH,
} }
@ -216,8 +223,13 @@ func (n *NetworkHarness) SetUp(testCase string, lndArgs []string) error {
return err return err
} }
// Finally, make a connection between both of the nodes. // Now we want to wait for the nodes to catch up.
if err := n.ConnectNodes(ctxb, n.Alice, n.Bob); err != nil { ctxt, cancel := context.WithTimeout(ctxb, DefaultTimeout)
defer cancel()
if err := n.Alice.WaitForBlockchainSync(ctxt); err != nil {
return err
}
if err := n.Bob.WaitForBlockchainSync(ctxt); err != nil {
return err return err
} }

@ -11787,7 +11787,15 @@ func testSwitchOfflineDelivery(net *lntest.NetworkHarness, t *harnessTest) {
err) err)
} }
time.Sleep(time.Millisecond * 50) // Make sure all nodes are fully synced before we continue.
ctxt, cancel := context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
for _, node := range nodes {
err := node.WaitForBlockchainSync(ctxt)
if err != nil {
t.Fatalf("unable to wait for sync: %v", err)
}
}
// Using Carol as the source, pay to the 5 invoices from Bob created // Using Carol as the source, pay to the 5 invoices from Bob created
// above. // above.
@ -11856,6 +11864,16 @@ func testSwitchOfflineDelivery(net *lntest.NetworkHarness, t *harnessTest) {
t.Fatalf("htlc mismatch: %v", predErr) t.Fatalf("htlc mismatch: %v", predErr)
} }
// Make sure all nodes are fully synced again.
ctxt, cancel = context.WithTimeout(ctxb, defaultTimeout)
defer cancel()
for _, node := range nodes {
err := node.WaitForBlockchainSync(ctxt)
if err != nil {
t.Fatalf("unable to wait for sync: %v", err)
}
}
// Now that the settles have reached Dave, reconnect him with Alice, // Now that the settles have reached Dave, reconnect him with Alice,
// allowing the settles to return to the sender. // allowing the settles to return to the sender.
ctxt, _ = context.WithTimeout(ctxb, defaultTimeout) ctxt, _ = context.WithTimeout(ctxb, defaultTimeout)

@ -155,6 +155,7 @@
<time> [ERR] NTFN: unable to get hash from block with height 790 <time> [ERR] NTFN: unable to get hash from block with height 790
<time> [ERR] NTFN: unable to get missed blocks: starting height <height> is greater than ending height <height> <time> [ERR] NTFN: unable to get missed blocks: starting height <height> is greater than ending height <height>
<time> [ERR] NTFN: Unable to rewind chain from height <height> to height <height>: unable to find blockhash for disconnected height=<height>: -1: Block number out of range <time> [ERR] NTFN: Unable to rewind chain from height <height> to height <height>: unable to find blockhash for disconnected height=<height>: -1: Block number out of range
<time> [ERR] NTFN: Unable to rewind chain from height <height> to height <height>: unable to find blockhash for disconnected height=<height>: -8: Block height out of range
<time> [ERR] NTNF: unable to get hash from block with height <height> <time> [ERR] NTNF: unable to get hash from block with height <height>
<time> [ERR] PEER: Allowed test error from <ip> (inbound): ReadMessage: unhandled command [sendaddrv2] <time> [ERR] PEER: Allowed test error from <ip> (inbound): ReadMessage: unhandled command [sendaddrv2]
<time> [ERR] PEER: resend failed: unable to fetch channel sync messages for peer <hex>@<ip>: unable to find closed channel summary <time> [ERR] PEER: resend failed: unable to fetch channel sync messages for peer <hex>@<ip>: unable to find closed channel summary

@ -10,7 +10,7 @@ s/HTLC ID = [[:digit:]]+/HTLC ID = <id>/g
s/height=[[:digit:]]+/height=<height>/g s/height=[[:digit:]]+/height=<height>/g
s/collecting result for shard [[:digit:]]+/collecting result for shard <number>/g s/collecting result for shard [[:digit:]]+/collecting result for shard <number>/g
s/sending attempt [[:digit:]]+/sending attempt <number>/g s/sending attempt [[:digit:]]+/sending attempt <number>/g
s/Unable to rewind chain from height [[:digit:]]+ to height [[:digit:]]+/Unable to rewind chain from height <height> to height <height>/g s/Unable to rewind chain from height [[:digit:]]+ to height -?[[:digit:]]+/Unable to rewind chain from height <height> to height <height>/g
s/NTFN: unable to get missed blocks: starting height [[:digit:]]+ is greater than ending height [[:digit:]]+/NTFN: unable to get missed blocks: starting height <height> is greater than ending height <height>/g s/NTFN: unable to get missed blocks: starting height [[:digit:]]+ is greater than ending height [[:digit:]]+/NTFN: unable to get missed blocks: starting height <height> is greater than ending height <height>/g
s/BTCN: Broadcast attempt failed: rejected by <ip>: replacement transaction <hex> has an insufficient fee rate: needs more than [[:digit:]]+, has [[:digit:]]+/BTCN: Broadcast attempt failed: rejected by <ip>: replacement transaction <hex> has an insufficient fee rate: needs more than <amt>, has <amt>/g s/BTCN: Broadcast attempt failed: rejected by <ip>: replacement transaction <hex> has an insufficient fee rate: needs more than [[:digit:]]+, has [[:digit:]]+/BTCN: Broadcast attempt failed: rejected by <ip>: replacement transaction <hex> has an insufficient fee rate: needs more than <amt>, has <amt>/g
s/pid=[[:digit:]]+/pid=<pid>/g s/pid=[[:digit:]]+/pid=<pid>/g

@ -563,6 +563,14 @@ func (r *ChannelRouter) Start() error {
} }
} }
// The graph pruning might have taken a while and there could be
// new blocks available.
bestHash, bestHeight, err = r.cfg.Chain.GetBestBlock()
if err != nil {
return err
}
r.bestHeight = uint32(bestHeight)
// Before we begin normal operation of the router, we first need // Before we begin normal operation of the router, we first need
// to synchronize the channel graph to the latest state of the // to synchronize the channel graph to the latest state of the
// UTXO set. // UTXO set.
@ -2479,6 +2487,13 @@ func (r *ChannelRouter) CurrentBlockHeight() (uint32, error) {
return uint32(height), err return uint32(height), err
} }
// SyncedHeight returns the block height to which the router subsystem is
// currently synced. The value may differ from the chain height reported by
// CurrentBlockHeight if the goroutine responsible for processing the blocks
// isn't yet up to speed.
func (r *ChannelRouter) SyncedHeight() uint32 {
	// Read the height atomically and hand the snapshot back to the caller.
	syncedHeight := atomic.LoadUint32(&r.bestHeight)
	return syncedHeight
}
// GetChannelByID return the channel by the channel id. // GetChannelByID return the channel by the channel id.
// //
// NOTE: This method is part of the ChannelGraphSource interface. // NOTE: This method is part of the ChannelGraphSource interface.

@ -2507,6 +2507,17 @@ func (r *rpcServer) GetInfo(ctx context.Context,
"with current best block in the main chain: %v", err) "with current best block in the main chain: %v", err)
} }
// The router has a lot of work to do for each block. So it might be
// possible that it isn't yet up to date with the most recent block,
// even if the wallet is. This can happen in environments with high CPU
// load (such as parallel itests). Since the `synced_to_chain` flag in
// the response of this call is used by many wallets (and also our
// itests) to make sure everything's up to date, we add the router's
// state to it. So the flag will only toggle to true once the router was
// also able to catch up.
routerHeight := r.server.chanRouter.SyncedHeight()
isSynced = isSynced && uint32(bestHeight) == routerHeight
network := lncfg.NormalizeNetwork(r.cfg.ActiveNetParams.Name) network := lncfg.NormalizeNetwork(r.cfg.ActiveNetParams.Name)
activeChains := make([]*lnrpc.Chain, r.cfg.registeredChains.NumActiveChains()) activeChains := make([]*lnrpc.Chain, r.cfg.registeredChains.NumActiveChains())
for i, chain := range r.cfg.registeredChains.ActiveChains() { for i, chain := range r.cfg.registeredChains.ActiveChains() {