Merge pull request #1349 from Roasbeef/server-fixes-itest-stability

test+server: finalize inbound/outbound fixes, update itests to account for fixes
Olaoluwa Osuntokun 2018-06-12 12:32:55 -07:00 committed by GitHub
commit c3191a7728
8 changed files with 136 additions and 125 deletions

@ -1233,7 +1233,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates
}
log.Infof("Creating new gossipSyncer for peer=%x",
nodeID)
nodeID[:])
syncer := newGossiperSyncer(gossipSyncerCfg{
chainHash: d.cfg.ChainHash,

@ -202,8 +202,8 @@ type Switch struct {
forwardingIndex map[lnwire.ShortChannelID]ChannelLink
// interfaceIndex maps the compressed public key of a peer to all the
// channels that the switch maintains iwht that peer.
interfaceIndex map[[33]byte]map[ChannelLink]struct{}
// channels that the switch maintains with that peer.
interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
// htlcPlex is the channel which all connected links use to coordinate
// the setup/teardown of Sphinx (onion routing) payment circuits.
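Changing interfaceIndex from a per-peer set of links to a map keyed by channel ID lets the switch address a single channel directly, and re-adding a link for the same channel now overwrites the stale entry instead of accumulating duplicates. A minimal sketch of the pattern, with placeholder types standing in for the real [33]byte peer keys, lnwire.ChannelID, and ChannelLink:

package main

import "fmt"

// Simplified stand-ins for the switch's real types.
type (
	peerKey [33]byte
	chanID  [32]byte
	link    struct{ name string }
)

// interfaceIndex maps a peer to its links, keyed by channel ID so a
// single channel can be addressed without scanning a set.
type interfaceIndex map[peerKey]map[chanID]*link

// addLink registers a link under its peer, creating the inner map on
// first use; re-adding the same channel simply overwrites the entry.
func (idx interfaceIndex) addLink(peer peerKey, cid chanID, l *link) {
	if _, ok := idx[peer]; !ok {
		idx[peer] = make(map[chanID]*link)
	}
	idx[peer][cid] = l
}

func main() {
	idx := make(interfaceIndex)
	var alice peerKey
	var cid chanID

	idx.addLink(alice, cid, &link{name: "alice-chan-0"})

	// All of a peer's links remain one lookup away...
	for _, l := range idx[alice] {
		fmt.Println(l.name)
	}
	// ...and a specific channel no longer needs a linear scan.
	fmt.Println(idx[alice][cid].name)
}

Keying the inner map by channel ID is also what allows removeLink further down to delete exactly one entry rather than dropping every link the peer has.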
@ -253,7 +253,7 @@ func New(cfg Config) (*Switch, error) {
linkIndex: make(map[lnwire.ChannelID]ChannelLink),
mailOrchestrator: newMailOrchestrator(),
forwardingIndex: make(map[lnwire.ShortChannelID]ChannelLink),
interfaceIndex: make(map[[33]byte]map[ChannelLink]struct{}),
interfaceIndex: make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
pendingPayments: make(map[uint64]*pendingPayment),
htlcPlex: make(chan *plexPacket),
@ -1723,11 +1723,11 @@ func (s *Switch) AddLink(link ChannelLink) error {
chanID := link.ChanID()
// First, ensure that this link is not already active in the switch.
// If a link already exists, then remove the prior one so we can
// replace it with this fresh instance.
_, err := s.getLink(chanID)
if err == nil {
return fmt.Errorf("unable to add ChannelLink(%v), already "+
"active", chanID)
s.removeLink(chanID)
}
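Rather than rejecting a ChannelLink whose channel ID is already registered, AddLink now tears down the prior instance and installs the fresh one, so a peer can re-establish a link after a flap without the old link first being removed from the switch. A rough, self-contained illustration of the new check-and-replace semantics (the registry type and its fields are placeholders, not the switch's real API):

package main

import "fmt"

type link struct{ id string }

func (l *link) Stop() { fmt.Println("stopping stale link for", l.id) }

type registry struct {
	links map[string]*link
}

// add replaces any existing link registered for the same channel
// instead of rejecting the newcomer, mirroring the updated AddLink.
func (r *registry) add(l *link) {
	if old, ok := r.links[l.id]; ok {
		old.Stop()
		delete(r.links, l.id)
	}
	r.links[l.id] = l
}

func main() {
	r := &registry{links: make(map[string]*link)}
	r.add(&link{id: "chan-1"})
	r.add(&link{id: "chan-1"}) // previously an error, now a replacement
	fmt.Println("registered links:", len(r.links))
}

This is also why TestSwitchAddDuplicateLink, which asserted the old rejecting behaviour, is removed further down.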
// Get and attach the mailbox for this link, which buffers packets in
@ -1774,9 +1774,9 @@ func (s *Switch) addLiveLink(link ChannelLink) {
// quickly look up all the channels for a particular node.
peerPub := link.Peer().PubKey()
if _, ok := s.interfaceIndex[peerPub]; !ok {
s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{})
s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
}
s.interfaceIndex[peerPub][link] = struct{}{}
s.interfaceIndex[peerPub][link.ChanID()] = link
}
// GetLink is used to initiate the handling of the get link command. The
@ -1840,9 +1840,18 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
delete(s.linkIndex, link.ChanID())
delete(s.forwardingIndex, link.ShortChanID())
// Remove the channel from channel index.
// If the link has been added to the peer index, then we'll move to
// delete the entry within the index.
peerPub := link.Peer().PubKey()
delete(s.interfaceIndex, peerPub)
if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
delete(peerIndex, link.ChanID())
// If after deletion, there are no longer any links, then we'll
// remove the interface map altogether.
if len(peerIndex) == 0 {
delete(s.interfaceIndex, peerPub)
}
}
link.Stop()
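removeLink now deletes only the entry for the channel being torn down and drops the per-peer map once it is empty, instead of wiping every link the peer has with a single delete on the outer map. A small self-contained sketch of that two-step cleanup (string keys stand in for the real pubkey and channel ID types):

package main

import "fmt"

type index map[string]map[string]bool // peer -> chanID -> present

// removeLink deletes a single channel entry and, if that was the
// peer's last link, removes the now-empty inner map as well.
func removeLink(idx index, peer, chanID string) {
	if peerIndex, ok := idx[peer]; ok {
		delete(peerIndex, chanID)
		if len(peerIndex) == 0 {
			delete(idx, peer)
		}
	}
}

func main() {
	idx := index{"alice": {"chan-1": true, "chan-2": true}}

	removeLink(idx, "alice", "chan-1")
	fmt.Println(len(idx["alice"])) // 1: alice still has chan-2

	removeLink(idx, "alice", "chan-2")
	_, ok := idx["alice"]
	fmt.Println(ok) // false: the empty peer entry was cleaned up
}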
@ -1918,7 +1927,7 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
}
channelLinks := make([]ChannelLink, 0, len(links))
for link := range links {
for _, link := range links {
channelLinks = append(channelLinks, link)
}

@ -25,65 +25,6 @@ func genPreimage() ([32]byte, error) {
return preimage, nil
}
// TestSwitchAddDuplicateLink tests that the switch will reject duplicate links
// for both pending and live links. It also tests that we can successfully
// add a link after having removed it.
func TestSwitchAddDuplicateLink(t *testing.T) {
t.Parallel()
alicePeer, err := newMockServer(t, "alice", nil)
if err != nil {
t.Fatalf("unable to create alice server: %v", err)
}
s, err := initSwitchWithDB(nil)
if err != nil {
t.Fatalf("unable to init switch: %v", err)
}
if err := s.Start(); err != nil {
t.Fatalf("unable to start switch: %v", err)
}
defer s.Stop()
chanID1, _, aliceChanID, _ := genIDs()
pendingChanID := lnwire.ShortChannelID{}
aliceChannelLink := newMockChannelLink(
s, chanID1, pendingChanID, alicePeer, false,
)
if err := s.AddLink(aliceChannelLink); err != nil {
t.Fatalf("unable to add alice link: %v", err)
}
// Alice should have a pending link, adding again should fail.
if err := s.AddLink(aliceChannelLink); err == nil {
t.Fatalf("adding duplicate link should have failed")
}
// Update the short chan id of the channel, so that the link goes live.
aliceChannelLink.setLiveShortChanID(aliceChanID)
err = s.UpdateShortChanID(chanID1)
if err != nil {
t.Fatalf("unable to update alice short_chan_id: %v", err)
}
// Alice should have a live link, adding again should fail.
if err := s.AddLink(aliceChannelLink); err == nil {
t.Fatalf("adding duplicate link should have failed")
}
// Remove the live link to ensure the indexes are cleared.
if err := s.RemoveLink(chanID1); err != nil {
t.Fatalf("unable to remove alice link: %v", err)
}
// Alice has no links, adding should succeed.
if err := s.AddLink(aliceChannelLink); err != nil {
t.Fatalf("unable to add alice link: %v", err)
}
}
// TestSwitchSendPending checks the inability of htlc switch to forward adds
// over pending links, and the UpdateShortChanID makes a pending link live.
func TestSwitchSendPending(t *testing.T) {

@ -5105,11 +5105,20 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
// broadcasting his current channel state. This is actually the
// commitment transaction of a prior *revoked* state, so he'll soon
// feel the wrath of Alice's retribution.
force := true
closeUpdates, closeTxId, err := net.CloseChannel(ctxb, carol,
chanPoint, force)
var (
closeUpdates lnrpc.Lightning_CloseChannelClient
closeTxId *chainhash.Hash
closeErr error
force bool = true
)
err = lntest.WaitPredicate(func() bool {
closeUpdates, closeTxId, closeErr = net.CloseChannel(
ctxb, carol, chanPoint, force,
)
return closeErr == nil
}, time.Second*15)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
t.Fatalf("unable to close channel: %v", closeErr)
}
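Wrapping net.CloseChannel in lntest.WaitPredicate retries the RPC for up to 15 seconds instead of failing on the first transient error, which absorbs the window where the channel may not yet be ready to close after the preceding steps. The helper comes from the lntest package; a plausible minimal version of the polling loop it represents could look like the following (the 100ms poll interval is an assumption for illustration, not lntest's actual value):

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitPredicate polls pred until it returns true or the timeout
// elapses, in the spirit of the lntest.WaitPredicate helper used in
// the test above.
func waitPredicate(pred func() bool, timeout time.Duration) error {
	deadline := time.After(timeout)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-deadline:
			return errors.New("predicate not satisfied before timeout")
		case <-ticker.C:
			if pred() {
				return nil
			}
		}
	}
}

func main() {
	start := time.Now()
	// Stand-in for a flaky RPC such as CloseChannel: it only
	// succeeds once enough time has passed.
	err := waitPredicate(func() bool {
		return time.Since(start) > 300*time.Millisecond
	}, 2*time.Second)
	fmt.Println("err:", err)
}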
// Query the mempool for the breaching closing transaction, this should
@ -6787,6 +6796,27 @@ func assertActiveHtlcs(nodes []*lntest.HarnessNode, payHashes ...[]byte) error {
return nil
}
func assertNumActiveHtlcsChanPoint(node *lntest.HarnessNode,
chanPoint wire.OutPoint, numHtlcs int) bool {
req := &lnrpc.ListChannelsRequest{}
ctxb := context.Background()
nodeChans, err := node.ListChannels(ctxb, req)
if err != nil {
return false
}
for _, channel := range nodeChans.Channels {
if channel.ChannelPoint != chanPoint.String() {
continue
}
return len(channel.PendingHtlcs) == numHtlcs
}
return false
}
func assertNumActiveHtlcs(nodes []*lntest.HarnessNode, numHtlcs int) bool {
req := &lnrpc.ListChannelsRequest{}
ctxb := context.Background()
@ -9079,11 +9109,11 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("htlc mismatch: %v", err)
}
// Disconnect the two intermediaries, Alice and Dave, so that when carol
// restarts, the response will be held by Dave.
// Disconnect the two intermediaries, Alice and Dave, by shutting down
// Alice.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}
// Now restart carol without hodl mode, to settle back the outstanding
@ -9101,10 +9131,12 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("unable to reconnect dave and carol: %v", err)
}
// Wait for Carol to report no outstanding htlcs.
// Wait for Carol to report no outstanding htlcs, and also for Dave to
// receive all the settles from Carol.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
@ -9113,7 +9145,10 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
// Finally, restart dave who received the settles, but was unable to
// deliver them to Alice since they were disconnected.
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to reconnect alice to dave: %v", err)
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
// Force Dave and Alice to reconnect before waiting for the htlcs to
@ -9124,8 +9159,8 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
t.Fatalf("unable to reconnect dave and carol: %v", err)
}
// After reconnection succeeds, the settles should be propagated all the
// way back to the sender. All nodes should report no active htlcs.
// After reconnection succeeds, the settles should be propagated all
// the way back to the sender. All nodes should report no active htlcs.
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(nodes, 0)
}, time.Second*15)
@ -9410,8 +9445,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
// Disconnect the two intermediaries, Alice and Dave, so that when carol
// restarts, the response will be held by Dave.
ctxt, _ = context.WithTimeout(ctxb, timeout)
if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
t.Fatalf("unable to disconnect alice from dave: %v", err)
if err := net.StopNode(net.Alice); err != nil {
t.Fatalf("unable to shutdown alice: %v", err)
}
// Now restart carol without hodl mode, to settle back the outstanding
@ -9424,7 +9459,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
// Wait for Carol to report no outstanding htlcs.
carolNode := []*lntest.HarnessNode{carol}
err = lntest.WaitPredicate(func() bool {
return assertNumActiveHtlcs(carolNode, 0)
return assertNumActiveHtlcs(carolNode, 0) &&
assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
}, time.Second*15)
if err != nil {
t.Fatalf("htlc mismatch: %v", err)
@ -9451,6 +9487,9 @@ func testSwitchOfflineDeliveryOutgoingOffline(
if err := net.RestartNode(dave, nil); err != nil {
t.Fatalf("unable to restart dave: %v", err)
}
if err = net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("unable to restart alice: %v", err)
}
// Ensure that Dave is reconnected to Alice before waiting for the htlcs
// to clear.

@ -566,6 +566,13 @@ func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error {
return nil
}
// StopNode stops the target node, but doesn't yet clean up its directories.
// This can be used to temporarily bring a node down during a test, to be later
// started up again.
func (n *NetworkHarness) StopNode(node *HarnessNode) error {
return node.stop()
}
// TODO(roasbeef): add a WithChannel higher-order function?
// * python-like context manager w.r.t using a channel within a test
// * possibly adds more funds to the target wallet if the funds are not

@ -1332,7 +1332,7 @@ type LightningChannel struct {
cowg sync.WaitGroup
wg sync.WaitGroup
quit chan struct{}
quit chan struct{}
}
// NewLightningChannel creates a new, active payment channel given an
@ -3184,7 +3184,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
// revocation, but also initiate a state transition to re-sync
// them.
if !lc.FullySynced() {
commitSig, htlcSigs, err := lc.SignNextCommitment()
switch {

peer.go

@ -324,10 +324,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
peerLog.Infof("NodeKey(%x) loading ChannelPoint(%v)",
p.PubKey(), chanPoint)
@ -415,12 +411,18 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
}
// Create the link and add it to the switch.
err = p.addLink(chanPoint, lnChan, forwardingPolicy, blockEpoch,
chainEvents, currentHeight, true)
err = p.addLink(
chanPoint, lnChan, forwardingPolicy, blockEpoch,
chainEvents, currentHeight, true,
)
if err != nil {
lnChan.Stop()
return err
}
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
}
return nil
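Moving the activeChannels registration to after addLink succeeds means a failure while wiring up the link no longer leaves a channel entry behind with nothing servicing it. The general shape of the fix, publish shared state only once the fallible step has completed, is sketched below with placeholder types rather than the peer's real fields:

package main

import (
	"errors"
	"fmt"
	"sync"
)

type channel struct{ id string }

type peer struct {
	mtx      sync.Mutex
	channels map[string]*channel
}

// loadChannel only publishes the channel into the shared map once the
// fallible setup step has succeeded, so a failure can't leave a
// half-registered entry behind.
func (p *peer) loadChannel(c *channel, setup func(*channel) error) error {
	if err := setup(c); err != nil {
		return err
	}

	p.mtx.Lock()
	p.channels[c.id] = c
	p.mtx.Unlock()
	return nil
}

func main() {
	p := &peer{channels: make(map[string]*channel)}

	// A failing setup leaves no trace in the map.
	_ = p.loadChannel(&channel{id: "chan-1"}, func(*channel) error {
		return errors.New("link could not be added")
	})
	fmt.Println(len(p.channels)) // 0

	// A successful setup registers the channel as before.
	_ = p.loadChannel(&channel{id: "chan-1"}, func(*channel) error { return nil })
	fmt.Println(len(p.channels)) // 1
}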
@ -1527,8 +1529,8 @@ out:
chainEvents, currentHeight, false)
if err != nil {
peerLog.Errorf("can't register new channel "+
"link(%v) with NodeKey(%x)", chanPoint,
p.PubKey())
"link(%v) with NodeKey(%x): %v", chanPoint,
p.PubKey(), err)
}
close(newChanReq.done)

@ -1384,9 +1384,11 @@ func (s *server) peerTerminationWatcher(p *peer) {
// available for use.
s.fundingMgr.CancelPeerReservations(p.PubKey())
pubKey := p.addr.IdentityKey
// We'll also inform the gossiper that this peer is no longer active,
// so we don't need to maintain sync state for it any longer.
s.authGossiper.PruneSyncState(p.addr.IdentityKey)
s.authGossiper.PruneSyncState(pubKey)
// Tell the switch to remove all links associated with this peer.
// Passing nil as the target link indicates that all links associated
@ -1435,7 +1437,7 @@ func (s *server) peerTerminationWatcher(p *peer) {
s.removePeer(p)
// Next, check to see if this is a persistent peer or not.
pubStr := string(p.addr.IdentityKey.SerializeCompressed())
pubStr := string(pubKey.SerializeCompressed())
_, ok := s.persistentPeers[pubStr]
if ok {
// We'll only need to re-launch a connection request if one
@ -1444,6 +1446,23 @@ func (s *server) peerTerminationWatcher(p *peer) {
return
}
// We'll ensure that we locate an advertised address to use
// within the peer's address for reconnection purposes.
//
// TODO(roasbeef): use them all?
if p.inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(
pubKey,
)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised "+
"address for node %x: %v",
pubKey.SerializeCompressed(), err)
} else {
p.addr.Address = advertisedAddr
}
}
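The advertised-address lookup moves out of peerConnected (see the block removed below) into the termination watcher, so it only runs when an address is actually needed to reconnect to an inbound peer; the remote address observed on an inbound connection is typically an ephemeral port that cannot be dialed back. A hedged sketch of the selection logic, with a generic lookup callback standing in for fetchNodeAdvertisedAddr:

package main

import (
	"fmt"
	"net"
)

// reconnectAddr picks the address to dial when re-establishing a
// connection. For an inbound peer we prefer an address the peer
// advertised in its node announcement, falling back to the observed
// address if the lookup fails.
func reconnectAddr(observed net.Addr, inbound bool,
	lookupAdvertised func() (net.Addr, error)) net.Addr {

	if !inbound {
		return observed
	}
	advertised, err := lookupAdvertised()
	if err != nil {
		// Fall back to the observed address rather than giving up.
		return observed
	}
	return advertised
}

func main() {
	observed := &net.TCPAddr{IP: net.ParseIP("203.0.113.7"), Port: 53180}
	lookup := func() (net.Addr, error) {
		return &net.TCPAddr{IP: net.ParseIP("203.0.113.7"), Port: 9735}, nil
	}
	fmt.Println(reconnectAddr(observed, true, lookup))
}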
// Otherwise, we'll launch a new connection request in order to
// attempt to maintain a persistent connection with this peer.
connReq := &connmgr.ConnReq{
@ -1526,20 +1545,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
addr := conn.RemoteAddr()
pubKey := brontideConn.RemotePub()
// We'll ensure that we locate an advertised address to use within the
// peer's address for reconnection purposes.
//
// TODO: leave the address field empty if there aren't any?
if inbound {
advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
if err != nil {
srvrLog.Errorf("Unable to retrieve advertised address "+
"for node %x: %v", pubKey.SerializeCompressed(),
err)
} else {
addr = advertisedAddr
}
}
srvrLog.Infof("finalizing connection to %x, inbound=%v",
pubKey.SerializeCompressed(), inbound)
peerAddr := &lnwire.NetAddress{
IdentityKey: pubKey,
@ -1618,10 +1625,12 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
s.mu.Lock()
defer s.mu.Unlock()
// If we already have an inbound connection to this peer, then ignore
// If we already have an outbound connection to this peer, then ignore
// this new connection.
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Ignoring duplicate inbound connection")
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have outbound connection for %v, "+
"ignoring inbound connection", nodePub.SerializeCompressed())
conn.Close()
return
}
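The guard in InboundPeerConnected now checks the outboundPeers map (and OutboundPeerConnected below checks inboundPeers), so a fresh connection in one direction is dropped when a connection already exists in the other direction, rather than only de-duplicating within the same direction. A compact, self-contained sketch of the symmetric check (connServer and its maps are simplified placeholders for the server's real state):

package main

import (
	"fmt"
	"net"
)

type connServer struct {
	inboundPeers  map[string]net.Conn
	outboundPeers map[string]net.Conn
}

// inboundConnected mirrors the guard above: if an outbound connection
// to this peer already exists, the fresh inbound one is redundant and
// gets closed instead of racing with it.
func (s *connServer) inboundConnected(pub string, conn net.Conn) {
	if _, ok := s.outboundPeers[pub]; ok {
		fmt.Printf("already have outbound connection for %v, ignoring inbound\n", pub)
		conn.Close()
		return
	}
	s.inboundPeers[pub] = conn
}

// outboundConnected applies the symmetric check on the outbound path.
func (s *connServer) outboundConnected(pub string, conn net.Conn) {
	if _, ok := s.inboundPeers[pub]; ok {
		fmt.Printf("already have inbound connection for %v, ignoring outbound\n", pub)
		conn.Close()
		return
	}
	s.outboundPeers[pub] = conn
}

func main() {
	s := &connServer{
		inboundPeers:  make(map[string]net.Conn),
		outboundPeers: make(map[string]net.Conn),
	}
	out, in := net.Pipe()
	s.outboundConnected("peer-A", out)
	s.inboundConnected("peer-A", in) // dropped: outbound already exists
}

On the outbound side the same rejection also removes the pending connmgr request, as the hunks below show.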
@ -1637,12 +1646,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
// Cancel all pending connection requests, we either already have an
// outbound connection, or this incoming connection will become our
// primary connection. The incoming connection will not have an
// associated connection request, so we pass nil.
s.cancelConnReqs(pubStr, nil)
// Check to see if we already have a connection with this peer. If so,
// we may need to drop our existing connection. This prevents us from
// having duplicate connections to the same peer. We forgo adding a
@ -1653,6 +1656,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
case ErrPeerNotConnected:
// We were unable to locate an existing connection with the
// target peer, proceed to connect.
s.cancelConnReqs(pubStr, nil)
s.peerConnected(conn, nil, true)
case nil:
@ -1674,6 +1678,8 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
srvrLog.Debugf("Disconnecting stale connection to %v",
connectedPeer)
s.cancelConnReqs(pubStr, nil)
// Remove the current peer from the server's internal state and
// signal that the peer termination watcher does not need to
// execute for this peer.
@ -1701,10 +1707,13 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
s.mu.Lock()
defer s.mu.Unlock()
// If we already have an outbound connection to this peer, then ignore
// If we already have an inbound connection to this peer, then ignore
// this new connection.
if _, ok := s.outboundPeers[pubStr]; ok {
srvrLog.Debugf("Ignoring duplicate outbound connection")
if _, ok := s.inboundPeers[pubStr]; ok {
srvrLog.Debugf("Already have inbound connection for %v, "+
"ignoring outbound connection",
nodePub.SerializeCompressed())
if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
@ -1723,6 +1732,11 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
// ignore this connection.
if _, ok := s.scheduledPeerConnection[pubStr]; ok {
srvrLog.Debugf("Ignoring connection, peer already scheduled")
if connReq != nil {
s.connMgr.Remove(connReq.ID())
}
conn.Close()
return
}