From d98cac432bac262d24a6bce2e22e01ce78964589 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Thu, 17 Nov 2016 18:43:33 -0800
Subject: [PATCH] peer: ensure access to activeChannels and htlcManagers is
 thread safe

---
 peer.go | 40 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/peer.go b/peer.go
index 81940047..bee03b51 100644
--- a/peer.go
+++ b/peer.go
@@ -105,9 +105,11 @@ type peer struct {
 	// activeChannels is a map which stores the state machines of all
 	// active channels. Channels are indexed into the map by the txid of
 	// the funding transaction which opened the channel.
+	activeChanMtx    sync.RWMutex
 	activeChannels   map[wire.OutPoint]*lnwallet.LightningChannel
 	chanSnapshotReqs chan *chanSnapshotReq
 
+	htlcManMtx   sync.RWMutex
 	htlcManagers map[wire.OutPoint]chan lnwire.Message
 
 	// newChanBarriers is a map from a channel point to a 'barrier' which
@@ -225,7 +227,10 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 			Hash:  chanID.Hash,
 			Index: chanID.Index,
 		}
+		p.activeChanMtx.Lock()
 		p.activeChannels[chanPoint] = lnChan
+		p.activeChanMtx.Unlock()
+
 		peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint)
 
 		// Notify the routing table of this newly loaded channel.
@@ -249,7 +254,10 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
 			dbChan.Snapshot(), downstreamLink)
 
 		upstreamLink := make(chan lnwire.Message, 10)
+		p.htlcManMtx.Lock()
 		p.htlcManagers[chanPoint] = upstreamLink
+		p.htlcManMtx.Unlock()
+
 		p.wg.Add(1)
 		go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink)
 	}
@@ -440,7 +448,9 @@ out:
 			// Dispatch the commitment update message to the proper
 			// active goroutine dedicated to this channel.
+			p.htlcManMtx.Lock()
 			targetChan, ok := p.htlcManagers[*targetChan]
+			p.htlcManMtx.Unlock()
 			if !ok {
 				peerLog.Errorf("recv'd update for unknown channel %v",
 					targetChan)
@@ -623,11 +633,13 @@ out:
 	for {
 		select {
 		case req := <-p.chanSnapshotReqs:
+			p.activeChanMtx.RLock()
 			snapshots := make([]*channeldb.ChannelSnapshot, 0, len(p.activeChannels))
 			for _, activeChan := range p.activeChannels {
 				snapshot := activeChan.StateSnapshot()
 				snapshots = append(snapshots, snapshot)
 			}
+			p.activeChanMtx.RUnlock()
 			req.resp <- snapshots
 
 		case pendingChanPoint := <-p.barrierInits:
@@ -644,7 +656,10 @@
 		case newChan := <-p.newChannels:
 			chanPoint := *newChan.ChannelPoint()
+
+			p.activeChanMtx.Lock()
 			p.activeChannels[chanPoint] = newChan
+			p.activeChanMtx.Unlock()
 
 			peerLog.Infof("New channel active ChannelPoint(%v) "+
 				"with peerId(%v)", chanPoint, p.id)
@@ -660,7 +675,10 @@
 			// a goroutine to handle commitment updates for this
 			// new channel.
 			upstreamLink := make(chan lnwire.Message, 10)
+			p.htlcManMtx.Lock()
 			p.htlcManagers[chanPoint] = upstreamLink
+			p.htlcManMtx.Unlock()
+
 			p.wg.Add(1)
 			go p.htlcManager(newChan, plexChan, downstreamLink,
 				upstreamLink)
@@ -761,7 +779,9 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
 		closingTxid *wire.ShaHash
 	)
 
+	p.activeChanMtx.RLock()
 	channel := p.activeChannels[*req.chanPoint]
+	p.activeChanMtx.RUnlock()
 
 	if req.forceClose {
 		closingTxid, err = p.executeForceClose(channel)
@@ -778,7 +798,8 @@ func (p *peer) handleLocalClose(req *closeLinkReq) {
 		return
 	}
 
-	// Update the caller w.r.t the current pending state of this request.
+	// Update the caller with a new event detailing the current pending
+	// state of this request.
 	req.updates <- &lnrpc.CloseStatusUpdate{
 		Update: &lnrpc.CloseStatusUpdate_ClosePending{
 			ClosePending: &lnrpc.PendingUpdate{
@@ -842,7 +863,10 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
 		Hash:  chanPoint.Hash,
 		Index: chanPoint.Index,
 	}
+
+	p.activeChanMtx.RLock()
 	channel := p.activeChannels[key]
+	p.activeChanMtx.RUnlock()
 
 	// Now that we have their signature for the closure transaction, we
 	// can assemble the final closure transaction, complete with our
@@ -883,17 +907,26 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
 	chanID := channel.ChannelPoint()
 
+	p.activeChanMtx.Lock()
 	delete(p.activeChannels, *chanID)
+	p.activeChanMtx.Unlock()
 
 	// Instruct the Htlc Switch to close this link as the channel is no
 	// longer active.
 	p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, chanID)
+
+	p.htlcManMtx.RLock()
 	htlcWireLink, ok := p.htlcManagers[*chanID]
 	if !ok {
+		p.htlcManMtx.RUnlock()
 		return nil
 	}
+	p.htlcManMtx.RUnlock()
+
+	p.htlcManMtx.Lock()
 	delete(p.htlcManagers, *chanID)
+	p.htlcManMtx.Unlock()
+
 	close(htlcWireLink)
 
 	if err := channel.DeleteState(); err != nil {
@@ -1013,13 +1046,14 @@ out:
 	for {
 		select {
 		case <-channel.UnilateralCloseSignal:
-			// TODO(roasbeef): eliminate false positive via local close
 			peerLog.Warnf("Remote peer has closed ChannelPoint(%v) on-chain",
 				state.chanPoint)
 			if err := wipeChannel(p, channel); err != nil {
 				peerLog.Errorf("unable to wipe channel %v", err)
 			}
 
+			// TODO(roasbeef): send info about current HTLC's to
+			// utxoNursery
 			break out
 		case <-channel.ForceCloseSignal:
 			peerLog.Warnf("ChannelPoint(%v) has been force "+
@@ -1327,7 +1361,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 		for _, htlc := range htlcsToForward {
 			// We don't need to forward any HTLC's that we
 			// just settled above.
-			// TODO(roasbeef): key by index insteaad?
+			// TODO(roasbeef): key by index instead?
 			if _, ok := settledPayments[htlc.RHash]; ok {
 				continue
 			}
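
The pattern applied in every hunk above is the same: operations that mutate the shared maps (the inserts in loadActiveChannels and the p.newChannels case, the deletes in wipeChannel) take the mutex with Lock/Unlock, while read-only lookups and the snapshot loop take it with RLock/RUnlock. The stand-alone program below is a minimal sketch of that locking discipline, not code taken from peer.go; the chanRegistry and chanState names are illustrative only.

package main

import (
	"fmt"
	"sync"
)

type chanState struct {
	capacity int64
}

// chanRegistry is a hypothetical stand-in for the peer's activeChannels map,
// guarded by a sync.RWMutex in the same way the patch guards the real map.
type chanRegistry struct {
	mtx      sync.RWMutex
	channels map[string]*chanState
}

// add takes the write lock, mirroring p.activeChanMtx.Lock() around the map
// inserts in loadActiveChannels and the p.newChannels case.
func (r *chanRegistry) add(id string, c *chanState) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	r.channels[id] = c
}

// remove also needs the write lock: delete mutates the map, so a read lock
// (RLock) would still race with concurrent writers.
func (r *chanRegistry) remove(id string) {
	r.mtx.Lock()
	defer r.mtx.Unlock()
	delete(r.channels, id)
}

// snapshot holds the read lock while ranging over the map, as the
// chanSnapshotReqs case does under activeChanMtx.RLock().
func (r *chanRegistry) snapshot() []int64 {
	r.mtx.RLock()
	defer r.mtx.RUnlock()
	caps := make([]int64, 0, len(r.channels))
	for _, c := range r.channels {
		caps = append(caps, c.capacity)
	}
	return caps
}

func main() {
	reg := &chanRegistry{channels: make(map[string]*chanState)}

	// Exercise the add/snapshot/remove paths from several goroutines at once.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("chan-%d", i)
			reg.add(id, &chanState{capacity: int64(i)})
			_ = reg.snapshot()
			reg.remove(id)
		}(i)
	}
	wg.Wait()
	fmt.Println("remaining channels:", len(reg.channels))
}

Run with `go run -race`, the sketch reports no data races on the map, which is the property the commit is after for activeChannels and htlcManagers.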