peer+server+htlcswitch: add reason to disconnnect function
In order to recognize exact reason of the disconnect the additional field have been added in the disconnect function.
This commit is contained in:
parent
46ba18db9b
commit
ef73062c14
@ -99,5 +99,5 @@ type Peer interface {
|
|||||||
|
|
||||||
// Disconnect disconnects with peer if we have error which we can't
|
// Disconnect disconnects with peer if we have error which we can't
|
||||||
// properly handle.
|
// properly handle.
|
||||||
Disconnect()
|
Disconnect(reason error)
|
||||||
}
|
}
|
||||||
|
@ -314,9 +314,7 @@ out:
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
log.Errorf("unable to update commitment: %v",
|
l.fail("unable to update commitment: %v", err)
|
||||||
err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -333,9 +331,7 @@ out:
|
|||||||
// update, waiting for the revocation window to open
|
// update, waiting for the revocation window to open
|
||||||
// up.
|
// up.
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
log.Errorf("unable to update "+
|
l.fail("unable to update commitment: %v", err)
|
||||||
"commitment: %v", err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -454,9 +450,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
|
|||||||
logIndex, err := l.channel.SettleHTLC(pre)
|
logIndex, err := l.channel.SettleHTLC(pre)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(roasbeef): broadcast on-chain
|
// TODO(roasbeef): broadcast on-chain
|
||||||
log.Errorf("settle for incoming HTLC "+
|
l.fail("unable to settle incoming HTLC: %v", err)
|
||||||
"rejected: %v", err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -499,9 +493,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket) {
|
|||||||
// this is a settle request, then initiate an update.
|
// this is a settle request, then initiate an update.
|
||||||
if l.batchCounter >= 10 || isSettle {
|
if l.batchCounter >= 10 || isSettle {
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
log.Errorf("unable to update "+
|
l.fail("unable to update commitment: %v", err)
|
||||||
"commitment: %v", err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -518,10 +510,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
// "settle" list in the event that we know the preimage.
|
// "settle" list in the event that we know the preimage.
|
||||||
index, err := l.channel.ReceiveHTLC(msg)
|
index, err := l.channel.ReceiveHTLC(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO(roasbeef): fail channel
|
l.fail("unable to handle upstream add HTLC: %v", err)
|
||||||
log.Errorf("unable to handle upstream add HTLC: %v",
|
|
||||||
err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Tracef("Receive upstream htlc with payment hash(%x), "+
|
log.Tracef("Receive upstream htlc with payment hash(%x), "+
|
||||||
@ -540,9 +529,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
idx := msg.ID
|
idx := msg.ID
|
||||||
if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
|
if err := l.channel.ReceiveHTLCSettle(pre, idx); err != nil {
|
||||||
// TODO(roasbeef): broadcast on-chain
|
// TODO(roasbeef): broadcast on-chain
|
||||||
log.Errorf("unable to handle upstream settle "+
|
l.fail("unable to handle upstream settle HTLC: %v", err)
|
||||||
"HTLC: %v", err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -551,9 +538,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
case *lnwire.UpdateFailHTLC:
|
case *lnwire.UpdateFailHTLC:
|
||||||
idx := msg.ID
|
idx := msg.ID
|
||||||
if err := l.channel.ReceiveFailHTLC(idx); err != nil {
|
if err := l.channel.ReceiveFailHTLC(idx); err != nil {
|
||||||
log.Errorf("unable to handle upstream fail HTLC: "+
|
l.fail("unable to handle upstream fail HTLC: %v", err)
|
||||||
"%v", err)
|
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -566,8 +551,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
// TODO(roasbeef): redundant re-serialization
|
// TODO(roasbeef): redundant re-serialization
|
||||||
sig := msg.CommitSig.Serialize()
|
sig := msg.CommitSig.Serialize()
|
||||||
if err := l.channel.ReceiveNewCommitment(sig); err != nil {
|
if err := l.channel.ReceiveNewCommitment(sig); err != nil {
|
||||||
log.Errorf("unable to accept new commitment: %v", err)
|
l.fail("unable to accept new commitment: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -607,8 +591,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
// so we'll reply with a signature to provide them with their
|
// so we'll reply with a signature to provide them with their
|
||||||
// version of the latest commitment l.
|
// version of the latest commitment l.
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
log.Errorf("unable to update commitment: %v", err)
|
l.fail("unable to update commitment: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -618,8 +601,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
// revocation window.
|
// revocation window.
|
||||||
htlcs, err := l.channel.ReceiveRevocation(msg)
|
htlcs, err := l.channel.ReceiveRevocation(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to accept revocation: %v", err)
|
l.fail("unable to accept revocation: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -643,8 +625,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
|||||||
// channel, if not we will apply the update.
|
// channel, if not we will apply the update.
|
||||||
fee := msg.FeePerKw
|
fee := msg.FeePerKw
|
||||||
if err := l.channel.ReceiveUpdateFee(fee); err != nil {
|
if err := l.channel.ReceiveUpdateFee(fee); err != nil {
|
||||||
log.Errorf("error receiving fee update: %v", err)
|
l.fail("error receiving fee update: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -999,8 +980,7 @@ func (l *channelLink) processLockedInHtlcs(
|
|||||||
preimage := invoice.Terms.PaymentPreimage
|
preimage := invoice.Terms.PaymentPreimage
|
||||||
logIndex, err := l.channel.SettleHTLC(preimage)
|
logIndex, err := l.channel.SettleHTLC(preimage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to settle htlc: %v", err)
|
l.fail("unable to settle htlc: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1009,8 +989,7 @@ func (l *channelLink) processLockedInHtlcs(
|
|||||||
// update.
|
// update.
|
||||||
err = l.cfg.Registry.SettleInvoice(invoiceHash)
|
err = l.cfg.Registry.SettleInvoice(invoiceHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to settle invoice: %v", err)
|
l.fail("unable to settle invoice: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1133,8 +1112,7 @@ func (l *channelLink) processLockedInHtlcs(
|
|||||||
// remote htlc logs, initiate a state transition by updating
|
// remote htlc logs, initiate a state transition by updating
|
||||||
// the remote commitment chain.
|
// the remote commitment chain.
|
||||||
if err := l.updateCommitTx(); err != nil {
|
if err := l.updateCommitTx(); err != nil {
|
||||||
log.Errorf("unable to update commitment: %v", err)
|
l.fail("unable to update commitment: %v", err)
|
||||||
l.cfg.Peer.Disconnect()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1159,3 +1137,11 @@ func (l *channelLink) sendHTLCError(rHash [32]byte,
|
|||||||
Reason: reason,
|
Reason: reason,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fail helper function which is used to encapsulate the action neccessary
|
||||||
|
// for proper disconnect.
|
||||||
|
func (l *channelLink) fail(format string, a ...interface{}) {
|
||||||
|
reason := errors.Errorf(format, a...)
|
||||||
|
log.Error(reason)
|
||||||
|
l.cfg.Peer.Disconnect(reason)
|
||||||
|
}
|
@ -260,7 +260,7 @@ func (s *mockServer) PubKey() [33]byte {
|
|||||||
return s.id
|
return s.id
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *mockServer) Disconnect() {
|
func (s *mockServer) Disconnect(reason error) {
|
||||||
s.Stop()
|
s.Stop()
|
||||||
s.t.Fatalf("server %v was disconnected", s.name)
|
s.t.Fatalf("server %v was disconnected", s.name)
|
||||||
}
|
}
|
||||||
|
11
peer.go
11
peer.go
@ -338,12 +338,12 @@ func (p *peer) WaitForDisconnect() {
|
|||||||
// Disconnect terminates the connection with the remote peer. Additionally, a
|
// Disconnect terminates the connection with the remote peer. Additionally, a
|
||||||
// signal is sent to the server and htlcSwitch indicating the resources
|
// signal is sent to the server and htlcSwitch indicating the resources
|
||||||
// allocated to the peer can now be cleaned up.
|
// allocated to the peer can now be cleaned up.
|
||||||
func (p *peer) Disconnect() {
|
func (p *peer) Disconnect(reason error) {
|
||||||
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
|
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
peerLog.Tracef("Disconnecting %s", p)
|
peerLog.Tracef("Disconnecting %s, reason: %v", p, reason)
|
||||||
|
|
||||||
// Ensure that the TCP connection is properly closed before continuing.
|
// Ensure that the TCP connection is properly closed before continuing.
|
||||||
p.conn.Close()
|
p.conn.Close()
|
||||||
@ -534,7 +534,7 @@ out:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Disconnect()
|
p.Disconnect(errors.New("read handler closed"))
|
||||||
|
|
||||||
p.wg.Done()
|
p.wg.Done()
|
||||||
peerLog.Tracef("readHandler for peer %v done", p)
|
peerLog.Tracef("readHandler for peer %v done", p)
|
||||||
@ -641,9 +641,8 @@ func (p *peer) writeHandler() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
peerLog.Errorf("unable to write message: %v",
|
p.Disconnect(errors.Errorf("unable to write message: %v",
|
||||||
err)
|
err))
|
||||||
p.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
13
server.go
13
server.go
@ -682,8 +682,7 @@ func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound
|
|||||||
// Attempt to start the peer, if we're unable to do so, then disconnect
|
// Attempt to start the peer, if we're unable to do so, then disconnect
|
||||||
// this peer.
|
// this peer.
|
||||||
if err := p.Start(); err != nil {
|
if err := p.Start(); err != nil {
|
||||||
srvrLog.Errorf("unable to start peer: %v", err)
|
p.Disconnect(errors.Errorf("unable to start peer: %v", err))
|
||||||
p.Disconnect()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -746,7 +745,7 @@ func (s *server) inboundPeerConnected(conn net.Conn) {
|
|||||||
// peer to the peer garbage collection goroutine.
|
// peer to the peer garbage collection goroutine.
|
||||||
srvrLog.Debugf("Disconnecting stale connection to %v",
|
srvrLog.Debugf("Disconnecting stale connection to %v",
|
||||||
connectedPeer)
|
connectedPeer)
|
||||||
connectedPeer.Disconnect()
|
connectedPeer.Disconnect(errors.New("remove stale connection"))
|
||||||
s.donePeers <- connectedPeer
|
s.donePeers <- connectedPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,7 +824,7 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
|
|||||||
// server for garbage collection.
|
// server for garbage collection.
|
||||||
srvrLog.Debugf("Disconnecting stale connection to %v",
|
srvrLog.Debugf("Disconnecting stale connection to %v",
|
||||||
connectedPeer)
|
connectedPeer)
|
||||||
connectedPeer.Disconnect()
|
connectedPeer.Disconnect(errors.New("remove stale connection"))
|
||||||
s.donePeers <- connectedPeer
|
s.donePeers <- connectedPeer
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -841,7 +840,7 @@ func (s *server) addPeer(p *peer) {
|
|||||||
|
|
||||||
// Ignore new peers if we're shutting down.
|
// Ignore new peers if we're shutting down.
|
||||||
if atomic.LoadInt32(&s.shutdown) != 0 {
|
if atomic.LoadInt32(&s.shutdown) != 0 {
|
||||||
p.Disconnect()
|
p.Disconnect(errors.New("server is shutting down"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -889,7 +888,7 @@ func (s *server) removePeer(p *peer) {
|
|||||||
|
|
||||||
// As the peer is now finished, ensure that the TCP connection is
|
// As the peer is now finished, ensure that the TCP connection is
|
||||||
// closed and all of its related goroutines have exited.
|
// closed and all of its related goroutines have exited.
|
||||||
p.Disconnect()
|
p.Disconnect(errors.New("remove peer"))
|
||||||
|
|
||||||
// Ignore deleting peers if we're shutting down.
|
// Ignore deleting peers if we're shutting down.
|
||||||
if atomic.LoadInt32(&s.shutdown) != 0 {
|
if atomic.LoadInt32(&s.shutdown) != 0 {
|
||||||
@ -1144,7 +1143,7 @@ func (s *server) handleDisconnectPeer(msg *disconnectPeerMsg) {
|
|||||||
// Now that we know the peer is actually connected, we'll disconnect
|
// Now that we know the peer is actually connected, we'll disconnect
|
||||||
// from the peer.
|
// from the peer.
|
||||||
srvrLog.Infof("Disconnecting from %v", peer)
|
srvrLog.Infof("Disconnecting from %v", peer)
|
||||||
peer.Disconnect()
|
peer.Disconnect(errors.New("received user command to disconnect the peer"))
|
||||||
|
|
||||||
msg.err <- nil
|
msg.err <- nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user