diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 56bd7929..3819b589 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -1233,7 +1233,7 @@ func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer, recvUpdates
 	}
 
 	log.Infof("Creating new gossipSyncer for peer=%x",
-		nodeID)
+		nodeID[:])
 
 	syncer := newGossiperSyncer(gossipSyncerCfg{
 		chainHash: d.cfg.ChainHash,
diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go
index dcb6714d..91d6b0e1 100644
--- a/htlcswitch/switch.go
+++ b/htlcswitch/switch.go
@@ -202,8 +202,8 @@ type Switch struct {
 	forwardingIndex map[lnwire.ShortChannelID]ChannelLink
 
 	// interfaceIndex maps the compressed public key of a peer to all the
-	// channels that the switch maintains iwht that peer.
-	interfaceIndex map[[33]byte]map[ChannelLink]struct{}
+	// channels that the switch maintains with that peer.
+	interfaceIndex map[[33]byte]map[lnwire.ChannelID]ChannelLink
 
 	// htlcPlex is the channel which all connected links use to coordinate
 	// the setup/teardown of Sphinx (onion routing) payment circuits.
@@ -253,7 +253,7 @@ func New(cfg Config) (*Switch, error) {
 		linkIndex:        make(map[lnwire.ChannelID]ChannelLink),
 		mailOrchestrator: newMailOrchestrator(),
 		forwardingIndex:  make(map[lnwire.ShortChannelID]ChannelLink),
-		interfaceIndex:   make(map[[33]byte]map[ChannelLink]struct{}),
+		interfaceIndex:   make(map[[33]byte]map[lnwire.ChannelID]ChannelLink),
 		pendingLinkIndex: make(map[lnwire.ChannelID]ChannelLink),
 		pendingPayments:  make(map[uint64]*pendingPayment),
 		htlcPlex:         make(chan *plexPacket),
@@ -1723,11 +1723,11 @@ func (s *Switch) AddLink(link ChannelLink) error {
 
 	chanID := link.ChanID()
 
-	// First, ensure that this link is not already active in the switch.
+	// If a link already exists, then remove the prior one so we can
+	// replace it with this fresh instance.
 	_, err := s.getLink(chanID)
 	if err == nil {
-		return fmt.Errorf("unable to add ChannelLink(%v), already "+
-			"active", chanID)
+		s.removeLink(chanID)
 	}
 
 	// Get and attach the mailbox for this link, which buffers packets in
@@ -1774,9 +1774,9 @@ func (s *Switch) addLiveLink(link ChannelLink) {
 	// quickly look up all the channels for a particular node.
 	peerPub := link.Peer().PubKey()
 	if _, ok := s.interfaceIndex[peerPub]; !ok {
-		s.interfaceIndex[peerPub] = make(map[ChannelLink]struct{})
+		s.interfaceIndex[peerPub] = make(map[lnwire.ChannelID]ChannelLink)
 	}
-	s.interfaceIndex[peerPub][link] = struct{}{}
+	s.interfaceIndex[peerPub][link.ChanID()] = link
 }
 
 // GetLink is used to initiate the handling of the get link command. The
@@ -1840,9 +1840,18 @@ func (s *Switch) removeLink(chanID lnwire.ChannelID) error {
 	delete(s.linkIndex, link.ChanID())
 	delete(s.forwardingIndex, link.ShortChanID())
 
-	// Remove the channel from channel index.
+	// If the link has been added to the peer index, then we'll move to
+	// delete the entry within the index.
 	peerPub := link.Peer().PubKey()
-	delete(s.interfaceIndex, peerPub)
+	if peerIndex, ok := s.interfaceIndex[peerPub]; ok {
+		delete(peerIndex, link.ChanID())
+
+		// If after deletion, there are no longer any links, then we'll
+		// remove the interface map altogether.
+		if len(peerIndex) == 0 {
+			delete(s.interfaceIndex, peerPub)
+		}
+	}
 
 	link.Stop()
 
@@ -1918,7 +1927,7 @@ func (s *Switch) getLinks(destination [33]byte) ([]ChannelLink, error) {
 	}
 
 	channelLinks := make([]ChannelLink, 0, len(links))
-	for link := range links {
+	for _, link := range links {
 		channelLinks = append(channelLinks, link)
 	}
 
diff --git a/htlcswitch/switch_test.go b/htlcswitch/switch_test.go
index 852aa654..82932d3c 100644
--- a/htlcswitch/switch_test.go
+++ b/htlcswitch/switch_test.go
@@ -25,65 +25,6 @@ func genPreimage() ([32]byte, error) {
 	return preimage, nil
 }
 
-// TestSwitchAddDuplicateLink tests that the switch will reject duplicate links
-// for both pending and live links. It also tests that we can successfully
-// add a link after having removed it.
-func TestSwitchAddDuplicateLink(t *testing.T) {
-	t.Parallel()
-
-	alicePeer, err := newMockServer(t, "alice", nil)
-	if err != nil {
-		t.Fatalf("unable to create alice server: %v", err)
-	}
-
-	s, err := initSwitchWithDB(nil)
-	if err != nil {
-		t.Fatalf("unable to init switch: %v", err)
-	}
-	if err := s.Start(); err != nil {
-		t.Fatalf("unable to start switch: %v", err)
-	}
-	defer s.Stop()
-
-	chanID1, _, aliceChanID, _ := genIDs()
-
-	pendingChanID := lnwire.ShortChannelID{}
-
-	aliceChannelLink := newMockChannelLink(
-		s, chanID1, pendingChanID, alicePeer, false,
-	)
-	if err := s.AddLink(aliceChannelLink); err != nil {
-		t.Fatalf("unable to add alice link: %v", err)
-	}
-
-	// Alice should have a pending link, adding again should fail.
-	if err := s.AddLink(aliceChannelLink); err == nil {
-		t.Fatalf("adding duplicate link should have failed")
-	}
-
-	// Update the short chan id of the channel, so that the link goes live.
-	aliceChannelLink.setLiveShortChanID(aliceChanID)
-	err = s.UpdateShortChanID(chanID1)
-	if err != nil {
-		t.Fatalf("unable to update alice short_chan_id: %v", err)
-	}
-
-	// Alice should have a live link, adding again should fail.
-	if err := s.AddLink(aliceChannelLink); err == nil {
-		t.Fatalf("adding duplicate link should have failed")
-	}
-
-	// Remove the live link to ensure the indexes are cleared.
-	if err := s.RemoveLink(chanID1); err != nil {
-		t.Fatalf("unable to remove alice link: %v", err)
-	}
-
-	// Alice has no links, adding should succeed.
-	if err := s.AddLink(aliceChannelLink); err != nil {
-		t.Fatalf("unable to add alice link: %v", err)
-	}
-}
-
 // TestSwitchSendPending checks the inability of htlc switch to forward adds
 // over pending links, and the UpdateShortChanID makes a pending link live.
 func TestSwitchSendPending(t *testing.T) {
diff --git a/lnd_test.go b/lnd_test.go
index 96ca8998..d9e4f2a2 100644
--- a/lnd_test.go
+++ b/lnd_test.go
@@ -5105,11 +5105,20 @@ func testRevokedCloseRetributionZeroValueRemoteOutput(net *lntest.NetworkHarness
 	// broadcasting his current channel state. This is actually the
 	// commitment transaction of a prior *revoked* state, so he'll soon
 	// feel the wrath of Alice's retribution.
-	force := true
-	closeUpdates, closeTxId, err := net.CloseChannel(ctxb, carol,
-		chanPoint, force)
+	var (
+		closeUpdates lnrpc.Lightning_CloseChannelClient
+		closeTxId    *chainhash.Hash
+		closeErr     error
+		force        bool = true
+	)
+	err = lntest.WaitPredicate(func() bool {
+		closeUpdates, closeTxId, closeErr = net.CloseChannel(
+			ctxb, carol, chanPoint, force,
+		)
+		return closeErr == nil
+	}, time.Second*15)
 	if err != nil {
-		t.Fatalf("unable to close channel: %v", err)
+		t.Fatalf("unable to close channel: %v", closeErr)
 	}
 
 	// Query the mempool for the breaching closing transaction, this should
@@ -6787,6 +6796,27 @@ func assertActiveHtlcs(nodes []*lntest.HarnessNode, payHashes ...[]byte) error {
 	return nil
 }
 
+func assertNumActiveHtlcsChanPoint(node *lntest.HarnessNode,
+	chanPoint wire.OutPoint, numHtlcs int) bool {
+
+	req := &lnrpc.ListChannelsRequest{}
+	ctxb := context.Background()
+	nodeChans, err := node.ListChannels(ctxb, req)
+	if err != nil {
+		return false
+	}
+
+	for _, channel := range nodeChans.Channels {
+		if channel.ChannelPoint != chanPoint.String() {
+			continue
+		}
+
+		return len(channel.PendingHtlcs) == numHtlcs
+	}
+
+	return false
+}
+
 func assertNumActiveHtlcs(nodes []*lntest.HarnessNode, numHtlcs int) bool {
 	req := &lnrpc.ListChannelsRequest{}
 	ctxb := context.Background()
@@ -9079,11 +9109,11 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
 		t.Fatalf("htlc mismatch: %v", err)
 	}
 
-	// Disconnect the two intermediaries, Alice and Dave, so that when carol
-	// restarts, the response will be held by Dave.
+	// Disconnect the two intermediaries, Alice and Dave, by shutting down
+	// Alice.
 	ctxt, _ = context.WithTimeout(ctxb, timeout)
-	if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
-		t.Fatalf("unable to disconnect alice from dave: %v", err)
+	if err := net.StopNode(net.Alice); err != nil {
+		t.Fatalf("unable to shut down alice: %v", err)
 	}
 
 	// Now restart carol without hodl mode, to settle back the outstanding
@@ -9101,10 +9131,12 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
 		t.Fatalf("unable to reconnect dave and carol: %v", err)
 	}
 
-	// Wait for Carol to report no outstanding htlcs.
+	// Wait for Carol to report no outstanding htlcs, and also for Dave to
+	// receive all the settles from Carol.
 	carolNode := []*lntest.HarnessNode{carol}
 	err = lntest.WaitPredicate(func() bool {
-		return assertNumActiveHtlcs(carolNode, 0)
+		return assertNumActiveHtlcs(carolNode, 0) &&
+			assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
 	}, time.Second*15)
 	if err != nil {
 		t.Fatalf("htlc mismatch: %v", err)
@@ -9113,7 +9145,10 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
 	// Finally, restart dave who received the settles, but was unable to
 	// deliver them to Alice since they were disconnected.
 	if err := net.RestartNode(dave, nil); err != nil {
-		t.Fatalf("unable to reconnect alice to dave: %v", err)
+		t.Fatalf("unable to restart dave: %v", err)
+	}
+	if err = net.RestartNode(net.Alice, nil); err != nil {
+		t.Fatalf("unable to restart alice: %v", err)
 	}
 
 	// Force Dave and Alice to reconnect before waiting for the htlcs to
@@ -9124,8 +9159,8 @@ func testSwitchOfflineDeliveryPersistence(net *lntest.NetworkHarness, t *harness
 		t.Fatalf("unable to reconnect dave and carol: %v", err)
 	}
 
-	// After reconnection succeeds, the settles should be propagated all the
-	// way back to the sender. All nodes should report no active htlcs.
+	// After reconnection succeeds, the settles should be propagated all
+	// the way back to the sender. All nodes should report no active htlcs.
 	err = lntest.WaitPredicate(func() bool {
 		return assertNumActiveHtlcs(nodes, 0)
 	}, time.Second*15)
@@ -9410,8 +9445,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
 	// Disconnect the two intermediaries, Alice and Dave, so that when carol
 	// restarts, the response will be held by Dave.
 	ctxt, _ = context.WithTimeout(ctxb, timeout)
-	if err := net.DisconnectNodes(ctxt, dave, net.Alice); err != nil {
-		t.Fatalf("unable to disconnect alice from dave: %v", err)
+	if err := net.StopNode(net.Alice); err != nil {
+		t.Fatalf("unable to shut down alice: %v", err)
 	}
 
 	// Now restart carol without hodl mode, to settle back the outstanding
@@ -9424,7 +9459,8 @@ func testSwitchOfflineDeliveryOutgoingOffline(
 	// Wait for Carol to report no outstanding htlcs.
 	carolNode := []*lntest.HarnessNode{carol}
 	err = lntest.WaitPredicate(func() bool {
-		return assertNumActiveHtlcs(carolNode, 0)
+		return assertNumActiveHtlcs(carolNode, 0) &&
+			assertNumActiveHtlcsChanPoint(dave, carolFundPoint, 0)
 	}, time.Second*15)
 	if err != nil {
 		t.Fatalf("htlc mismatch: %v", err)
@@ -9451,6 +9487,9 @@ func testSwitchOfflineDeliveryOutgoingOffline(
 	if err := net.RestartNode(dave, nil); err != nil {
 		t.Fatalf("unable to restart dave: %v", err)
 	}
+	if err = net.RestartNode(net.Alice, nil); err != nil {
+		t.Fatalf("unable to restart alice: %v", err)
+	}
 
 	// Ensure that Dave is reconnected to Alice before waiting for the htlcs
 	// to clear.
diff --git a/lntest/harness.go b/lntest/harness.go
index 600c7b02..da462e94 100644
--- a/lntest/harness.go
+++ b/lntest/harness.go
@@ -566,6 +566,13 @@ func (n *NetworkHarness) ShutdownNode(node *HarnessNode) error {
 	return nil
 }
 
+// StopNode stops the target node, but doesn't yet clean up its directories.
+// This can be used to temporarily bring a node down during a test, so that it
+// can be started up again later.
+func (n *NetworkHarness) StopNode(node *HarnessNode) error {
+	return node.stop()
+}
+
 // TODO(roasbeef): add a WithChannel higher-order function?
 //  * python-like context manager w.r.t using a channel within a test
 //  * possibly adds more funds to the target wallet if the funds are not
diff --git a/lnwallet/channel.go b/lnwallet/channel.go
index 7cb3ce62..50bd723b 100644
--- a/lnwallet/channel.go
+++ b/lnwallet/channel.go
@@ -1332,7 +1332,7 @@ type LightningChannel struct {
 	cowg sync.WaitGroup
 	wg   sync.WaitGroup
 
-	quit chan struct{}
+	quit chan struct{}
 }
 
 // NewLightningChannel creates a new, active payment channel given an
@@ -3184,7 +3184,6 @@ func (lc *LightningChannel) ProcessChanSyncMsg(
 	// revocation, but also initiate a state transition to re-sync
 	// them.
 	if !lc.FullySynced() {
-
 		commitSig, htlcSigs, err := lc.SignNextCommitment()
 
 		switch {
diff --git a/peer.go b/peer.go
index ba9642ea..f21ece2e 100644
--- a/peer.go
+++ b/peer.go
@@ -324,10 +324,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 
 		chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
 
-		p.activeChanMtx.Lock()
-		p.activeChannels[chanID] = lnChan
-		p.activeChanMtx.Unlock()
-
 		peerLog.Infof("NodeKey(%x) loading ChannelPoint(%v)",
 			p.PubKey(), chanPoint)
 
@@ -415,12 +411,18 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 		}
 
 		// Create the link and add it to the switch.
-		err = p.addLink(chanPoint, lnChan, forwardingPolicy, blockEpoch,
-			chainEvents, currentHeight, true)
+		err = p.addLink(
+			chanPoint, lnChan, forwardingPolicy, blockEpoch,
+			chainEvents, currentHeight, true,
+		)
 		if err != nil {
 			lnChan.Stop()
 			return err
 		}
+
+		p.activeChanMtx.Lock()
+		p.activeChannels[chanID] = lnChan
+		p.activeChanMtx.Unlock()
 	}
 
 	return nil
@@ -1527,8 +1529,8 @@ out:
 				chainEvents, currentHeight, false)
 			if err != nil {
 				peerLog.Errorf("can't register new channel "+
-					"link(%v) with NodeKey(%x)", chanPoint,
-					p.PubKey())
+					"link(%v) with NodeKey(%x): %v", chanPoint,
+					p.PubKey(), err)
 			}
 
 			close(newChanReq.done)
diff --git a/server.go b/server.go
index def6c6a7..5f0becd3 100644
--- a/server.go
+++ b/server.go
@@ -1384,9 +1384,11 @@ func (s *server) peerTerminationWatcher(p *peer) {
 	// available for use.
 	s.fundingMgr.CancelPeerReservations(p.PubKey())
 
+	pubKey := p.addr.IdentityKey
+
 	// We'll also inform the gossiper that this peer is no longer active,
 	// so we don't need to maintain sync state for it any longer.
-	s.authGossiper.PruneSyncState(p.addr.IdentityKey)
+	s.authGossiper.PruneSyncState(pubKey)
 
 	// Tell the switch to remove all links associated with this peer.
 	// Passing nil as the target link indicates that all links associated
@@ -1435,7 +1437,7 @@ func (s *server) peerTerminationWatcher(p *peer) {
 	s.removePeer(p)
 
 	// Next, check to see if this is a persistent peer or not.
-	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
+	pubStr := string(pubKey.SerializeCompressed())
 	_, ok := s.persistentPeers[pubStr]
 	if ok {
 		// We'll only need to re-launch a connection request if one
@@ -1444,6 +1446,23 @@ func (s *server) peerTerminationWatcher(p *peer) {
 			return
 		}
 
+		// We'll ensure that we locate an advertised address to use
+		// within the peer's address for reconnection purposes.
+		//
+		// TODO(roasbeef): use them all?
+		if p.inbound {
+			advertisedAddr, err := s.fetchNodeAdvertisedAddr(
+				pubKey,
+			)
+			if err != nil {
+				srvrLog.Errorf("Unable to retrieve advertised "+
+					"address for node %x: %v",
+					pubKey.SerializeCompressed(), err)
+			} else {
+				p.addr.Address = advertisedAddr
+			}
+		}
+
 		// Otherwise, we'll launch a new connection request in order to
 		// attempt to maintain a persistent connection with this peer.
 		connReq := &connmgr.ConnReq{
@@ -1526,20 +1545,8 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
 	addr := conn.RemoteAddr()
 	pubKey := brontideConn.RemotePub()
 
-	// We'll ensure that we locate an advertised address to use within the
-	// peer's address for reconnection purposes.
-	//
-	// TODO: leave the address field empty if there aren't any?
-	if inbound {
-		advertisedAddr, err := s.fetchNodeAdvertisedAddr(pubKey)
-		if err != nil {
-			srvrLog.Errorf("Unable to retrieve advertised address "+
-				"for node %x: %v", pubKey.SerializeCompressed(),
-				err)
-		} else {
-			addr = advertisedAddr
-		}
-	}
+	srvrLog.Infof("finalizing connection to %x, inbound=%v",
+		pubKey.SerializeCompressed(), inbound)
 
 	peerAddr := &lnwire.NetAddress{
 		IdentityKey: pubKey,
@@ -1618,10 +1625,12 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// If we already have an inbound connection to this peer, then ignore
+	// If we already have an outbound connection to this peer, then ignore
 	// this new connection.
-	if _, ok := s.inboundPeers[pubStr]; ok {
-		srvrLog.Debugf("Ignoring duplicate inbound connection")
+	if _, ok := s.outboundPeers[pubStr]; ok {
+		srvrLog.Debugf("Already have outbound connection for %v, "+
+			"ignoring inbound connection", nodePub.SerializeCompressed())
+		conn.Close()
 		return
 	}
 
@@ -1637,12 +1646,6 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 
 	srvrLog.Infof("New inbound connection from %v", conn.RemoteAddr())
 
-	// Cancel all pending connection requests, we either already have an
-	// outbound connection, or this incoming connection will become our
-	// primary connection. The incoming connection will not have an
-	// associated connection request, so we pass nil.
-	s.cancelConnReqs(pubStr, nil)
-
 	// Check to see if we already have a connection with this peer. If so,
 	// we may need to drop our existing connection. This prevents us from
 	// having duplicate connections to the same peer. We forgo adding a
@@ -1653,6 +1656,7 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 	case ErrPeerNotConnected:
 		// We were unable to locate an existing connection with the
		// target peer, proceed to connect.
+		s.cancelConnReqs(pubStr, nil)
 		s.peerConnected(conn, nil, true)
 
 	case nil:
@@ -1674,6 +1678,8 @@ func (s *server) InboundPeerConnected(conn net.Conn) {
 		srvrLog.Debugf("Disconnecting stale connection to %v",
 			connectedPeer)
 
+		s.cancelConnReqs(pubStr, nil)
+
 		// Remove the current peer from the server's internal state and
 		// signal that the peer termination watcher does not need to
 		// execute for this peer.
@@ -1701,10 +1707,13 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
-	// If we already have an outbound connection to this peer, then ignore
+	// If we already have an inbound connection to this peer, then ignore
 	// this new connection.
-	if _, ok := s.outboundPeers[pubStr]; ok {
-		srvrLog.Debugf("Ignoring duplicate outbound connection")
+	if _, ok := s.inboundPeers[pubStr]; ok {
+		srvrLog.Debugf("Already have inbound connection for %v, "+
+			"ignoring outbound connection",
+			nodePub.SerializeCompressed())
+
 		if connReq != nil {
 			s.connMgr.Remove(connReq.ID())
 		}
@@ -1723,6 +1732,11 @@ func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 	// ignore this connection.
 	if _, ok := s.scheduledPeerConnection[pubStr]; ok {
 		srvrLog.Debugf("Ignoring connection, peer already scheduled")
+
+		if connReq != nil {
+			s.connMgr.Remove(connReq.ID())
+		}
+
 		conn.Close()
 		return
 	}
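
The htlcswitch change above re-keys interfaceIndex from map[ChannelLink]struct{} to map[lnwire.ChannelID]ChannelLink, so removeLink can delete one channel's entry rather than dropping the peer's entire set of links. Below is a minimal, self-contained sketch of that two-level index; the type names (channelID, peerKey, channelLink, switchIndex) are simplified stand-ins for the real htlcswitch types, not code from this patch.

package main

import "fmt"

// Simplified stand-ins for lnwire.ChannelID, the compressed peer
// pubkey, and htlcswitch.ChannelLink.
type channelID [32]byte
type peerKey [33]byte
type channelLink struct{ id channelID }

type switchIndex struct {
	// interfaceIndex mirrors the patched layout: peer pubkey ->
	// channel ID -> link.
	interfaceIndex map[peerKey]map[channelID]*channelLink
}

// addLiveLink inserts the link under its owning peer, creating the
// inner map on first use, as the patched addLiveLink does.
func (s *switchIndex) addLiveLink(p peerKey, l *channelLink) {
	if _, ok := s.interfaceIndex[p]; !ok {
		s.interfaceIndex[p] = make(map[channelID]*channelLink)
	}
	s.interfaceIndex[p][l.id] = l
}

// removeLink deletes only the matching channel, then prunes the peer
// entry once its last link is gone, mirroring the patched removeLink.
func (s *switchIndex) removeLink(p peerKey, id channelID) {
	if peerIndex, ok := s.interfaceIndex[p]; ok {
		delete(peerIndex, id)
		if len(peerIndex) == 0 {
			delete(s.interfaceIndex, p)
		}
	}
}

func main() {
	s := &switchIndex{
		interfaceIndex: make(map[peerKey]map[channelID]*channelLink),
	}
	var alice peerKey
	l1 := &channelLink{id: channelID{1}}
	l2 := &channelLink{id: channelID{2}}
	s.addLiveLink(alice, l1)
	s.addLiveLink(alice, l2)

	// Removing one link leaves the other intact; under the old
	// delete(s.interfaceIndex, peerPub), both would have vanished.
	s.removeLink(alice, l1.id)
	fmt.Println(len(s.interfaceIndex[alice])) // prints 1
}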
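
The lnd_test.go change wraps net.CloseChannel in lntest.WaitPredicate so a transiently failing close is retried until a deadline instead of failing the test on the first attempt. Here is a sketch of that poll-until-timeout pattern; waitPredicate is a stand-in for lntest.WaitPredicate, and the 20ms poll interval is an assumption, not necessarily the harness's actual value.

package main

import (
	"fmt"
	"time"
)

// waitPredicate polls pred until it returns true or the timeout
// elapses, returning an error only in the timeout case.
func waitPredicate(pred func() bool, timeout time.Duration) error {
	deadline := time.After(timeout)
	for {
		select {
		case <-deadline:
			return fmt.Errorf("predicate not satisfied after %v",
				timeout)
		default:
		}

		if pred() {
			return nil
		}
		time.Sleep(20 * time.Millisecond)
	}
}

func main() {
	attempts := 0
	// Simulate an operation that only succeeds on the third try,
	// much like a channel close racing channel readiness.
	err := waitPredicate(func() bool {
		attempts++
		return attempts >= 3
	}, time.Second*15)
	fmt.Println(err, attempts) // <nil> 3
}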
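
The server.go hunks make each connection handler check the opposite index: a new inbound connection is closed when an outbound one already exists for that peer, and vice versa, rather than each handler deduplicating only its own direction. A toy model of that cross-check follows; connServer and its maps are illustrative stand-ins for the server's inboundPeers/outboundPeers state, and the real handlers do considerably more (connReq cleanup, stale-peer handling).

package main

import "fmt"

type connServer struct {
	inboundPeers  map[string]struct{}
	outboundPeers map[string]struct{}
}

// acceptInbound reports whether a new inbound connection should be
// kept: it is rejected if we already dialed this peer ourselves.
func (s *connServer) acceptInbound(pubStr string) bool {
	if _, ok := s.outboundPeers[pubStr]; ok {
		return false // outbound conn exists, close the inbound one
	}
	s.inboundPeers[pubStr] = struct{}{}
	return true
}

// acceptOutbound is the mirror image for connections we dial out.
func (s *connServer) acceptOutbound(pubStr string) bool {
	if _, ok := s.inboundPeers[pubStr]; ok {
		return false // peer already connected to us, drop the dial
	}
	s.outboundPeers[pubStr] = struct{}{}
	return true
}

func main() {
	s := &connServer{
		inboundPeers:  make(map[string]struct{}),
		outboundPeers: make(map[string]struct{}),
	}
	s.acceptOutbound("peerA")
	fmt.Println(s.acceptInbound("peerA")) // false: outbound wins
}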