peer: no pending channels in switch, buffer msgs until link active

This commit is contained in:
nsa 2020-03-06 22:43:51 -05:00
parent 966cd2112c
commit f00159f64c

196
peer.go

@ -160,6 +160,13 @@ type peer struct {
// activeChannels is a map which stores the state machines of all // activeChannels is a map which stores the state machines of all
// active channels. Channels are indexed into the map by the txid of // active channels. Channels are indexed into the map by the txid of
// the funding transaction which opened the channel. // the funding transaction which opened the channel.
//
// NOTE: On startup, pending channels are stored as nil in this map.
// Confirmed channels have channel data populated in the map. This means
// that accesses to this map should nil-check the LightningChannel to
// see if this is a pending channel or not. The tradeoff here is either
// having two maps everywhere (one for pending, one for confirmed chans)
// or having an extra nil-check per access.
activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel
// addedChannels tracks any new channels opened during this peer's // addedChannels tracks any new channels opened during this peer's
@ -564,9 +571,21 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
peerLog.Tracef("Using link policy of: %v", peerLog.Tracef("Using link policy of: %v",
spew.Sdump(forwardingPolicy)) spew.Sdump(forwardingPolicy))
// Register this new channel link with the HTLC Switch. This is // If the channel is pending, set the value to nil in the
// necessary to properly route multi-hop payments, and forward // activeChannels map. This is done to signify that the channel is
// new payments triggered by RPC clients. // pending. We don't add the link to the switch here - it's the funding
// manager's responsibility to spin up pending channels. Adding them
// here would just be extra work as we'll tear them down when creating
// + adding the final link.
if lnChan.IsPending() {
p.activeChanMtx.Lock()
p.activeChannels[chanID] = nil
p.activeChanMtx.Unlock()
continue
}
// Subscribe to the set of on-chain events for this channel.
chainEvents, err := p.server.chainArb.SubscribeChannelEvents( chainEvents, err := p.server.chainArb.SubscribeChannelEvents(
*chanPoint, *chanPoint,
) )
@ -574,7 +593,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) (
return nil, err return nil, err
} }
// Create the link and add it to the switch.
err = p.addLink( err = p.addLink(
chanPoint, lnChan, forwardingPolicy, chainEvents, chanPoint, lnChan, forwardingPolicy, chainEvents,
currentHeight, true, currentHeight, true,
@ -927,6 +945,69 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) {
ms.msgCond.Signal() ms.msgCond.Signal()
} }
// waitUntilLinkActive waits until the target link is active and returns a
// ChannelLink to pass messages to. It accomplishes this by subscribing to
// an ActiveLinkEvent which is emitted by the link when it first starts up.
func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink {
// Subscribe to receive channel events.
//
// NOTE: If the link is already active by SubscribeChannelEvents, then
// GetLink will retrieve the link and we can send messages. If the link
// becomes active between SubscribeChannelEvents and GetLink, then GetLink
// will retrieve the link. If the link becomes active after GetLink, then
// we will get an ActiveLinkEvent notification and retrieve the link. If
// the call to GetLink is before SubscribeChannelEvents, however, there
// will be a race condition.
sub, err := p.server.channelNotifier.SubscribeChannelEvents()
if err != nil {
// If we have a non-nil error, then the server is shutting down and we
// can exit here and return nil. This means no message will be delivered
// to the link.
return nil
}
defer sub.Cancel()
// The link may already be active by this point, and we may have missed the
// ActiveLinkEvent. Check if the link exists.
link, _ := p.server.htlcSwitch.GetLink(cid)
if link != nil {
return link
}
// If the link is nil, we must wait for it to be active.
for {
select {
// A new event has been sent by the ChannelNotifier. We first check
// whether the event is an ActiveLinkEvent. If it is, we'll check
// that the event is for this channel. Otherwise, we discard the
// message.
case e := <-sub.Updates():
event, ok := e.(channelnotifier.ActiveLinkEvent)
if !ok {
// Ignore this notification.
continue
}
chanPoint := event.ChannelPoint
// Check whether the retrieved chanPoint matches the target
// channel id.
if !cid.IsChanPoint(chanPoint) {
continue
}
// The link shouldn't be nil as we received an
// ActiveLinkEvent. If it is nil, we return nil and the
// calling function should catch it.
link, _ = p.server.htlcSwitch.GetLink(cid)
return link
case <-p.quit:
return nil
}
}
}
// newChanMsgStream is used to create a msgStream between the peer and // newChanMsgStream is used to create a msgStream between the peer and
// particular channel link in the htlcswitch. We utilize additional // particular channel link in the htlcswitch. We utilize additional
// synchronization with the fundingManager to ensure we don't attempt to // synchronization with the fundingManager to ensure we don't attempt to
@ -942,51 +1023,17 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream {
fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]),
1000, 1000,
func(msg lnwire.Message) { func(msg lnwire.Message) {
_, isChanSyncMsg := msg.(*lnwire.ChannelReestablish) // This check is fine because if the link no longer exists, it will
// be removed from the activeChannels map and subsequent messages
// If this is the chanSync message, then we'll deliver // shouldn't reach the chan msg stream.
// it immediately to the active link.
if !isChanSyncMsg {
// We'll send a message to the funding manager
// and wait iff an active funding process for
// this channel hasn't yet completed. We do
// this in order to account for the following
// scenario: we send the funding locked message
// to the other side, they immediately send a
// channel update message, but we haven't yet
// sent the channel to the channelManager.
err := p.server.fundingMgr.waitUntilChannelOpen(
cid, p.quit,
)
if err != nil {
// If we have a non-nil error, then the
// funding manager is shutting down, s
// we can exit here without attempting
// to deliver the message.
return
}
}
// In order to avoid unnecessarily delivering message
// as the peer is exiting, we'll check quickly to see
// if we need to exit.
select {
case <-p.quit:
return
default:
}
// Dispatch the commitment update message to the proper
// active goroutine dedicated to this channel.
if chanLink == nil { if chanLink == nil {
link, err := p.server.htlcSwitch.GetLink(cid) chanLink = waitUntilLinkActive(p, cid)
if err != nil {
peerLog.Errorf("recv'd update for "+ // If the link is still not active and the calling function
"unknown channel %v from %v: "+ // errored out, just return.
"%v", cid, p, err) if chanLink == nil {
return return
} }
chanLink = link
} }
// In order to avoid unnecessarily delivering message // In order to avoid unnecessarily delivering message
@ -1229,13 +1276,23 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool {
// channel with the peer to mitigate dos attack vectors where a peer costlessly // channel with the peer to mitigate dos attack vectors where a peer costlessly
// connects to us and spams us with errors. // connects to us and spams us with errors.
func (p *peer) storeError(err error) { func (p *peer) storeError(err error) {
var haveChannels bool
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
channelCount := len(p.activeChannels) for _, channel := range p.activeChannels {
// Pending channels will be nil in the activeChannels map.
if channel == nil {
continue
}
haveChannels = true
break
}
p.activeChanMtx.RUnlock() p.activeChanMtx.RUnlock()
// If we do not have any active channels with the peer, we do not store // If we do not have any active channels with the peer, we do not store
// errors as a dos mitigation. // errors as a dos mitigation.
if channelCount == 0 { if !haveChannels {
peerLog.Tracef("no channels with peer: %v, not storing err", p) peerLog.Tracef("no channels with peer: %v, not storing err", p)
return return
} }
@ -1777,6 +1834,11 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot {
snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels)) snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
for _, activeChan := range p.activeChannels { for _, activeChan := range p.activeChannels {
// If the activeChan is nil, then we skip it as the channel is pending.
if activeChan == nil {
continue
}
// We'll only return a snapshot for channels that are // We'll only return a snapshot for channels that are
// *immedately* available for routing payments over. // *immedately* available for routing payments over.
if activeChan.RemoteNextRevocation() == nil { if activeChan.RemoteNextRevocation() == nil {
@ -1829,9 +1891,12 @@ out:
chanPoint := &newChan.FundingOutpoint chanPoint := &newChan.FundingOutpoint
chanID := lnwire.NewChanIDFromOutPoint(chanPoint) chanID := lnwire.NewChanIDFromOutPoint(chanPoint)
// Make sure this channel is not already active. // Only update RemoteNextRevocation if the channel is in the
// activeChannels map and if we added the link to the switch.
// Only active channels will be added to the switch.
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
if currentChan, ok := p.activeChannels[chanID]; ok { currentChan, ok := p.activeChannels[chanID]
if ok && currentChan != nil {
peerLog.Infof("Already have ChannelPoint(%v), "+ peerLog.Infof("Already have ChannelPoint(%v), "+
"ignoring.", chanPoint) "ignoring.", chanPoint)
@ -1877,6 +1942,8 @@ out:
continue continue
} }
// This refreshes the activeChannels entry if the link was not in
// the switch, also populates for new entries.
p.activeChannels[chanID] = lnChan p.activeChannels[chanID] = lnChan
p.addedChannels[chanID] = struct{}{} p.addedChannels[chanID] = struct{}{}
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
@ -1925,10 +1992,17 @@ out:
TimeLockDelta: defaultPolicy.TimeLockDelta, TimeLockDelta: defaultPolicy.TimeLockDelta,
} }
// If we've reached this point, there are two possible scenarios.
// If the channel was in the active channels map as nil, then it
// was loaded from disk and we need to send reestablish. Else,
// it was not loaded from disk and we don't need to send
// reestablish as this is a fresh channel.
shouldReestablish := ok
// Create the link and add it to the switch. // Create the link and add it to the switch.
err = p.addLink( err = p.addLink(
chanPoint, lnChan, forwardingPolicy, chanPoint, lnChan, forwardingPolicy,
chainEvents, currentHeight, false, chainEvents, currentHeight, shouldReestablish,
) )
if err != nil { if err != nil {
err := fmt.Errorf("can't register new channel "+ err := fmt.Errorf("can't register new channel "+
@ -2047,6 +2121,11 @@ out:
// our active channel back to their default state. // our active channel back to their default state.
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
for _, channel := range p.activeChannels { for _, channel := range p.activeChannels {
// If the channel is nil, continue as it's a pending channel.
if channel == nil {
continue
}
channel.ResetState() channel.ResetState()
} }
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
@ -2066,6 +2145,11 @@ func (p *peer) reenableActiveChannels() {
var activePublicChans []wire.OutPoint var activePublicChans []wire.OutPoint
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
for chanID, lnChan := range p.activeChannels { for chanID, lnChan := range p.activeChannels {
// If the lnChan is nil, continue as this is a pending channel.
if lnChan == nil {
continue
}
dbChan := lnChan.State() dbChan := lnChan.State()
isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0
if !isPublic || dbChan.IsPending { if !isPublic || dbChan.IsPending {
@ -2109,7 +2193,10 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID] channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock() p.activeChanMtx.RUnlock()
if !ok {
// If the channel isn't in the map or the channel is nil, return
// ErrChannelNotFound as the channel is pending.
if !ok || channel == nil {
return nil, ErrChannelNotFound return nil, ErrChannelNotFound
} }
@ -2218,7 +2305,10 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) {
p.activeChanMtx.RLock() p.activeChanMtx.RLock()
channel, ok := p.activeChannels[chanID] channel, ok := p.activeChannels[chanID]
p.activeChanMtx.RUnlock() p.activeChanMtx.RUnlock()
if !ok {
// Though this function can't be called for pending channels, we still
// check whether channel is nil for safety.
if !ok || channel == nil {
err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+
"unknown", chanID) "unknown", chanID)
peerLog.Errorf(err.Error()) peerLog.Errorf(err.Error())