fundingmanager: moved channel barriers from peer to funding manager.

Because peers may become disconnected or otherwise unavailable,
the channel barriers must be stored and managed within the fundingmanager.
This commit is contained in:
bryanvu 2017-01-23 15:33:46 -08:00 committed by Olaoluwa Osuntokun
parent 3e02ea11ef
commit 0dd6cb99c1
2 changed files with 95 additions and 80 deletions

@ -99,6 +99,11 @@ type fundingErrorMsg struct {
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
type newChannelReq struct {
channel *lnwallet.LightningChannel
done chan struct{}
}
// pendingChannels is a map instantiated per-peer which tracks all active // pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier. // pending single funded channels indexed by their pending channel identifier.
type pendingChannels map[uint64]*reservationWithCtx type pendingChannels map[uint64]*reservationWithCtx
@ -176,6 +181,13 @@ type fundingManager struct {
// requests from a local subsystem within the daemon. // requests from a local subsystem within the daemon.
fundingRequests chan *initFundingMsg fundingRequests chan *initFundingMsg
// newChanBarriers is a map from a channel point to a 'barrier' which
// will be signalled once the channel is fully open. This barrier acts
// as a synchronization point for any incoming/outgoing HTLCs before
// the channel has been fully opened.
barrierMtx sync.RWMutex
newChanBarriers map[wire.OutPoint]chan struct{}
fakeProof *channelProof fakeProof *channelProof
quit chan struct{} quit chan struct{}
@ -202,6 +214,7 @@ func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
}, },
activeReservations: make(map[serializedPubKey]pendingChannels), activeReservations: make(map[serializedPubKey]pendingChannels),
newChanBarriers: make(map[wire.OutPoint]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize), fundingMsgs: make(chan interface{}, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize),
queries: make(chan interface{}, 1), queries: make(chan interface{}, 1),
@ -619,16 +632,15 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
return return
} }
// Register a new barrier for this channel to properly synchronize with // A new channel has almost finished the funding process. In order to
// the peer's readHandler once the channel is open. // properly synchronize with the writeHandler goroutine, we add a new
peer, err := f.cfg.FindPeer(peerKey) // channel to the barriers map which will be closed once the channel is
if err != nil { // fully open.
fndgLog.Errorf("Error finding peer: %v", err) f.barrierMtx.Lock()
cancelReservation() fndgLog.Debugf("Creating chan barrier for "+
resCtx.err <- err "ChannelPoint(%v)", outPoint)
return f.newChanBarriers[*outPoint] = make(chan struct{})
} f.barrierMtx.Unlock()
peer.barrierInits <- *outPoint
fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint, fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint,
chanID) chanID)
@ -713,15 +725,15 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
return return
} }
// Register a new barrier for this channel to properly synchronize with // A new channel has almost finished the funding process. In order to
// the peer's readHandler once the channel is open. // properly synchronize with the writeHandler goroutine, we add a new
peer, err := f.cfg.FindPeer(peerKey) // channel to the barriers map which will be closed once the channel is
if err != nil { // fully open.
fndgLog.Errorf("Error finding peer: %v", err) f.barrierMtx.Lock()
cancelReservation() fndgLog.Debugf("Creating chan barrier for "+
return "ChannelPoint(%v)", fundingOut)
} f.newChanBarriers[fundingOut] = make(chan struct{})
peer.barrierInits <- fundingOut f.barrierMtx.Unlock()
fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)",
chanID, fundingOut) chanID, fundingOut)
@ -921,7 +933,23 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
fndgLog.Errorf("Error finding peer: %v", err) fndgLog.Errorf("Error finding peer: %v", err)
return return
} }
peer.newChannels <- openChanDetails.Channel newChanDone := make(chan struct{})
newChanReq := &newChannelReq{
channel: openChanDetails.Channel,
done: newChanDone,
}
peer.newChannels <- newChanReq
<-newChanDone
// Close the active channel barrier signalling the readHandler
// that commitment related modifications to this channel can
// now proceed.
f.barrierMtx.Lock()
fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint)
close(f.newChanBarriers[*fundingPoint])
delete(f.newChanBarriers, *fundingPoint)
f.barrierMtx.Unlock()
// Afterwards we send the breach arbiter the new channel so it // Afterwards we send the breach arbiter the new channel so it
// can watch for attempts to breach the channel's contract by // can watch for attempts to breach the channel's contract by
@ -1052,8 +1080,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
// counterparty for attempting to cheat us. // counterparty for attempting to cheat us.
f.cfg.ArbiterChan <- openChan f.cfg.ArbiterChan <- openChan
// Finally, notify the target peer of the newly open channel. // Finally, notify the target peer of the newly opened channel.
peer.newChannels <- openChan newChanDone := make(chan struct{})
newChanReq := &newChannelReq{
channel: openChan,
done: newChanDone,
}
peer.newChannels <- newChanReq
<-newChanDone
// Close the active channel barrier signalling the readHandler that
// commitment related modifications to this channel can now proceed.
fundingPoint := resCtx.reservation.FundingOutpoint()
f.barrierMtx.Lock()
fndgLog.Debugf("Closing chan barrier for ChannelPoint(%v)", fundingPoint)
close(f.newChanBarriers[*fundingPoint])
delete(f.newChanBarriers, *fundingPoint)
f.barrierMtx.Unlock()
} }
// initFundingWorkflow sends a message to the funding manager instructing it // initFundingWorkflow sends a message to the funding manager instructing it
@ -1156,6 +1200,26 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
} }
} }
// waitUntilChannelOpen is designed to prevent other lnd subsystems from
// sending new update messages to a channel before the channel is fully
// opened.
func (f *fundingManager) waitUntilChannelOpen(targetChan wire.OutPoint) {
f.barrierMtx.RLock()
barrier, ok := f.newChanBarriers[targetChan]
f.barrierMtx.RUnlock()
if ok {
fndgLog.Tracef("waiting for chan barrier signal for "+
"ChannelPoint(%v)", targetChan)
select {
case <-barrier:
case <-f.quit: // TODO(roasbeef): add timer?
break
}
fndgLog.Tracef("barrier for ChannelPoint(%v) closed",
targetChan)
}
}
// processErrorGeneric sends a message to the fundingManager allowing it to // processErrorGeneric sends a message to the fundingManager allowing it to
// process the occurred generic error. // process the occurred generic error.
func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric, func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric,

67
peer.go

@ -119,17 +119,9 @@ type peer struct {
htlcManMtx sync.RWMutex htlcManMtx sync.RWMutex
htlcManagers map[wire.OutPoint]chan lnwire.Message htlcManagers map[wire.OutPoint]chan lnwire.Message
// newChanBarriers is a map from a channel point to a 'barrier' which
// will be signalled once the channel is fully open. This barrier acts
// as a synchronization point for any incoming/outgoing HTLCs before
// the channel has been fully opened.
barrierMtx sync.RWMutex
newChanBarriers map[wire.OutPoint]chan struct{}
barrierInits chan wire.OutPoint
// newChannels is used by the fundingManager to send fully opened // newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
newChannels chan *lnwallet.LightningChannel newChannels chan *newChannelReq
// localCloseChanReqs is a channel in which any local requests to close // localCloseChanReqs is a channel in which any local requests to close
// a particular channel are sent over. // a particular channel are sent over.
@ -190,12 +182,10 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
sendQueue: make(chan outgoinMsg, 1), sendQueue: make(chan outgoinMsg, 1),
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
barrierInits: make(chan wire.OutPoint),
newChanBarriers: make(map[wire.OutPoint]chan struct{}),
activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel),
htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), htlcManagers: make(map[wire.OutPoint]chan lnwire.Message),
chanSnapshotReqs: make(chan *chanSnapshotReq), chanSnapshotReqs: make(chan *chanSnapshotReq),
newChannels: make(chan *lnwallet.LightningChannel, 1), newChannels: make(chan *newChannelReq, 1),
localCloseChanReqs: make(chan *closeLinkReq), localCloseChanReqs: make(chan *closeLinkReq),
remoteCloseChanReqs: make(chan *lnwire.CloseRequest), remoteCloseChanReqs: make(chan *lnwire.CloseRequest),
@ -482,27 +472,7 @@ out:
} }
if isChanUpdate { if isChanUpdate {
// We might be receiving an update to a newly funded p.server.fundingMgr.waitUntilChannelOpen(targetChan)
// channel in which we were the responder. Therefore
// we need to possibly block until the new channel has
// propagated internally through the system.
// TODO(roasbeef): replace with atomic load from/into
// map?
p.barrierMtx.RLock()
barrier, ok := p.newChanBarriers[targetChan]
p.barrierMtx.RUnlock()
if ok {
peerLog.Tracef("waiting for chan barrier "+
"signal for ChannelPoint(%v)", targetChan)
select {
case <-barrier:
case <-p.quit: // TODO(roasbeef): add timer?
break out
}
peerLog.Tracef("barrier for ChannelPoint(%v) "+
"closed", targetChan)
}
// Dispatch the commitment update message to the proper // Dispatch the commitment update message to the proper
// active goroutine dedicated to this channel. // active goroutine dedicated to this channel.
p.htlcManMtx.Lock() p.htlcManMtx.Lock()
@ -746,23 +716,11 @@ out:
p.activeChanMtx.RUnlock() p.activeChanMtx.RUnlock()
req.resp <- snapshots req.resp <- snapshots
case pendingChanPoint := <-p.barrierInits: case newChanReq := <-p.newChannels:
// A new channel has almost finished the funding chanPoint := *newChanReq.channel.ChannelPoint()
// process. In order to properly synchronize with the
// writeHandler goroutine, we add a new channel to the
// barriers map which will be closed once the channel
// is fully open.
p.barrierMtx.Lock()
peerLog.Tracef("Creating chan barrier for "+
"ChannelPoint(%v)", pendingChanPoint)
p.newChanBarriers[pendingChanPoint] = make(chan struct{})
p.barrierMtx.Unlock()
case newChan := <-p.newChannels:
chanPoint := *newChan.ChannelPoint()
p.activeChanMtx.Lock() p.activeChanMtx.Lock()
p.activeChannels[chanPoint] = newChan p.activeChannels[chanPoint] = newChanReq.channel
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
@ -770,7 +728,7 @@ out:
// Now that the channel is open, notify the Htlc // Now that the channel is open, notify the Htlc
// Switch of a new active link. // Switch of a new active link.
chanSnapShot := newChan.StateSnapshot() chanSnapShot := newChanReq.channel.StateSnapshot()
downstreamLink := make(chan *htlcPacket, 10) downstreamLink := make(chan *htlcPacket, 10)
plexChan := p.server.htlcSwitch.RegisterLink(p, plexChan := p.server.htlcSwitch.RegisterLink(p,
chanSnapShot, downstreamLink) chanSnapShot, downstreamLink)
@ -784,16 +742,9 @@ out:
p.htlcManMtx.Unlock() p.htlcManMtx.Unlock()
p.wg.Add(1) p.wg.Add(1)
go p.htlcManager(newChan, plexChan, downstreamLink, upstreamLink) go p.htlcManager(newChanReq.channel, plexChan, downstreamLink, upstreamLink)
// Close the active channel barrier signalling the close(newChanReq.done)
// readHandler that commitment related modifications to
// this channel can now proceed.
p.barrierMtx.Lock()
peerLog.Tracef("Closing chan barrier for ChannelPoint(%v)", chanPoint)
close(p.newChanBarriers[chanPoint])
delete(p.newChanBarriers, chanPoint)
p.barrierMtx.Unlock()
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
p.handleLocalClose(req) p.handleLocalClose(req)