From 9b29fa3a52672094224a2f1b12ba9ab0f593cece Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 12 Jul 2016 17:38:09 -0700 Subject: [PATCH] lnd: use channel barriers to synchronize on-chain events and a peer's readHandler --- fundingmanager.go | 11 ++++++++--- peer.go | 27 ++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 17007270..f839ac05 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -406,8 +406,6 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { return } - // TODO(roasbeef): create new chan barrier - // Now that we have their contribution, we can extract, then send over // both the funding out point and our signature for their version of // the commitment transaction to the remote peer. @@ -419,6 +417,10 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { return } + // Register a new barrier for this channel to properly synchronize with + // the peer's readHandler once the channel is open. + fmsg.peer.barrierInits <- *outPoint + fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint, msg.ChannelID) @@ -477,7 +479,10 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) { return } - // TODO(roasbeef): create new chan barrier + // Register a new barrier for this channel to properly synchronize with + // the peer's readHandler once the channel is open. + fmsg.peer.barrierInits <- *fundingOut + fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", fmsg.msg.ChannelID, fundingOut) diff --git a/peer.go b/peer.go index 58b8f3a1..f052956a 100644 --- a/peer.go +++ b/peer.go @@ -107,13 +107,12 @@ type peer struct { // will be signalled once the channel is fully open. This barrier acts // as a synchronization point for any incoming/outgoing HTLCs before // the channel has been fully opened. - // TODO(roasbeef): barrier to sync chan open and handling of first htlc - // message. + barrierMtx sync.RWMutex newChanBarriers map[wire.OutPoint]chan struct{} + barrierInits chan wire.OutPoint // newChannels is used by the fundingManager to send fully opened // channels to the source peer which handled the funding workflow. - // TODO(roasbeef): barrier to block until chan open before update newChannels chan *lnwallet.LightningChannel // localCloseChanReqs is a channel in which any local requests to @@ -161,6 +160,7 @@ func newPeer(conn net.Conn, server *server, net wire.BitcoinNet, inbound bool) ( sendQueue: make(chan outgoinMsg, 1), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), + barrierInits: make(chan wire.OutPoint), newChanBarriers: make(map[wire.OutPoint]chan struct{}), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), @@ -473,11 +473,16 @@ out: snapshots = append(snapshots, snapshot) } req.resp <- snapshots + case pendingChanPoint := <-p.barrierInits: + p.barrierMtx.Lock() + peerLog.Tracef("Creating chan barrier for "+ + "ChannelPoint(%v)", pendingChanPoint) + p.newChanBarriers[pendingChanPoint] = make(chan struct{}) + p.barrierMtx.Unlock() case newChan := <-p.newChannels: chanPoint := *newChan.ChannelPoint() p.activeChannels[chanPoint] = newChan - // TODO(roasbeef): signal channel barrier peerLog.Infof("New channel active ChannelPoint(%v) "+ "with peerId(%v)", chanPoint, p.id) @@ -485,12 +490,24 @@ out: // Switch of a new active link. chanSnapShot := newChan.StateSnapshot() downstreamLink := make(chan lnwire.Message) - p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) + plexChan := p.server.htlcSwitch.RegisterLink(p, + chanSnapShot, downstreamLink) + // With the channel registered to the HtlcSwitch spawn + // a goroutine to handle commitment updates for this + // new channel. upstreamLink := make(chan lnwire.Message) p.htlcManagers[chanPoint] = upstreamLink p.wg.Add(1) go p.htlcManager(newChan, downstreamLink, upstreamLink) + // Close the active channel barrier signalling the + // readHandler that commitment related modifications to + // this channel can now proceed. + p.barrierMtx.Lock() + peerLog.Tracef("Closing chan barrier for ChannelPoint(%v)", chanPoint) + close(p.newChanBarriers[chanPoint]) + delete(p.newChanBarriers, chanPoint) + p.barrierMtx.Unlock() case req := <-p.localCloseChanReqs: p.handleLocalClose(req) case req := <-p.remoteCloseChanReqs: