Merge pull request #3535 from cfromknecht/link-updater-refactor

peer: link updater refactor
This commit is contained in:
Olaoluwa Osuntokun 2019-09-25 16:37:40 -07:00 committed by GitHub
commit 1e456a6bc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 160 additions and 90 deletions

@ -1,6 +1,8 @@
package lnwire
import "io"
import (
"io"
)
// CommitSig is sent by either side to stage any pending HTLC's in the
// receiver's pending set into a new commitment state. Implicitly, the new
@ -83,3 +85,11 @@ func (c *CommitSig) MaxPayloadLength(uint32) uint32 {
// 32 + 64 + 2 + max_allowed_htlcs
return MaxMessagePayload
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *CommitSig) TargetChanID() ChannelID {
return c.ChanID
}

@ -81,3 +81,11 @@ func (c *RevokeAndAck) MaxPayloadLength(uint32) uint32 {
// 32 + 32 + 33
return 97
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *RevokeAndAck) TargetChanID() ChannelID {
return c.ChanID
}

@ -1,6 +1,8 @@
package lnwire
import "io"
import (
"io"
)
// OnionPacketSize is the size of the serialized Sphinx onion packet included
// in each UpdateAddHTLC message. The breakdown of the onion packet is as
@ -107,3 +109,11 @@ func (c *UpdateAddHTLC) MaxPayloadLength(uint32) uint32 {
// 1450
return 32 + 8 + 4 + 8 + 32 + 1366
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *UpdateAddHTLC) TargetChanID() ChannelID {
return c.ChanID
}

@ -1,6 +1,8 @@
package lnwire
import "io"
import (
"io"
)
// OpaqueReason is an opaque encrypted byte slice that encodes the exact
// failure reason and additional some supplemental data. The contents of this
@ -83,3 +85,11 @@ func (c *UpdateFailHTLC) MaxPayloadLength(uint32) uint32 {
return length
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *UpdateFailHTLC) TargetChanID() ChannelID {
return c.ChanID
}

@ -73,3 +73,11 @@ func (c *UpdateFailMalformedHTLC) MaxPayloadLength(uint32) uint32 {
// 32 + 8 + 32 + 2
return 74
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *UpdateFailMalformedHTLC) TargetChanID() ChannelID {
return c.ChanID
}

@ -68,3 +68,11 @@ func (c *UpdateFee) MaxPayloadLength(uint32) uint32 {
// 32 + 4
return 36
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *UpdateFee) TargetChanID() ChannelID {
return c.ChanID
}

@ -1,6 +1,8 @@
package lnwire
import "io"
import (
"io"
)
// UpdateFulfillHTLC is sent by Alice to Bob when she wishes to settle a
// particular HTLC referenced by its HTLCKey within a specific active channel
@ -76,3 +78,11 @@ func (c *UpdateFulfillHTLC) MaxPayloadLength(uint32) uint32 {
// 32 + 8 + 32
return 72
}
// TargetChanID returns the channel id of the link for which this message is
// intended.
//
// NOTE: Part of lnd.LinkUpdater interface.
func (c *UpdateFulfillHTLC) TargetChanID() ChannelID {
return c.ChanID
}

178
peer.go

@ -155,6 +155,10 @@ type peer struct {
// channels to the source peer which handled the funding workflow.
newChannels chan *newChannelMsg
// activeMsgStreams is a map from channel id to the channel streams that
// proxy messages to individual, active links.
activeMsgStreams map[lnwire.ChannelID]*msgStream
// activeChanCloses is a map that keep track of all the active
// cooperative channel closures that are active. Any channel closing
// messages are directed to one of these active state machines. Once
@ -253,6 +257,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
activeChannels: make(map[lnwire.ChannelID]*lnwallet.LightningChannel),
newChannels: make(chan *newChannelMsg, 1),
activeMsgStreams: make(map[lnwire.ChannelID]*msgStream),
activeChanCloses: make(map[lnwire.ChannelID]*channelCloser),
localCloseChanReqs: make(chan *htlcswitch.ChanClose),
linkFailures: make(chan linkFailureReport),
@ -901,33 +907,7 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
// active goroutine dedicated to this channel.
if chanLink == nil {
link, err := p.server.htlcSwitch.GetLink(cid)
switch {
// If we failed to find the link in question,
// and the message received was a channel sync
// message, then this might be a peer trying to
// resync closed channel. In this case we'll
// try to resend our last channel sync message,
// such that the peer can recover funds from
// the closed channel.
case err != nil && isChanSyncMsg:
peerLog.Debugf("Unable to find "+
"link(%v) to handle channel "+
"sync, attempting to resend "+
"last ChanSync message", cid)
err := p.resendChanSyncMsg(cid)
if err != nil {
// TODO(halseth): send error to
// peer?
peerLog.Errorf(
"resend failed: %v",
err,
)
}
return
case err != nil:
if err != nil {
peerLog.Errorf("recv'd update for "+
"unknown channel %v from %v: "+
"%v", cid, p, err)
@ -990,8 +970,6 @@ func (p *peer) readHandler() {
discStream := newDiscMsgStream(p)
discStream.Start()
defer discStream.Stop()
chanMsgStreams := make(map[lnwire.ChannelID]*msgStream)
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, err := p.readNextMessage()
@ -1038,8 +1016,8 @@ out:
}
var (
isChanUpdate bool
targetChan lnwire.ChannelID
isLinkUpdate bool
)
switch msg := nextMsg.(type) {
@ -1081,64 +1059,31 @@ out:
}
case *lnwire.Error:
key := p.addr.IdentityKey
targetChan = msg.ChanID
isLinkUpdate = p.handleError(msg)
switch {
// In the case of an all-zero channel ID we want to
// forward the error to all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID:
for chanID, chanStream := range chanMsgStreams {
chanStream.AddMsg(nextMsg)
case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
isLinkUpdate = p.isActiveChannel(targetChan)
// Also marked this channel as failed,
// so we won't try to restart it on
// reconnect with this peer.
p.failedChannels[chanID] = struct{}{}
// If we failed to find the link in question, and the
// message received was a channel sync message, then
// this might be a peer trying to resync closed channel.
// In this case we'll try to resend our last channel
// sync message, such that the peer can recover funds
// from the closed channel.
if !isLinkUpdate {
err := p.resendChanSyncMsg(targetChan)
if err != nil {
// TODO(halseth): send error to peer?
peerLog.Errorf("resend failed: %v",
err)
}
// If the channel ID for the error message corresponds
// to a pending channel, then the funding manager will
// handle the error.
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
p.server.fundingMgr.processFundingError(msg, key)
// If not we hand the error to the channel link for
// this channel.
default:
isChanUpdate = true
targetChan = msg.ChanID
// Also marked this channel as failed, so we
// won't try to restart it on reconnect with
// this peer.
p.failedChannels[targetChan] = struct{}{}
}
// TODO(roasbeef): create ChanUpdater interface for the below
case *lnwire.UpdateAddHTLC:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.UpdateFulfillHTLC:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.UpdateFailMalformedHTLC:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.UpdateFailHTLC:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.RevokeAndAck:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.CommitSig:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.UpdateFee:
isChanUpdate = true
targetChan = msg.ChanID
case *lnwire.ChannelReestablish:
isChanUpdate = true
targetChan = msg.ChanID
case LinkUpdater:
targetChan = msg.TargetChanID()
isLinkUpdate = p.isActiveChannel(targetChan)
case *lnwire.ChannelUpdate,
*lnwire.ChannelAnnouncement,
@ -1157,16 +1102,16 @@ out:
"%v", uint16(msg.MsgType()), p)
}
if isChanUpdate {
if isLinkUpdate {
// If this is a channel update, then we need to feed it
// into the channel's in-order message stream.
chanStream, ok := chanMsgStreams[targetChan]
chanStream, ok := p.activeMsgStreams[targetChan]
if !ok {
// If a stream hasn't yet been created, then
// we'll do so, add it to the map, and finally
// start it.
chanStream = newChanMsgStream(p, targetChan)
chanMsgStreams[targetChan] = chanStream
p.activeMsgStreams[targetChan] = chanStream
chanStream.Start()
defer chanStream.Stop()
}
@ -1184,6 +1129,54 @@ out:
peerLog.Tracef("readHandler for peer %v done", p)
}
// isActiveChannel returns true if the provided channel id is active, otherwise
// returns false.
func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
p.activeChanMtx.RLock()
_, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock()
return ok
}
// handleError processes an error message read from the remote peer. The boolean
// returns indicates whether the message should be delivered to a targeted peer.
//
// NOTE: This method should only be called from within the readHandler.
func (p *peer) handleError(msg *lnwire.Error) bool {
key := p.addr.IdentityKey
switch {
// In the case of an all-zero channel ID we want to forward the error to
// all channels with this peer.
case msg.ChanID == lnwire.ConnectionWideID:
for chanID, chanStream := range p.activeMsgStreams {
chanStream.AddMsg(msg)
// Also marked this channel as failed, so we won't try
// to restart it on reconnect with this peer.
p.failedChannels[chanID] = struct{}{}
}
return false
// If the channel ID for the error message corresponds to a pending
// channel, then the funding manager will handle the error.
case p.server.fundingMgr.IsPendingChannel(msg.ChanID, key):
p.server.fundingMgr.processFundingError(msg, key)
return false
// If not we hand the error to the channel link for this channel.
case p.isActiveChannel(msg.ChanID):
// Mark this channel as failed, so we won't try to restart it on
// reconnect with this peer.
p.failedChannels[msg.ChanID] = struct{}{}
return true
default:
return false
}
}
// messageSummary returns a human-readable string that summarizes a
// incoming/outgoing message. Not all messages will have a summary, only those
// which have additional data that can be informative at a glance.
@ -2407,6 +2400,11 @@ func (p *peer) resendChanSyncMsg(cid lnwire.ChannelID) error {
cid)
}
if !c.RemotePub.IsEqual(p.IdentityKey()) {
return fmt.Errorf("ignoring channel reestablish from "+
"peer=%x", p.IdentityKey())
}
peerLog.Debugf("Re-sending channel sync message for channel %v to "+
"peer %v", cid, p)
@ -2543,4 +2541,12 @@ func (p *peer) StartTime() time.Time {
return p.startTime
}
// LinkUpdater is an interface implemented by most messages in BOLT 2 that are
// allowed to update the channel state.
type LinkUpdater interface {
// TargetChanID returns the channel id of the link for which this
// message is intended.
TargetChanID() lnwire.ChannelID
}
// TODO(roasbeef): make all start/stop mutexes a CAS