lnd: properly disconnect peer and clean up resources after critical errors

This commit is contained in:
Olaoluwa Osuntokun 2016-07-13 16:40:01 -07:00
parent 5912322995
commit 98fae7f329
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 52 additions and 4 deletions

@ -310,7 +310,9 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
// TODO(roasbeef): passing num confs 1 is irrelevant here, make signed? // TODO(roasbeef): passing num confs 1 is irrelevant here, make signed?
reservation, err := f.wallet.InitChannelReservation(amt, 0, fmsg.peer.lightningID, 1, delay) reservation, err := f.wallet.InitChannelReservation(amt, 0, fmsg.peer.lightningID, 1, delay)
if err != nil { if err != nil {
// TODO(roasbeef): push ErrorGeneric message
fndgLog.Errorf("Unable to initialize reservation: %v", err) fndgLog.Errorf("Unable to initialize reservation: %v", err)
fmsg.peer.Disconnect()
return return
} }
@ -343,6 +345,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
} }
if err := reservation.ProcessSingleContribution(contribution); err != nil { if err := reservation.ProcessSingleContribution(contribution); err != nil {
fndgLog.Errorf("unable to add contribution reservation: %v", err) fndgLog.Errorf("unable to add contribution reservation: %v", err)
fmsg.peer.Disconnect()
return return
} }
@ -403,6 +406,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
if err := resCtx.reservation.ProcessContribution(contribution); err != nil { if err := resCtx.reservation.ProcessContribution(contribution); err != nil {
fndgLog.Errorf("Unable to process contribution from %v: %v", fndgLog.Errorf("Unable to process contribution from %v: %v",
sourcePeer, err) sourcePeer, err)
fmsg.peer.Disconnect()
return return
} }
@ -465,6 +469,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
if err := resCtx.reservation.CompleteReservationSingle(revokeKey, fundingOut, commitSig); err != nil { if err := resCtx.reservation.CompleteReservationSingle(revokeKey, fundingOut, commitSig); err != nil {
// TODO(roasbeef): better error logging: peerID, channelID, etc. // TODO(roasbeef): better error logging: peerID, channelID, etc.
fndgLog.Errorf("unable to complete single reservation: %v", err) fndgLog.Errorf("unable to complete single reservation: %v", err)
fmsg.peer.Disconnect()
return return
} }
@ -514,6 +519,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
commitSig := append(fmsg.msg.CommitSignature.Serialize(), byte(txscript.SigHashAll)) commitSig := append(fmsg.msg.CommitSignature.Serialize(), byte(txscript.SigHashAll))
if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil { if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil {
fndgLog.Errorf("unable to complete reservation sign complete: %v", err) fndgLog.Errorf("unable to complete reservation sign complete: %v", err)
fmsg.peer.Disconnect()
return return
} }
@ -594,6 +600,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
openChan, err := resCtx.reservation.FinalizeReservation() openChan, err := resCtx.reservation.FinalizeReservation()
if err != nil { if err != nil {
fndgLog.Errorf("unable to finalize reservation: %v", err) fndgLog.Errorf("unable to finalize reservation: %v", err)
fmsg.peer.Disconnect()
return return
} }

49
peer.go

@ -275,6 +275,32 @@ func (p *peer) Stop() error {
return nil return nil
} }
// Disconnect terminates the connection with the remote peer. Additionally, a
// signal is sent to the server and htlcSwitch indicating the resources
// allocated to the peer can now be cleaned up.
func (p *peer) Disconnect() {
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
return
}
peerLog.Tracef("Disconnecting %s", p)
if atomic.LoadInt32(&p.connected) != 0 {
p.conn.Close()
}
close(p.quit)
// Launch a goroutine to clean up the remaining resources.
go func() {
// Tell the switch to unregister all links associated with this
// peer. Passing nil as the target link indicates that all links
// associated with this interface should be closed.
p.server.htlcSwitch.UnregisterLink(p.lightningID, nil)
p.server.donePeers <- p
}()
}
// String returns the string representation of this peer. // String returns the string representation of this peer.
func (p *peer) String() string { func (p *peer) String() string {
return p.conn.RemoteAddr().String() return p.conn.RemoteAddr().String()
@ -376,7 +402,10 @@ out:
} }
} }
p.Disconnect()
p.wg.Done() p.wg.Done()
peerLog.Tracef("readHandler for peer %v done", p)
} }
// writeMessage writes the target lnwire.Message to the remote peer. // writeMessage writes the target lnwire.Message to the remote peer.
@ -416,8 +445,9 @@ out:
} }
if err := p.writeMessage(outMsg.msg); err != nil { if err := p.writeMessage(outMsg.msg); err != nil {
// TODO(roasbeef): disconnect
peerLog.Errorf("unable to write message: %v", err) peerLog.Errorf("unable to write message: %v", err)
p.Disconnect()
break out
} }
// Synchronize with the writeHandler. // Synchronize with the writeHandler.
@ -447,6 +477,7 @@ fin:
} }
} }
p.wg.Done() p.wg.Done()
peerLog.Tracef("writeHandler for peer %v done", p)
} }
// queueHandler is responsible for accepting messages from outside sub-systems // queueHandler is responsible for accepting messages from outside sub-systems
@ -520,12 +551,14 @@ out:
snapshots = append(snapshots, snapshot) snapshots = append(snapshots, snapshot)
} }
req.resp <- snapshots req.resp <- snapshots
case pendingChanPoint := <-p.barrierInits: case pendingChanPoint := <-p.barrierInits:
p.barrierMtx.Lock() p.barrierMtx.Lock()
peerLog.Tracef("Creating chan barrier for "+ peerLog.Tracef("Creating chan barrier for "+
"ChannelPoint(%v)", pendingChanPoint) "ChannelPoint(%v)", pendingChanPoint)
p.newChanBarriers[pendingChanPoint] = make(chan struct{}) p.newChanBarriers[pendingChanPoint] = make(chan struct{})
p.barrierMtx.Unlock() p.barrierMtx.Unlock()
case newChan := <-p.newChannels: case newChan := <-p.newChannels:
chanPoint := *newChan.ChannelPoint() chanPoint := *newChan.ChannelPoint()
p.activeChannels[chanPoint] = newChan p.activeChannels[chanPoint] = newChan
@ -556,10 +589,13 @@ out:
close(p.newChanBarriers[chanPoint]) close(p.newChanBarriers[chanPoint])
delete(p.newChanBarriers, chanPoint) delete(p.newChanBarriers, chanPoint)
p.barrierMtx.Unlock() p.barrierMtx.Unlock()
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
p.handleLocalClose(req) p.handleLocalClose(req)
case req := <-p.remoteCloseChanReqs: case req := <-p.remoteCloseChanReqs:
p.handleRemoteClose(req) p.handleRemoteClose(req)
case <-p.quit: case <-p.quit:
break out break out
} }
@ -799,8 +835,10 @@ out:
// TODO(roasbeef): this assumes no "multi-sig" // TODO(roasbeef): this assumes no "multi-sig"
pre := htlcPkt.RedemptionProofs[0] pre := htlcPkt.RedemptionProofs[0]
if _, err := channel.SettleHTLC(pre, true); err != nil { if _, err := channel.SettleHTLC(pre, true); err != nil {
// TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
continue p.Disconnect()
break out
} }
case *lnwire.CommitSignature: case *lnwire.CommitSignature:
// We just received a new update to our local // We just received a new update to our local
@ -811,7 +849,8 @@ out:
sig := htlcPkt.CommitSig.Serialize() sig := htlcPkt.CommitSig.Serialize()
if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil {
peerLog.Errorf("unable to accept new commitment: %v", err) peerLog.Errorf("unable to accept new commitment: %v", err)
continue p.Disconnect()
break out
} }
// If we didn't initiate this state transition, // If we didn't initiate this state transition,
@ -847,7 +886,8 @@ out:
htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) htlcsToForward, err := channel.ReceiveRevocation(htlcPkt)
if err != nil { if err != nil {
peerLog.Errorf("unable to accept revocation: %v", err) peerLog.Errorf("unable to accept revocation: %v", err)
continue p.Disconnect()
break out
} }
// TODO(roasbeef): send the locked-in HTLC's // TODO(roasbeef): send the locked-in HTLC's
// over the plex chan to the switch. // over the plex chan to the switch.
@ -900,6 +940,7 @@ out:
} }
p.wg.Done() p.wg.Done()
peerLog.Tracef("htlcManager for peer %v done", p)
} }
// updateCommitTx signs, then sends an update to the remote peer adding a new // updateCommitTx signs, then sends an update to the remote peer adding a new