discovery/gossiper: restrict database access per channel ID

This commit uses the multimutex.Mutex to esure database
state stays consistent when handling an announcement, by
restricting access to one goroutine per channel ID.

This fixes a bug where the goroutine would read the
database, make some decisions based on what was read,
then write its data to the database, but the read data
would be outdated at this point. For instance, an
AuthProof could have been added between reading the
database and when the decision whether to announce
the channel is made, making it not announce it.
Similarly, when receiving the AuthProof, the edge
policy could be added between reading the edge state
and adding the proof to the database, also resulting
in the edge not being announced.
This commit is contained in:
Johan T. Halseth 2018-01-23 16:26:15 +01:00
parent b07f242dc2
commit a55c74f80c
No known key found for this signature in database
GPG Key ID: 15BAADA29DA20D26

@ -16,6 +16,7 @@ import (
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing" "github.com/lightningnetwork/lnd/routing"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/chaincfg/chainhash"
@ -186,6 +187,12 @@ type AuthenticatedGossiper struct {
// selfKey is the identity public key of the backing Lighting node. // selfKey is the identity public key of the backing Lighting node.
selfKey *btcec.PublicKey selfKey *btcec.PublicKey
// channelMtx is used to restrict the database access to one
// goroutine per channel ID. This is done to ensure that when
// the gossiper is handling an announcement, the db state stays
// consistent between when the DB is first read to it's written.
channelMtx *multimutex.Mutex
sync.Mutex sync.Mutex
} }
@ -206,6 +213,7 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
prematureAnnouncements: make(map[uint32][]*networkMsg), prematureAnnouncements: make(map[uint32][]*networkMsg),
prematureChannelUpdates: make(map[uint64][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg),
waitingProofs: storage, waitingProofs: storage,
channelMtx: multimutex.NewMutex(),
}, nil }, nil
} }
@ -1371,6 +1379,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// present in this channel are not present in the database, a // present in this channel are not present in the database, a
// partial node will be added to represent each node while we // partial node will be added to represent each node while we
// wait for a node announcement. // wait for a node announcement.
//
// Before we add the edge to the database, we obtain
// the mutex for this channel ID. We do this to ensure
// no other goroutine has read the database and is now
// making decisions based on this DB state, before it
// writes to the DB.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
if err := d.cfg.Router.AddEdge(edge); err != nil { if err := d.cfg.Router.AddEdge(edge); err != nil {
// If the edge was rejected due to already being known, // If the edge was rejected due to already being known,
// then it may be that case that this new message has a // then it may be that case that this new message has a
@ -1511,6 +1527,13 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// Get the node pub key as far as we don't have it in channel // Get the node pub key as far as we don't have it in channel
// update announcement message. We'll need this to properly // update announcement message. We'll need this to properly
// verify message signature. // verify message signature.
//
// We make sure to obtain the mutex for this channel ID
// before we acces the database. This ensures the state
// we read from the database has not changed between this
// point and when we call UpdateEdge() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID) chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil { if err != nil {
switch err { switch err {
@ -1674,6 +1697,12 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
// Ensure that we know of a channel with the target channel ID // Ensure that we know of a channel with the target channel ID
// before proceeding further. // before proceeding further.
//
// We must acquire the mutex for this channel ID before getting
// the channel from the database, to ensure what we read does
// not change before we call AddProof() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID( chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID) msg.ShortChannelID)
if err != nil { if err != nil {