routing: use multimutex.Mutex instead of package internal mutex

This commit is contained in:
Johan T. Halseth 2018-01-23 16:24:21 +01:00
parent 0df3ff4994
commit b07f242dc2
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -15,6 +15,7 @@ import (
"github.com/lightningnetwork/lnd/htlcswitch"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing/chainview"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
@ -162,97 +163,6 @@ func newRouteTuple(amt lnwire.MilliSatoshi, dest []byte) routeTuple {
return r
}
// cntMutex is a struct that wraps a counter and a mutex, and is used
// to keep track of the number of goroutines waiting for access to the
// mutex, such that we can forget about it when the counter is zero.
type cntMutex struct {
cnt int
sync.Mutex
}
// mutexForID is a struct that keeps track of a set of mutexes with
// a given ID. It can be used for making sure only one goroutine
// gets given the mutex per ID. Here it is currently used to making
// sure we only process one ChannelEdgePolicy per channelID at a
// given time.
type mutexForID struct {
// mutexes is a map of IDs to a cntMutex. The cntMutex for
// a given ID will hold the mutex to be used by all
// callers requesting access for the ID, in addition to
// the count of callers.
mutexes map[uint64]*cntMutex
// mapMtx is used to give synchronize concurrent access
// to the mutexes map.
mapMtx sync.Mutex
}
func newMutexForID() *mutexForID {
return &mutexForID{
mutexes: make(map[uint64]*cntMutex),
}
}
// Lock locks the mutex by the given ID. If the mutex is already
// locked by this ID, Lock blocks until the mutex is available.
func (c *mutexForID) Lock(id uint64) {
c.mapMtx.Lock()
mtx, ok := c.mutexes[id]
if ok {
// If the mutex already existed in the map, we
// increment its counter, to indicate that there
// now is one more goroutine waiting for it.
mtx.cnt++
} else {
// If it was not in the map, it means no other
// goroutine has locked the mutex for this ID,
// and we can create a new mutex with count 1
// and add it to the map.
mtx = &cntMutex{
cnt: 1,
}
c.mutexes[id] = mtx
}
c.mapMtx.Unlock()
// Acquire the mutex for this ID.
mtx.Lock()
}
// Unlock unlocks the mutex by the given ID. It is a run-time
// error if the mutex is not locked by the ID on entry to Unlock.
func (c *mutexForID) Unlock(id uint64) {
// Since we are done with all the work for this
// update, we update the map to reflect that.
c.mapMtx.Lock()
mtx, ok := c.mutexes[id]
if !ok {
// The mutex not existing in the map means
// an unlock for an ID not currently locked
// was attempted.
panic(fmt.Sprintf("double unlock for id %v",
id))
}
// Decrement the counter. If the count goes to
// zero, it means this caller was the last one
// to wait for the mutex, and we can delete it
// from the map. We can do this safely since we
// are under the mapMtx, meaning that all other
// goroutines waiting for the mutex already
// have incremented it, or will create a new
// mutex when they get the mapMtx.
mtx.cnt--
if mtx.cnt == 0 {
delete(c.mutexes, id)
}
c.mapMtx.Unlock()
// Unlock the mutex for this ID.
mtx.Unlock()
}
// ChannelRouter is the layer 3 router within the Lightning stack. Below the
// ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain
// itself. The primary role of the ChannelRouter is to respond to queries for
@ -325,7 +235,7 @@ type ChannelRouter struct {
// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
channelEdgeMtx *mutexForID
channelEdgeMtx *multimutex.Mutex
sync.RWMutex
@ -355,7 +265,7 @@ func New(cfg Config) (*ChannelRouter, error) {
topologyClients: make(map[uint64]*topologyClient),
ntfnClientUpdates: make(chan *topologyClientUpdate),
missionControl: newMissionControl(cfg.Graph, selfNode),
channelEdgeMtx: newMutexForID(),
channelEdgeMtx: multimutex.NewMutex(),
selfNode: selfNode,
routeCache: make(map[routeTuple][]*Route),
quit: make(chan struct{}),