diff --git a/peer.go b/peer.go index 4368f71b..df77c4fd 100644 --- a/peer.go +++ b/peer.go @@ -160,6 +160,13 @@ type peer struct { // activeChannels is a map which stores the state machines of all // active channels. Channels are indexed into the map by the txid of // the funding transaction which opened the channel. + // + // NOTE: On startup, pending channels are stored as nil in this map. + // Confirmed channels have channel data populated in the map. This means + // that accesses to this map should nil-check the LightningChannel to + // see if this is a pending channel or not. The tradeoff here is either + // having two maps everywhere (one for pending, one for confirmed chans) + // or having an extra nil-check per access. activeChannels map[lnwire.ChannelID]*lnwallet.LightningChannel // addedChannels tracks any new channels opened during this peer's @@ -564,9 +571,21 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( peerLog.Tracef("Using link policy of: %v", spew.Sdump(forwardingPolicy)) - // Register this new channel link with the HTLC Switch. This is - // necessary to properly route multi-hop payments, and forward - // new payments triggered by RPC clients. + // If the channel is pending, set the value to nil in the + // activeChannels map. This is done to signify that the channel is + // pending. We don't add the link to the switch here - it's the funding + // manager's responsibility to spin up pending channels. Adding them + // here would just be extra work as we'll tear them down when creating + // + adding the final link. + if lnChan.IsPending() { + p.activeChanMtx.Lock() + p.activeChannels[chanID] = nil + p.activeChanMtx.Unlock() + + continue + } + + // Subscribe to the set of on-chain events for this channel. chainEvents, err := p.server.chainArb.SubscribeChannelEvents( *chanPoint, ) @@ -574,7 +593,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) ( return nil, err } - // Create the link and add it to the switch. err = p.addLink( chanPoint, lnChan, forwardingPolicy, chainEvents, currentHeight, true, @@ -927,6 +945,69 @@ func (ms *msgStream) AddMsg(msg lnwire.Message) { ms.msgCond.Signal() } +// waitUntilLinkActive waits until the target link is active and returns a +// ChannelLink to pass messages to. It accomplishes this by subscribing to +// an ActiveLinkEvent which is emitted by the link when it first starts up. +func waitUntilLinkActive(p *peer, cid lnwire.ChannelID) htlcswitch.ChannelLink { + // Subscribe to receive channel events. + // + // NOTE: If the link is already active by SubscribeChannelEvents, then + // GetLink will retrieve the link and we can send messages. If the link + // becomes active between SubscribeChannelEvents and GetLink, then GetLink + // will retrieve the link. If the link becomes active after GetLink, then + // we will get an ActiveLinkEvent notification and retrieve the link. If + // the call to GetLink is before SubscribeChannelEvents, however, there + // will be a race condition. + sub, err := p.server.channelNotifier.SubscribeChannelEvents() + if err != nil { + // If we have a non-nil error, then the server is shutting down and we + // can exit here and return nil. This means no message will be delivered + // to the link. + return nil + } + defer sub.Cancel() + + // The link may already be active by this point, and we may have missed the + // ActiveLinkEvent. Check if the link exists. + link, _ := p.server.htlcSwitch.GetLink(cid) + if link != nil { + return link + } + + // If the link is nil, we must wait for it to be active. + for { + select { + // A new event has been sent by the ChannelNotifier. We first check + // whether the event is an ActiveLinkEvent. If it is, we'll check + // that the event is for this channel. Otherwise, we discard the + // message. + case e := <-sub.Updates(): + event, ok := e.(channelnotifier.ActiveLinkEvent) + if !ok { + // Ignore this notification. + continue + } + + chanPoint := event.ChannelPoint + + // Check whether the retrieved chanPoint matches the target + // channel id. + if !cid.IsChanPoint(chanPoint) { + continue + } + + // The link shouldn't be nil as we received an + // ActiveLinkEvent. If it is nil, we return nil and the + // calling function should catch it. + link, _ = p.server.htlcSwitch.GetLink(cid) + return link + + case <-p.quit: + return nil + } + } +} + // newChanMsgStream is used to create a msgStream between the peer and // particular channel link in the htlcswitch. We utilize additional // synchronization with the fundingManager to ensure we don't attempt to @@ -942,51 +1023,17 @@ func newChanMsgStream(p *peer, cid lnwire.ChannelID) *msgStream { fmt.Sprintf("Update stream for ChannelID(%x) exiting", cid[:]), 1000, func(msg lnwire.Message) { - _, isChanSyncMsg := msg.(*lnwire.ChannelReestablish) - - // If this is the chanSync message, then we'll deliver - // it immediately to the active link. - if !isChanSyncMsg { - // We'll send a message to the funding manager - // and wait iff an active funding process for - // this channel hasn't yet completed. We do - // this in order to account for the following - // scenario: we send the funding locked message - // to the other side, they immediately send a - // channel update message, but we haven't yet - // sent the channel to the channelManager. - err := p.server.fundingMgr.waitUntilChannelOpen( - cid, p.quit, - ) - if err != nil { - // If we have a non-nil error, then the - // funding manager is shutting down, s - // we can exit here without attempting - // to deliver the message. - return - } - } - - // In order to avoid unnecessarily delivering message - // as the peer is exiting, we'll check quickly to see - // if we need to exit. - select { - case <-p.quit: - return - default: - } - - // Dispatch the commitment update message to the proper - // active goroutine dedicated to this channel. + // This check is fine because if the link no longer exists, it will + // be removed from the activeChannels map and subsequent messages + // shouldn't reach the chan msg stream. if chanLink == nil { - link, err := p.server.htlcSwitch.GetLink(cid) - if err != nil { - peerLog.Errorf("recv'd update for "+ - "unknown channel %v from %v: "+ - "%v", cid, p, err) + chanLink = waitUntilLinkActive(p, cid) + + // If the link is still not active and the calling function + // errored out, just return. + if chanLink == nil { return } - chanLink = link } // In order to avoid unnecessarily delivering message @@ -1229,13 +1276,23 @@ func (p *peer) isActiveChannel(chanID lnwire.ChannelID) bool { // channel with the peer to mitigate dos attack vectors where a peer costlessly // connects to us and spams us with errors. func (p *peer) storeError(err error) { + var haveChannels bool + p.activeChanMtx.RLock() - channelCount := len(p.activeChannels) + for _, channel := range p.activeChannels { + // Pending channels will be nil in the activeChannels map. + if channel == nil { + continue + } + + haveChannels = true + break + } p.activeChanMtx.RUnlock() // If we do not have any active channels with the peer, we do not store // errors as a dos mitigation. - if channelCount == 0 { + if !haveChannels { peerLog.Tracef("no channels with peer: %v, not storing err", p) return } @@ -1777,6 +1834,11 @@ func (p *peer) ChannelSnapshots() []*channeldb.ChannelSnapshot { snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels)) for _, activeChan := range p.activeChannels { + // If the activeChan is nil, then we skip it as the channel is pending. + if activeChan == nil { + continue + } + // We'll only return a snapshot for channels that are // *immedately* available for routing payments over. if activeChan.RemoteNextRevocation() == nil { @@ -1829,9 +1891,12 @@ out: chanPoint := &newChan.FundingOutpoint chanID := lnwire.NewChanIDFromOutPoint(chanPoint) - // Make sure this channel is not already active. + // Only update RemoteNextRevocation if the channel is in the + // activeChannels map and if we added the link to the switch. + // Only active channels will be added to the switch. p.activeChanMtx.Lock() - if currentChan, ok := p.activeChannels[chanID]; ok { + currentChan, ok := p.activeChannels[chanID] + if ok && currentChan != nil { peerLog.Infof("Already have ChannelPoint(%v), "+ "ignoring.", chanPoint) @@ -1877,6 +1942,8 @@ out: continue } + // This refreshes the activeChannels entry if the link was not in + // the switch, also populates for new entries. p.activeChannels[chanID] = lnChan p.addedChannels[chanID] = struct{}{} p.activeChanMtx.Unlock() @@ -1925,10 +1992,17 @@ out: TimeLockDelta: defaultPolicy.TimeLockDelta, } + // If we've reached this point, there are two possible scenarios. + // If the channel was in the active channels map as nil, then it + // was loaded from disk and we need to send reestablish. Else, + // it was not loaded from disk and we don't need to send + // reestablish as this is a fresh channel. + shouldReestablish := ok + // Create the link and add it to the switch. err = p.addLink( chanPoint, lnChan, forwardingPolicy, - chainEvents, currentHeight, false, + chainEvents, currentHeight, shouldReestablish, ) if err != nil { err := fmt.Errorf("can't register new channel "+ @@ -2047,6 +2121,11 @@ out: // our active channel back to their default state. p.activeChanMtx.Lock() for _, channel := range p.activeChannels { + // If the channel is nil, continue as it's a pending channel. + if channel == nil { + continue + } + channel.ResetState() } p.activeChanMtx.Unlock() @@ -2066,6 +2145,11 @@ func (p *peer) reenableActiveChannels() { var activePublicChans []wire.OutPoint p.activeChanMtx.RLock() for chanID, lnChan := range p.activeChannels { + // If the lnChan is nil, continue as this is a pending channel. + if lnChan == nil { + continue + } + dbChan := lnChan.State() isPublic := dbChan.ChannelFlags&lnwire.FFAnnounceChannel != 0 if !isPublic || dbChan.IsPending { @@ -2109,7 +2193,10 @@ func (p *peer) fetchActiveChanCloser(chanID lnwire.ChannelID) (*channelCloser, e p.activeChanMtx.RLock() channel, ok := p.activeChannels[chanID] p.activeChanMtx.RUnlock() - if !ok { + + // If the channel isn't in the map or the channel is nil, return + // ErrChannelNotFound as the channel is pending. + if !ok || channel == nil { return nil, ErrChannelNotFound } @@ -2218,7 +2305,10 @@ func (p *peer) handleLocalCloseReq(req *htlcswitch.ChanClose) { p.activeChanMtx.RLock() channel, ok := p.activeChannels[chanID] p.activeChanMtx.RUnlock() - if !ok { + + // Though this function can't be called for pending channels, we still + // check whether channel is nil for safety. + if !ok || channel == nil { err := fmt.Errorf("unable to close channel, ChannelID(%v) is "+ "unknown", chanID) peerLog.Errorf(err.Error())