From 2df5a36048069597907f160850f542178663e64a Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 25 Sep 2019 12:00:59 -0700 Subject: [PATCH 1/4] peer+lnwire: add LinkUpdater iface and impl on relevant msgs Removes longstanding TODO to simplify parsing of target chanid. --- lnwire/commit_sig.go | 12 +++++++- lnwire/revoke_and_ack.go | 8 ++++++ lnwire/update_add_htlc.go | 12 +++++++- lnwire/update_fail_htlc.go | 12 +++++++- lnwire/update_fail_malformed_htlc.go | 8 ++++++ lnwire/update_fee.go | 8 ++++++ lnwire/update_fulfill_htlc.go | 12 +++++++- peer.go | 42 +++++++++++----------------- 8 files changed, 84 insertions(+), 30 deletions(-) diff --git a/lnwire/commit_sig.go b/lnwire/commit_sig.go index 5002d5fe..72c235b3 100644 --- a/lnwire/commit_sig.go +++ b/lnwire/commit_sig.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // CommitSig is sent by either side to stage any pending HTLC's in the // receiver's pending set into a new commitment state. Implicitly, the new @@ -83,3 +85,11 @@ func (c *CommitSig) MaxPayloadLength(uint32) uint32 { // 32 + 64 + 2 + max_allowed_htlcs return MaxMessagePayload } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *CommitSig) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/revoke_and_ack.go b/lnwire/revoke_and_ack.go index 623bcc36..f6395108 100644 --- a/lnwire/revoke_and_ack.go +++ b/lnwire/revoke_and_ack.go @@ -81,3 +81,11 @@ func (c *RevokeAndAck) MaxPayloadLength(uint32) uint32 { // 32 + 32 + 33 return 97 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *RevokeAndAck) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_add_htlc.go b/lnwire/update_add_htlc.go index d127db44..b3add950 100644 --- a/lnwire/update_add_htlc.go +++ b/lnwire/update_add_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // OnionPacketSize is the size of the serialized Sphinx onion packet included // in each UpdateAddHTLC message. The breakdown of the onion packet is as @@ -107,3 +109,11 @@ func (c *UpdateAddHTLC) MaxPayloadLength(uint32) uint32 { // 1450 return 32 + 8 + 4 + 8 + 32 + 1366 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateAddHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index ed59af17..17fc3cd4 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // OpaqueReason is an opaque encrypted byte slice that encodes the exact // failure reason and additional some supplemental data. The contents of this @@ -83,3 +85,11 @@ func (c *UpdateFailHTLC) MaxPayloadLength(uint32) uint32 { return length } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFailHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fail_malformed_htlc.go b/lnwire/update_fail_malformed_htlc.go index 6415b882..68f0a61b 100644 --- a/lnwire/update_fail_malformed_htlc.go +++ b/lnwire/update_fail_malformed_htlc.go @@ -73,3 +73,11 @@ func (c *UpdateFailMalformedHTLC) MaxPayloadLength(uint32) uint32 { // 32 + 8 + 32 + 2 return 74 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFailMalformedHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fee.go b/lnwire/update_fee.go index fde0cb8b..5657633b 100644 --- a/lnwire/update_fee.go +++ b/lnwire/update_fee.go @@ -68,3 +68,11 @@ func (c *UpdateFee) MaxPayloadLength(uint32) uint32 { // 32 + 4 return 36 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFee) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fulfill_htlc.go b/lnwire/update_fulfill_htlc.go index 050e81fd..49344008 100644 --- a/lnwire/update_fulfill_htlc.go +++ b/lnwire/update_fulfill_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // UpdateFulfillHTLC is sent by Alice to Bob when she wishes to settle a // particular HTLC referenced by its HTLCKey within a specific active channel @@ -76,3 +78,11 @@ func (c *UpdateFulfillHTLC) MaxPayloadLength(uint32) uint32 { // 32 + 8 + 32 return 72 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFulfillHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/peer.go b/peer.go index 247daab7..df33543c 100644 --- a/peer.go +++ b/peer.go @@ -1038,7 +1038,7 @@ out: } var ( - isChanUpdate bool + isLinkUpdate bool targetChan lnwire.ChannelID ) @@ -1105,7 +1105,7 @@ out: // If not we hand the error to the channel link for // this channel. default: - isChanUpdate = true + isLinkUpdate = true targetChan = msg.ChanID // Also marked this channel as failed, so we @@ -1114,32 +1114,14 @@ out: p.failedChannels[targetChan] = struct{}{} } - // TODO(roasbeef): create ChanUpdater interface for the below - case *lnwire.UpdateAddHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFulfillHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFailMalformedHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFailHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.RevokeAndAck: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.CommitSig: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFee: - isChanUpdate = true - targetChan = msg.ChanID case *lnwire.ChannelReestablish: - isChanUpdate = true + isLinkUpdate = true targetChan = msg.ChanID + case LinkUpdater: + isLinkUpdate = true + targetChan = msg.TargetChanID() + case *lnwire.ChannelUpdate, *lnwire.ChannelAnnouncement, *lnwire.NodeAnnouncement, @@ -1157,7 +1139,7 @@ out: "%v", uint16(msg.MsgType()), p) } - if isChanUpdate { + if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. chanStream, ok := chanMsgStreams[targetChan] @@ -2543,4 +2525,12 @@ func (p *peer) StartTime() time.Time { return p.startTime } +// LinkUpdater is an interface implemented by most messages in BOLT 2 that are +// allowed to update the channel state. +type LinkUpdater interface { + // TargetChanID returns the channel id of the link for which this + // message is intended. + TargetChanID() lnwire.ChannelID +} + // TODO(roasbeef): make all start/stop mutexes a CAS From f33a1a61e68fe4ded4c36baa10cf7966bafa1e86 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 25 Sep 2019 12:01:11 -0700 Subject: [PATCH 2/4] peer: extract error handling logic --- peer.go | 82 ++++++++++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/peer.go b/peer.go index df33543c..cead16c6 100644 --- a/peer.go +++ b/peer.go @@ -155,6 +155,10 @@ type peer struct { // channels to the source peer which handled the funding workflow. newChannels chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that + // proxy messages to individual, active links. + activeMsgStreams map[lnwire.ChannelID]*msgStream + // activeChanCloses is a map that keep track of all the active // cooperative channel closures that are active. Any channel closing // messages are directed to one of these active state machines. Once @@ -253,6 +257,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), + activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), + activeChanCloses: make(map[lnwire.ChannelID]*channelCloser), localCloseChanReqs: make(chan *htlcswitch.ChanClose), linkFailures: make(chan linkFailureReport), @@ -990,8 +996,6 @@ func (p *peer) readHandler() { discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() - - chanMsgStreams := make(map[lnwire.ChannelID]*msgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -1081,38 +1085,8 @@ out: } case *lnwire.Error: - key := p.addr.IdentityKey - - switch { - // In the case of an all-zero channel ID we want to - // forward the error to all channels with this peer. - case msg.ChanID == lnwire.ConnectionWideID: - for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) - - // Also marked this channel as failed, - // so we won't try to restart it on - // reconnect with this peer. - p.failedChannels[chanID] = struct{}{} - } - - // If the channel ID for the error message corresponds - // to a pending channel, then the funding manager will - // handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): - p.server.fundingMgr.processFundingError(msg, key) - - // If not we hand the error to the channel link for - // this channel. - default: - isLinkUpdate = true - targetChan = msg.ChanID - - // Also marked this channel as failed, so we - // won't try to restart it on reconnect with - // this peer. - p.failedChannels[targetChan] = struct{}{} - } + isLinkUpdate = p.handleError(msg) + targetChan = msg.ChanID case *lnwire.ChannelReestablish: isLinkUpdate = true @@ -1142,13 +1116,13 @@ out: if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. - chanStream, ok := chanMsgStreams[targetChan] + chanStream, ok := p.activeMsgStreams[targetChan] if !ok { // If a stream hasn't yet been created, then // we'll do so, add it to the map, and finally // start it. chanStream = newChanMsgStream(p, targetChan) - chanMsgStreams[targetChan] = chanStream + p.activeMsgStreams[targetChan] = chanStream chanStream.Start() defer chanStream.Stop() } @@ -1166,6 +1140,42 @@ out: peerLog.Tracef("readHandler for peer %v done", p) } +// handleError processes an error message read from the remote peer. The boolean +// returns indicates whether the message should be delivered to a targeted peer. +// +// NOTE: This method should only be called from within the readHandler. +func (p *peer) handleError(msg *lnwire.Error) bool { + key := p.addr.IdentityKey + + switch { + + // In the case of an all-zero channel ID we want to forward the error to + // all channels with this peer. + case msg.ChanID == lnwire.ConnectionWideID: + for chanID, chanStream := range p.activeMsgStreams { + chanStream.AddMsg(msg) + + // Also marked this channel as failed, so we won't try + // to restart it on reconnect with this peer. + p.failedChannels[chanID] = struct{}{} + } + return false + + // If the channel ID for the error message corresponds to a pending + // channel, then the funding manager will handle the error. + case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): + p.server.fundingMgr.processFundingError(msg, key) + return false + + // If not we hand the error to the channel link for this channel. + default: + // Mark this channel as failed, so we won't try to restart it on + // reconnect with this peer. + p.failedChannels[msg.ChanID] = struct{}{} + return true + } +} + // messageSummary returns a human-readable string that summarizes a // incoming/outgoing message. Not all messages will have a summary, only those // which have additional data that can be informative at a glance. From ced113452d30e01b81697f2f49f84375ab90c8c4 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 25 Sep 2019 12:01:23 -0700 Subject: [PATCH 3/4] peer: only mark active channels as failed Also adds similar sanity check for LinkUpdater msgs, so that we don't process messages for inactive channels. --- peer.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/peer.go b/peer.go index cead16c6..e861a65f 100644 --- a/peer.go +++ b/peer.go @@ -1042,8 +1042,8 @@ out: } var ( - isLinkUpdate bool targetChan lnwire.ChannelID + isLinkUpdate bool ) switch msg := nextMsg.(type) { @@ -1085,16 +1085,16 @@ out: } case *lnwire.Error: - isLinkUpdate = p.handleError(msg) targetChan = msg.ChanID + isLinkUpdate = p.handleError(msg) case *lnwire.ChannelReestablish: isLinkUpdate = true targetChan = msg.ChanID case LinkUpdater: - isLinkUpdate = true targetChan = msg.TargetChanID() + isLinkUpdate = p.isActiveChannel(targetChan) case *lnwire.ChannelUpdate, *lnwire.ChannelAnnouncement, @@ -1140,6 +1140,15 @@ out: peerLog.Tracef("readHandler for peer %v done", p) } +// isActiveChannel returns true if the provided channel id is active, otherwise +// returns false. +func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { + p.activeChanMtx.RLock() + _, ok := p.activeChannels[chanID] + p.activeChanMtx.RUnlock() + return ok +} + // handleError processes an error message read from the remote peer. The boolean // returns indicates whether the message should be delivered to a targeted peer. // @@ -1168,11 +1177,14 @@ func (p *peer) handleError(msg *lnwire.Error) bool { return false // If not we hand the error to the channel link for this channel. - default: + case p.isActiveChannel(msg.ChanID): // Mark this channel as failed, so we won't try to restart it on // reconnect with this peer. p.failedChannels[msg.ChanID] = struct{}{} return true + + default: + return false } } From a280a4987c9392e43bf8052a4913bbcb4a56ea2d Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Wed, 25 Sep 2019 12:01:34 -0700 Subject: [PATCH 4/4] peer: resend channel reestablishes without starting chan streams This prevents the DLP protocol from breaking as a result of the refactor, since the closing or closed channels won't be included in the peer's active map. --- peer.go | 50 ++++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 28 deletions(-) diff --git a/peer.go b/peer.go index e861a65f..3ca6b672 100644 --- a/peer.go +++ b/peer.go @@ -907,33 +907,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // active goroutine dedicated to this channel. if chanLink == nil { link, err := p.server.htlcSwitch.GetLink(cid) - switch { - - // If we failed to find the link in question, - // and the message received was a channel sync - // message, then this might be a peer trying to - // resync closed channel. In this case we'll - // try to resend our last channel sync message, - // such that the peer can recover funds from - // the closed channel. - case err != nil && isChanSyncMsg: - peerLog.Debugf("Unable to find "+ - "link(%v) to handle channel "+ - "sync, attempting to resend "+ - "last ChanSync message", cid) - - err := p.resendChanSyncMsg(cid) - if err != nil { - // TODO(halseth): send error to - // peer? - peerLog.Errorf( - "resend failed: %v", - err, - ) - } - return - - case err != nil: + if err != nil { peerLog.Errorf("recv'd update for "+ "unknown channel %v from %v: "+ "%v", cid, p, err) @@ -1089,8 +1063,23 @@ out: isLinkUpdate = p.handleError(msg) case *lnwire.ChannelReestablish: - isLinkUpdate = true targetChan = msg.ChanID + isLinkUpdate = p.isActiveChannel(targetChan) + + // If we failed to find the link in question, and the + // message received was a channel sync message, then + // this might be a peer trying to resync closed channel. + // In this case we'll try to resend our last channel + // sync message, such that the peer can recover funds + // from the closed channel. + if !isLinkUpdate { + err := p.resendChanSyncMsg(targetChan) + if err != nil { + // TODO(halseth): send error to peer? + peerLog.Errorf("resend failed: %v", + err) + } + } case LinkUpdater: targetChan = msg.TargetChanID() @@ -2411,6 +2400,11 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { cid) } + if !c.RemotePub.IsEqual(p.IdentityKey()) { + return fmt.Errorf("ignoring channel reestablish from "+ + "peer=%x", p.IdentityKey()) + } + peerLog.Debugf("Re-sending channel sync message for channel %v to "+ "peer %v", cid, p)