diff --git a/lnwire/commit_sig.go b/lnwire/commit_sig.go index 5002d5fe..72c235b3 100644 --- a/lnwire/commit_sig.go +++ b/lnwire/commit_sig.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // CommitSig is sent by either side to stage any pending HTLC's in the // receiver's pending set into a new commitment state. Implicitly, the new @@ -83,3 +85,11 @@ func (c *CommitSig) MaxPayloadLength(uint32) uint32 { // 32 + 64 + 2 + max_allowed_htlcs return MaxMessagePayload } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *CommitSig) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/revoke_and_ack.go b/lnwire/revoke_and_ack.go index 623bcc36..f6395108 100644 --- a/lnwire/revoke_and_ack.go +++ b/lnwire/revoke_and_ack.go @@ -81,3 +81,11 @@ func (c *RevokeAndAck) MaxPayloadLength(uint32) uint32 { // 32 + 32 + 33 return 97 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *RevokeAndAck) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_add_htlc.go b/lnwire/update_add_htlc.go index d127db44..b3add950 100644 --- a/lnwire/update_add_htlc.go +++ b/lnwire/update_add_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // OnionPacketSize is the size of the serialized Sphinx onion packet included // in each UpdateAddHTLC message. The breakdown of the onion packet is as @@ -107,3 +109,11 @@ func (c *UpdateAddHTLC) MaxPayloadLength(uint32) uint32 { // 1450 return 32 + 8 + 4 + 8 + 32 + 1366 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateAddHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fail_htlc.go b/lnwire/update_fail_htlc.go index ed59af17..17fc3cd4 100644 --- a/lnwire/update_fail_htlc.go +++ b/lnwire/update_fail_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // OpaqueReason is an opaque encrypted byte slice that encodes the exact // failure reason and additional some supplemental data. The contents of this @@ -83,3 +85,11 @@ func (c *UpdateFailHTLC) MaxPayloadLength(uint32) uint32 { return length } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFailHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fail_malformed_htlc.go b/lnwire/update_fail_malformed_htlc.go index 6415b882..68f0a61b 100644 --- a/lnwire/update_fail_malformed_htlc.go +++ b/lnwire/update_fail_malformed_htlc.go @@ -73,3 +73,11 @@ func (c *UpdateFailMalformedHTLC) MaxPayloadLength(uint32) uint32 { // 32 + 8 + 32 + 2 return 74 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFailMalformedHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fee.go b/lnwire/update_fee.go index fde0cb8b..5657633b 100644 --- a/lnwire/update_fee.go +++ b/lnwire/update_fee.go @@ -68,3 +68,11 @@ func (c *UpdateFee) MaxPayloadLength(uint32) uint32 { // 32 + 4 return 36 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFee) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/lnwire/update_fulfill_htlc.go b/lnwire/update_fulfill_htlc.go index 050e81fd..49344008 100644 --- a/lnwire/update_fulfill_htlc.go +++ b/lnwire/update_fulfill_htlc.go @@ -1,6 +1,8 @@ package lnwire -import "io" +import ( + "io" +) // UpdateFulfillHTLC is sent by Alice to Bob when she wishes to settle a // particular HTLC referenced by its HTLCKey within a specific active channel @@ -76,3 +78,11 @@ func (c *UpdateFulfillHTLC) MaxPayloadLength(uint32) uint32 { // 32 + 8 + 32 return 72 } + +// TargetChanID returns the channel id of the link for which this message is +// intended. +// +// NOTE: Part of lnd.LinkUpdater interface. +func (c *UpdateFulfillHTLC) TargetChanID() ChannelID { + return c.ChanID +} diff --git a/peer.go b/peer.go index 247daab7..3ca6b672 100644 --- a/peer.go +++ b/peer.go @@ -155,6 +155,10 @@ type peer struct { // channels to the source peer which handled the funding workflow. newChannels chan *newChannelMsg + // activeMsgStreams is a map from channel id to the channel streams that + // proxy messages to individual, active links. + activeMsgStreams map[lnwire.ChannelID]*msgStream + // activeChanCloses is a map that keep track of all the active // cooperative channel closures that are active. Any channel closing // messages are directed to one of these active state machines. Once @@ -253,6 +257,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel), newChannels: make(chan *newChannelMsg, 1), + activeMsgStreams: make(map[lnwire.ChannelID]*msgStream), + activeChanCloses: make(map[lnwire.ChannelID]*channelCloser), localCloseChanReqs: make(chan *htlcswitch.ChanClose), linkFailures: make(chan linkFailureReport), @@ -901,33 +907,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { // active goroutine dedicated to this channel. if chanLink == nil { link, err := p.server.htlcSwitch.GetLink(cid) - switch { - - // If we failed to find the link in question, - // and the message received was a channel sync - // message, then this might be a peer trying to - // resync closed channel. In this case we'll - // try to resend our last channel sync message, - // such that the peer can recover funds from - // the closed channel. - case err != nil && isChanSyncMsg: - peerLog.Debugf("Unable to find "+ - "link(%v) to handle channel "+ - "sync, attempting to resend "+ - "last ChanSync message", cid) - - err := p.resendChanSyncMsg(cid) - if err != nil { - // TODO(halseth): send error to - // peer? - peerLog.Errorf( - "resend failed: %v", - err, - ) - } - return - - case err != nil: + if err != nil { peerLog.Errorf("recv'd update for "+ "unknown channel %v from %v: "+ "%v", cid, p, err) @@ -990,8 +970,6 @@ func (p *peer) readHandler() { discStream := newDiscMsgStream(p) discStream.Start() defer discStream.Stop() - - chanMsgStreams := make(map[lnwire.ChannelID]*msgStream) out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, err := p.readNextMessage() @@ -1038,8 +1016,8 @@ out: } var ( - isChanUpdate bool targetChan lnwire.ChannelID + isLinkUpdate bool ) switch msg := nextMsg.(type) { @@ -1081,64 +1059,31 @@ out: } case *lnwire.Error: - key := p.addr.IdentityKey + targetChan = msg.ChanID + isLinkUpdate = p.handleError(msg) - switch { - // In the case of an all-zero channel ID we want to - // forward the error to all channels with this peer. - case msg.ChanID == lnwire.ConnectionWideID: - for chanID, chanStream := range chanMsgStreams { - chanStream.AddMsg(nextMsg) + case *lnwire.ChannelReestablish: + targetChan = msg.ChanID + isLinkUpdate = p.isActiveChannel(targetChan) - // Also marked this channel as failed, - // so we won't try to restart it on - // reconnect with this peer. - p.failedChannels[chanID] = struct{}{} + // If we failed to find the link in question, and the + // message received was a channel sync message, then + // this might be a peer trying to resync closed channel. + // In this case we'll try to resend our last channel + // sync message, such that the peer can recover funds + // from the closed channel. + if !isLinkUpdate { + err := p.resendChanSyncMsg(targetChan) + if err != nil { + // TODO(halseth): send error to peer? + peerLog.Errorf("resend failed: %v", + err) } - - // If the channel ID for the error message corresponds - // to a pending channel, then the funding manager will - // handle the error. - case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): - p.server.fundingMgr.processFundingError(msg, key) - - // If not we hand the error to the channel link for - // this channel. - default: - isChanUpdate = true - targetChan = msg.ChanID - - // Also marked this channel as failed, so we - // won't try to restart it on reconnect with - // this peer. - p.failedChannels[targetChan] = struct{}{} } - // TODO(roasbeef): create ChanUpdater interface for the below - case *lnwire.UpdateAddHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFulfillHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFailMalformedHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFailHTLC: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.RevokeAndAck: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.CommitSig: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.UpdateFee: - isChanUpdate = true - targetChan = msg.ChanID - case *lnwire.ChannelReestablish: - isChanUpdate = true - targetChan = msg.ChanID + case LinkUpdater: + targetChan = msg.TargetChanID() + isLinkUpdate = p.isActiveChannel(targetChan) case *lnwire.ChannelUpdate, *lnwire.ChannelAnnouncement, @@ -1157,16 +1102,16 @@ out: "%v", uint16(msg.MsgType()), p) } - if isChanUpdate { + if isLinkUpdate { // If this is a channel update, then we need to feed it // into the channel's in-order message stream. - chanStream, ok := chanMsgStreams[targetChan] + chanStream, ok := p.activeMsgStreams[targetChan] if !ok { // If a stream hasn't yet been created, then // we'll do so, add it to the map, and finally // start it. chanStream = newChanMsgStream(p, targetChan) - chanMsgStreams[targetChan] = chanStream + p.activeMsgStreams[targetChan] = chanStream chanStream.Start() defer chanStream.Stop() } @@ -1184,6 +1129,54 @@ out: peerLog.Tracef("readHandler for peer %v done", p) } +// isActiveChannel returns true if the provided channel id is active, otherwise +// returns false. +func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { + p.activeChanMtx.RLock() + _, ok := p.activeChannels[chanID] + p.activeChanMtx.RUnlock() + return ok +} + +// handleError processes an error message read from the remote peer. The boolean +// returns indicates whether the message should be delivered to a targeted peer. +// +// NOTE: This method should only be called from within the readHandler. +func (p *peer) handleError(msg *lnwire.Error) bool { + key := p.addr.IdentityKey + + switch { + + // In the case of an all-zero channel ID we want to forward the error to + // all channels with this peer. + case msg.ChanID == lnwire.ConnectionWideID: + for chanID, chanStream := range p.activeMsgStreams { + chanStream.AddMsg(msg) + + // Also marked this channel as failed, so we won't try + // to restart it on reconnect with this peer. + p.failedChannels[chanID] = struct{}{} + } + return false + + // If the channel ID for the error message corresponds to a pending + // channel, then the funding manager will handle the error. + case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key): + p.server.fundingMgr.processFundingError(msg, key) + return false + + // If not we hand the error to the channel link for this channel. + case p.isActiveChannel(msg.ChanID): + // Mark this channel as failed, so we won't try to restart it on + // reconnect with this peer. + p.failedChannels[msg.ChanID] = struct{}{} + return true + + default: + return false + } +} + // messageSummary returns a human-readable string that summarizes a // incoming/outgoing message. Not all messages will have a summary, only those // which have additional data that can be informative at a glance. @@ -2407,6 +2400,11 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error { cid) } + if !c.RemotePub.IsEqual(p.IdentityKey()) { + return fmt.Errorf("ignoring channel reestablish from "+ + "peer=%x", p.IdentityKey()) + } + peerLog.Debugf("Re-sending channel sync message for channel %v to "+ "peer %v", cid, p) @@ -2543,4 +2541,12 @@ func (p *peer) StartTime() time.Time { return p.startTime } +// LinkUpdater is an interface implemented by most messages in BOLT 2 that are +// allowed to update the channel state. +type LinkUpdater interface { + // TargetChanID returns the channel id of the link for which this + // message is intended. + TargetChanID() lnwire.ChannelID +} + // TODO(roasbeef): make all start/stop mutexes a CAS