diff --git a/fundingmanager.go b/fundingmanager.go index bf0da77b..abded36d 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -99,6 +99,11 @@ type fundingErrorMsg struct { peerAddress *lnwire.NetAddress } +type newChannelReq struct { + channel *lnwallet.LightningChannel + done chan struct{} +} + // pendingChannels is a map instantiated per-peer which tracks all active // pending single funded channels indexed by their pending channel identifier. type pendingChannels map[uint64]*reservationWithCtx @@ -176,6 +181,13 @@ type fundingManager struct { // requests from a local subsystem within the daemon. fundingRequests chan *initFundingMsg + // newChanBarriers is a map from a channel point to a 'barrier' which + // will be signalled once the channel is fully open. This barrier acts + // as a synchronization point for any incoming/outgoing HTLCs before + // the channel has been fully opened. + barrierMtx sync.RWMutex + newChanBarriers map[wire.OutPoint]chan struct{} + fakeProof *channelProof quit chan struct{} @@ -202,6 +214,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) { }, activeReservations: make(map[serializedPubKey]pendingChannels), + newChanBarriers: make(map[wire.OutPoint]chan struct{}), fundingMsgs: make(chan interface{}, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize), queries: make(chan interface{}, 1), @@ -619,16 +632,15 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { return } - // Register a new barrier for this channel to properly synchronize with - // the peer's readHandler once the channel is open. - peer, err := f.cfg.FindPeer(peerKey) - if err != nil { - fndgLog.Errorf("Error finding peer: %v", err) - cancelReservation() - resCtx.err <- err - return - } - peer.barrierInits <- *outPoint + // A new channel has almost finished the funding process. In order to + // properly synchronize with the writeHandler goroutine, we add a new + // channel to the barriers map which will be closed once the channel is + // fully open. + f.barrierMtx.Lock() + fndgLog.Debugf("Creating chan barrier for "+ + "ChannelPoint(%v)", outPoint) + f.newChanBarriers[*outPoint] = make(chan struct{}) + f.barrierMtx.Unlock() fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint, chanID) @@ -713,15 +725,15 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) { return } - // Register a new barrier for this channel to properly synchronize with - // the peer's readHandler once the channel is open. - peer, err := f.cfg.FindPeer(peerKey) - if err != nil { - fndgLog.Errorf("Error finding peer: %v", err) - cancelReservation() - return - } - peer.barrierInits <- fundingOut + // A new channel has almost finished the funding process. In order to + // properly synchronize with the writeHandler goroutine, we add a new + // channel to the barriers map which will be closed once the channel is + // fully open. + f.barrierMtx.Lock() + fndgLog.Debugf("Creating chan barrier for "+ + "ChannelPoint(%v)", fundingOut) + f.newChanBarriers[fundingOut] = make(chan struct{}) + f.barrierMtx.Unlock() fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", chanID, fundingOut) @@ -921,7 +933,23 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) fndgLog.Errorf("Error finding peer: %v", err) return } - peer.newChannels <- openChanDetails.Channel + newChanDone := make(chan struct{}) + newChanReq := &newChannelReq{ + channel: openChanDetails.Channel, + done: newChanDone, + } + peer.newChannels <- newChanReq + + <-newChanDone + + // Close the active channel barrier signalling the readHandler + // that commitment related modifications to this channel can + // now proceed. + f.barrierMtx.Lock() + fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint) + close(f.newChanBarriers[*fundingPoint]) + delete(f.newChanBarriers, *fundingPoint) + f.barrierMtx.Unlock() // Afterwards we send the breach arbiter the new channel so it // can watch for attempts to breach the channel's contract by @@ -1052,8 +1080,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // counterparty for attempting to cheat us. f.cfg.ArbiterChan <- openChan - // Finally, notify the target peer of the newly open channel. - peer.newChannels <- openChan + // Finally, notify the target peer of the newly opened channel. + newChanDone := make(chan struct{}) + newChanReq := &newChannelReq{ + channel: openChan, + done: newChanDone, + } + peer.newChannels <- newChanReq + + <-newChanDone + + // Close the active channel barrier signalling the readHandler that + // commitment related modifications to this channel can now proceed. + fundingPoint := resCtx.reservation.FundingOutpoint() + f.barrierMtx.Lock() + fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint) + close(f.newChanBarriers[*fundingPoint]) + delete(f.newChanBarriers, *fundingPoint) + f.barrierMtx.Unlock() } // initFundingWorkflow sends a message to the funding manager instructing it @@ -1156,6 +1200,26 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { } } +// waitUntilChannelOpen is designed to prevent other lnd subsystems from +// sending new update messages to a channel before the channel is fully +// opened. +func (f *fundingManager) waitUntilChannelOpen(targetChan wire.OutPoint) { + f.barrierMtx.RLock() + barrier, ok := f.newChanBarriers[targetChan] + f.barrierMtx.RUnlock() + if ok { + fndgLog.Tracef("waiting for chan barrier signal for "+ + "ChannelPoint(%v)", targetChan) + select { + case <-barrier: + case <-f.quit: // TODO(roasbeef): add timer? + break + } + fndgLog.Tracef("barrier for ChannelPoint(%v) closed", + targetChan) + } +} + // processErrorGeneric sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric, diff --git a/peer.go b/peer.go index 85336a48..0c94eab8 100644 --- a/peer.go +++ b/peer.go @@ -119,17 +119,9 @@ type peer struct { htlcManMtx sync.RWMutex htlcManagers map[wire.OutPoint]chan lnwire.Message - // newChanBarriers is a map from a channel point to a 'barrier' which - // will be signalled once the channel is fully open. This barrier acts - // as a synchronization point for any incoming/outgoing HTLCs before - // the channel has been fully opened. - barrierMtx sync.RWMutex - newChanBarriers map[wire.OutPoint]chan struct{} - barrierInits chan wire.OutPoint - // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. - newChannels chan *lnwallet.LightningChannel + newChannels chan *newChannelReq // localCloseChanReqs is a channel in which any local requests to close // a particular channel are sent over. @@ -190,12 +182,10 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, sendQueue: make(chan outgoinMsg, 1), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), - barrierInits: make(chan wire.OutPoint), - newChanBarriers: make(map[wire.OutPoint]chan struct{}), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), chanSnapshotReqs: make(chan *chanSnapshotReq), - newChannels: make(chan *lnwallet.LightningChannel, 1), + newChannels: make(chan *newChannelReq, 1), localCloseChanReqs: make(chan *closeLinkReq), remoteCloseChanReqs: make(chan *lnwire.CloseRequest), @@ -482,27 +472,7 @@ out: } if isChanUpdate { - // We might be receiving an update to a newly funded - // channel in which we were the responder. Therefore - // we need to possibly block until the new channel has - // propagated internally through the system. - // TODO(roasbeef): replace with atomic load from/into - // map? - p.barrierMtx.RLock() - barrier, ok := p.newChanBarriers[targetChan] - p.barrierMtx.RUnlock() - if ok { - peerLog.Tracef("waiting for chan barrier "+ - "signal for ChannelPoint(%v)", targetChan) - select { - case <-barrier: - case <-p.quit: // TODO(roasbeef): add timer? - break out - } - peerLog.Tracef("barrier for ChannelPoint(%v) "+ - "closed", targetChan) - } - + p.server.fundingMgr.waitUntilChannelOpen(targetChan) // Dispatch the commitment update message to the proper // active goroutine dedicated to this channel. p.htlcManMtx.Lock() @@ -746,23 +716,11 @@ out: p.activeChanMtx.RUnlock() req.resp <- snapshots - case pendingChanPoint := <-p.barrierInits: - // A new channel has almost finished the funding - // process. In order to properly synchronize with the - // writeHandler goroutine, we add a new channel to the - // barriers map which will be closed once the channel - // is fully open. - p.barrierMtx.Lock() - peerLog.Tracef("Creating chan barrier for "+ - "ChannelPoint(%v)", pendingChanPoint) - p.newChanBarriers[pendingChanPoint] = make(chan struct{}) - p.barrierMtx.Unlock() - - case newChan := <-p.newChannels: - chanPoint := *newChan.ChannelPoint() + case newChanReq := <-p.newChannels: + chanPoint := *newChanReq.channel.ChannelPoint() p.activeChanMtx.Lock() - p.activeChannels[chanPoint] = newChan + p.activeChannels[chanPoint] = newChanReq.channel p.activeChanMtx.Unlock() peerLog.Infof("New channel active ChannelPoint(%v) "+ @@ -770,7 +728,7 @@ out: // Now that the channel is open, notify the Htlc // Switch of a new active link. - chanSnapShot := newChan.StateSnapshot() + chanSnapShot := newChanReq.channel.StateSnapshot() downstreamLink := make(chan *htlcPacket, 10) plexChan := p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) @@ -784,16 +742,9 @@ out: p.htlcManMtx.Unlock() p.wg.Add(1) - go p.htlcManager(newChan, plexChan, downstreamLink, upstreamLink) + go p.htlcManager(newChanReq.channel, plexChan, downstreamLink, upstreamLink) - // Close the active channel barrier signalling the - // readHandler that commitment related modifications to - // this channel can now proceed. - p.barrierMtx.Lock() - peerLog.Tracef("Closing chan barrier for ChannelPoint(%v)", chanPoint) - close(p.newChanBarriers[chanPoint]) - delete(p.newChanBarriers, chanPoint) - p.barrierMtx.Unlock() + close(newChanReq.done) case req := <-p.localCloseChanReqs: p.handleLocalClose(req)