diff --git a/fundingmanager.go b/fundingmanager.go index f839ac05..3ff7979c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -310,7 +310,9 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { // TODO(roasbeef): passing num confs 1 is irrelevant here, make signed? reservation, err := f.wallet.InitChannelReservation(amt, 0, fmsg.peer.lightningID, 1, delay) if err != nil { + // TODO(roasbeef): push ErrorGeneric message fndgLog.Errorf("Unable to initialize reservation: %v", err) + fmsg.peer.Disconnect() return } @@ -343,6 +345,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { } if err := reservation.ProcessSingleContribution(contribution); err != nil { fndgLog.Errorf("unable to add contribution reservation: %v", err) + fmsg.peer.Disconnect() return } @@ -403,6 +406,7 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { if err := resCtx.reservation.ProcessContribution(contribution); err != nil { fndgLog.Errorf("Unable to process contribution from %v: %v", sourcePeer, err) + fmsg.peer.Disconnect() return } @@ -465,6 +469,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) { if err := resCtx.reservation.CompleteReservationSingle(revokeKey, fundingOut, commitSig); err != nil { // TODO(roasbeef): better error logging: peerID, channelID, etc. fndgLog.Errorf("unable to complete single reservation: %v", err) + fmsg.peer.Disconnect() return } @@ -514,6 +519,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) commitSig := append(fmsg.msg.CommitSignature.Serialize(), byte(txscript.SigHashAll)) if err := resCtx.reservation.CompleteReservation(nil, commitSig); err != nil { fndgLog.Errorf("unable to complete reservation sign complete: %v", err) + fmsg.peer.Disconnect() return } @@ -594,6 +600,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { openChan, err := resCtx.reservation.FinalizeReservation() if err != nil { fndgLog.Errorf("unable to finalize reservation: %v", err) + fmsg.peer.Disconnect() return } diff --git a/peer.go b/peer.go index e094fe35..416517e8 100644 --- a/peer.go +++ b/peer.go @@ -275,6 +275,32 @@ func (p *peer) Stop() error { return nil } +// Disconnect terminates the connection with the remote peer. Additionally, a +// signal is sent to the server and htlcSwitch indicating the resources +// allocated to the peer can now be cleaned up. +func (p *peer) Disconnect() { + if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { + return + } + + peerLog.Tracef("Disconnecting %s", p) + if atomic.LoadInt32(&p.connected) != 0 { + p.conn.Close() + } + + close(p.quit) + + // Launch a goroutine to clean up the remaining resources. + go func() { + // Tell the switch to unregister all links associated with this + // peer. Passing nil as the target link indicates that all links + // associated with this interface should be closed. + p.server.htlcSwitch.UnregisterLink(p.lightningID, nil) + + p.server.donePeers <- p + }() +} + // String returns the string representation of this peer. func (p *peer) String() string { return p.conn.RemoteAddr().String() @@ -376,7 +402,10 @@ out: } } + p.Disconnect() + p.wg.Done() + peerLog.Tracef("readHandler for peer %v done", p) } // writeMessage writes the target lnwire.Message to the remote peer. @@ -416,8 +445,9 @@ out: } if err := p.writeMessage(outMsg.msg); err != nil { - // TODO(roasbeef): disconnect peerLog.Errorf("unable to write message: %v", err) + p.Disconnect() + break out } // Synchronize with the writeHandler. @@ -447,6 +477,7 @@ fin: } } p.wg.Done() + peerLog.Tracef("writeHandler for peer %v done", p) } // queueHandler is responsible for accepting messages from outside sub-systems @@ -520,12 +551,14 @@ out: snapshots = append(snapshots, snapshot) } req.resp <- snapshots + case pendingChanPoint := <-p.barrierInits: p.barrierMtx.Lock() peerLog.Tracef("Creating chan barrier for "+ "ChannelPoint(%v)", pendingChanPoint) p.newChanBarriers[pendingChanPoint] = make(chan struct{}) p.barrierMtx.Unlock() + case newChan := <-p.newChannels: chanPoint := *newChan.ChannelPoint() p.activeChannels[chanPoint] = newChan @@ -556,10 +589,13 @@ out: close(p.newChanBarriers[chanPoint]) delete(p.newChanBarriers, chanPoint) p.barrierMtx.Unlock() + case req := <-p.localCloseChanReqs: p.handleLocalClose(req) + case req := <-p.remoteCloseChanReqs: p.handleRemoteClose(req) + case <-p.quit: break out } @@ -799,8 +835,10 @@ out: // TODO(roasbeef): this assumes no "multi-sig" pre := htlcPkt.RedemptionProofs[0] if _, err := channel.SettleHTLC(pre, true); err != nil { + // TODO(roasbeef): broadcast on-chain peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) - continue + p.Disconnect() + break out } case *lnwire.CommitSignature: // We just received a new update to our local @@ -811,7 +849,8 @@ out: sig := htlcPkt.CommitSig.Serialize() if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { peerLog.Errorf("unable to accept new commitment: %v", err) - continue + p.Disconnect() + break out } // If we didn't initiate this state transition, @@ -847,7 +886,8 @@ out: htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) if err != nil { peerLog.Errorf("unable to accept revocation: %v", err) - continue + p.Disconnect() + break out } // TODO(roasbeef): send the locked-in HTLC's // over the plex chan to the switch. @@ -900,6 +940,7 @@ out: } p.wg.Done() + peerLog.Tracef("htlcManager for peer %v done", p) } // updateCommitTx signs, then sends an update to the remote peer adding a new