peer: prune persistent peer connection on zero on-disk channels

In this commit, we fix a small bug in the persistent peer connection
pruning logic. Previously, we would prune a persistent connection to a
peer whenever all of its links happened to be inactive. This isn't
ideal, as the channels themselves are still open, so we should always
be attempting to connect to the peer. We fix this by instead looking at
the set of channels on-disk, and only pruning the persistent connection
if there aren't any.
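
For context, here is a minimal sketch of what the server-side helper
called from the new code plausibly does. The struct fields, map types,
and locking below are assumptions made purely for illustration; only
the method name prunePersistentPeerConnection appears in this diff.

// Hypothetical sketch only: the server fields and locking here are
// assumptions, not taken from this commit.
package peersketch

import (
	"sync"
	"time"
)

type server struct {
	mu sync.Mutex

	// persistentPeers tracks the peers we always attempt to stay
	// connected to, keyed by the string form of their compressed
	// public key.
	persistentPeers        map[string]struct{}
	persistentPeersBackoff map[string]time.Duration
}

// prunePersistentPeerConnection drops the peer from the persistent set
// so that the connection manager no longer retries the connection.
func (s *server) prunePersistentPeerConnection(pubKeyBytes [33]byte) {
	pubKeyStr := string(pubKeyBytes[:])

	s.mu.Lock()
	delete(s.persistentPeers, pubKeyStr)
	delete(s.persistentPeersBackoff, pubKeyStr)
	s.mu.Unlock()
}
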
Wilmer Paulino 2018-09-05 18:22:29 -07:00
parent b2efbce1ac
commit ea51ec34b1
2 changed files with 17 additions and 22 deletions

peer.go

@@ -299,11 +299,16 @@ func (p *peer) Start() error {
 		return err
 	}
 
+	if len(activeChans) == 0 {
+		p.server.prunePersistentPeerConnection(p.pubKeyBytes)
+	}
+
 	// Next, load all the active channels we have with this peer,
 	// registering them with the switch and launching the necessary
 	// goroutines required to operate them.
 	peerLog.Debugf("Loaded %v active channels from database with "+
 		"NodeKey(%x)", len(activeChans), p.PubKey())
+
 	if err := p.loadActiveChannels(activeChans); err != nil {
 		return fmt.Errorf("unable to load channels: %v", err)
 	}
@@ -1993,6 +1998,7 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
 	go waitForChanToClose(chanCloser.negotiationHeight, notifier, errChan,
 		chanPoint, &closingTxid, closingTx.TxOut[0].PkScript, func() {
+
 			// Respond to the local subsystem which requested the
 			// channel closure.
 			if closeReq != nil {
@@ -2005,18 +2011,6 @@ func (p *peer) finalizeChanClosure(chanCloser *channelCloser) {
 					},
 				}
 			}
-
-			// Remove the persistent connection to this peer if we
-			// no longer have open channels with them.
-			p.activeChanMtx.Lock()
-			numActiveChans := len(p.activeChannels)
-			p.activeChanMtx.Unlock()
-
-			if numActiveChans == 0 {
-				p.server.prunePersistentPeerConnection(
-					p.pubKeyBytes,
-				)
-			}
 		})
 }
@@ -2061,19 +2055,15 @@ func waitForChanToClose(bestHeight uint32, notifier chainntnfs.ChainNotifier,
 	cb()
 }
 
-// WipeChannel removes the passed channel point from all indexes associated
-// with the peer, and the switch.
+// WipeChannel removes the passed channel point from all indexes associated with
+// the peer, and the switch.
 func (p *peer) WipeChannel(chanPoint *wire.OutPoint) error {
 	chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
 
 	p.activeChanMtx.Lock()
 	if channel, ok := p.activeChannels[chanID]; ok {
 		channel.Stop()
 		delete(p.activeChannels, chanID)
-
-		if len(p.activeChannels) == 0 {
-			p.server.prunePersistentPeerConnection(p.pubKeyBytes)
-		}
 	}
 	p.activeChanMtx.Unlock()
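
The test change below populates the peer's addr field. Presumably this
is required because Start() resolves the peer's on-disk channels by its
identity key before deciding whether to prune; a rough sketch of that
assumed lookup follows (the FetchOpenChannels call and field names are
assumptions, not shown in this diff):

	// Sketch only: assumed shape of the on-disk lookup in peer.Start().
	activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
	if err != nil {
		return err
	}

	// No channels on disk for this peer, so stop maintaining a
	// persistent connection to it.
	if len(activeChans) == 0 {
		p.server.prunePersistentPeerConnection(p.pubKeyBytes)
	}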

@@ -272,21 +272,21 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
 		Packager: channeldb.NewChannelPackager(shortChanID),
 	}
 
-	addr := &net.TCPAddr{
+	aliceAddr := &net.TCPAddr{
 		IP:   net.ParseIP("127.0.0.1"),
 		Port: 18555,
 	}
-	if err := aliceChannelState.SyncPending(addr, 0); err != nil {
+	if err := aliceChannelState.SyncPending(aliceAddr, 0); err != nil {
 		return nil, nil, nil, nil, err
 	}
 
-	addr = &net.TCPAddr{
+	bobAddr := &net.TCPAddr{
 		IP:   net.ParseIP("127.0.0.1"),
 		Port: 18556,
 	}
-	if err := bobChannelState.SyncPending(addr, 0); err != nil {
+	if err := bobChannelState.SyncPending(bobAddr, 0); err != nil {
 		return nil, nil, nil, nil, err
 	}
@@ -363,6 +363,11 @@ func createTestPeer(notifier chainntnfs.ChainNotifier,
 	s.htlcSwitch.Start()
 
 	alicePeer := &peer{
+		addr: &lnwire.NetAddress{
+			IdentityKey: aliceKeyPub,
+			Address:     aliceAddr,
+		},
+
 		server:        s,
 		sendQueue:     make(chan outgoingMsg, 1),
 		outgoingQueue: make(chan outgoingMsg, outgoingQueueLen),