server+discovery: remove channeldb.DB reference within the gossiper

Now that we've replaced the built-in messageStore with the
channeldb.GossipMessageStore, the reference to channeldb.DB is no longer
needed.
This commit is contained in:
Wilmer Paulino 2019-02-05 17:18:34 -08:00
parent 2277535e6b
commit 73b4bc4b68
No known key found for this signature in database
GPG Key ID: 6DF57B9F9514972F
3 changed files with 62 additions and 62 deletions

View File

@ -124,9 +124,14 @@ type Config struct {
// should check if we need re-broadcast any of our personal channels.
RetransmitDelay time.Duration
// DB is a global boltdb instance which is needed to pass it in waiting
// proof storage to make waiting proofs persistent.
DB *channeldb.DB
// WaitingProofStore is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half the channel proof, we'll be able to
// properly validate it and re-broadcast it out to the network.
//
// TODO(wilmer): make interface to prevent channeldb dependency.
WaitingProofStore *channeldb.WaitingProofStore
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
@ -188,13 +193,6 @@ type AuthenticatedGossiper struct {
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
// waitingProofs is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half the channel proof, we'll be able to
// properly validate it and re-broadcast it out to the network.
waitingProofs *channeldb.WaitingProofStore
// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
// networkHandler.
@ -229,12 +227,7 @@ type AuthenticatedGossiper struct {
// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
storage, err := channeldb.NewWaitingProofStore(cfg.DB)
if err != nil {
return nil, err
}
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
return &AuthenticatedGossiper{
selfKey: selfKey,
cfg: &cfg,
@ -243,11 +236,10 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) {
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
waitingProofs: storage,
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
peerSyncers: make(map[routing.Vertex]*gossipSyncer),
}, nil
}
}
// SynchronizeNode sends a message to the service indicating it should
@ -2084,7 +2076,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// TODO(andrew.shvv) this is dangerous because remote
// node might rewrite the waiting proof.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
if err := d.waitingProofs.Add(proof); err != nil {
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
@ -2198,7 +2191,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// proof than we should store it this one, and wait for
// opposite to be received.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey())
oppositeProof, err := d.cfg.WaitingProofStore.Get(
proof.OppositeKey(),
)
if err != nil && err != channeldb.ErrWaitingProofNotFound {
err := fmt.Errorf("unable to get "+
"the opposite proof for short_chan_id=%v: %v",
@ -2209,7 +2204,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
}
if err == channeldb.ErrWaitingProofNotFound {
if err := d.waitingProofs.Add(proof); err != nil {
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
@ -2278,7 +2274,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
err = d.waitingProofs.Remove(proof.OppositeKey())
err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
if err != nil {
err := fmt.Errorf("unable remove opposite proof "+
"for the channel with chanID=%v: %v",

View File

@ -593,8 +593,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
return nil, nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(db)
if err != nil {
cleanUpDb()
return nil, nil, err
}
broadcastedMessage := make(chan msgWithSenders, 10)
gossiper, err := New(Config{
gossiper := New(Config{
Notifier: notifier,
Broadcast: func(senders map[routing.Vertex]struct{},
msgs ...lnwire.Message) error {
@ -614,17 +620,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) {
return &mockPeer{target, nil, nil}, nil
},
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
DB: db,
MessageStore: newMockMessageStore(),
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(),
}, nodeKeyPub1)
if err != nil {
cleanUpDb()
return nil, nil, fmt.Errorf("unable to create router %v", err)
}
if err := gossiper.Start(); err != nil {
cleanUpDb()
return nil, nil, fmt.Errorf("unable to start router: %v", err)
@ -993,7 +996,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1026,7 +1029,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) {
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1096,7 +1099,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1236,7 +1239,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) {
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(p *channeldb.WaitingProof) error {
number++
return nil
@ -1406,7 +1409,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1461,7 +1464,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) {
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1624,7 +1627,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1640,7 +1643,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
// Shut down gossiper, and restart. This should trigger a new attempt
// to send the message to the peer.
ctx.gossiper.Stop()
gossiper, err := New(Config{
gossiper := New(Config{
Notifier: ctx.gossiper.cfg.Notifier,
Broadcast: ctx.gossiper.cfg.Broadcast,
SendToPeer: func(target *btcec.PublicKey,
@ -1651,12 +1654,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
connectedChan chan<- lnpeer.Peer) {
notifyPeers <- connectedChan
},
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
DB: ctx.gossiper.cfg.DB,
MessageStore: ctx.gossiper.cfg.MessageStore,
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore,
}, ctx.gossiper.selfKey)
if err != nil {
t.Fatalf("unable to recreate gossiper: %v", err)
@ -1723,7 +1726,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -1912,7 +1915,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -2494,7 +2497,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
}
number := 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil
@ -2523,7 +2526,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) {
}
number = 0
if err := ctx.gossiper.waitingProofs.ForAll(
if err := ctx.gossiper.cfg.WaitingProofStore.ForAll(
func(*channeldb.WaitingProof) error {
number++
return nil

View File

@ -587,8 +587,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
if err != nil {
return nil, err
}
waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB)
if err != nil {
return nil, err
}
s.authGossiper, err = discovery.New(discovery.Config{
s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
@ -598,19 +602,16 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) {
return s.FindPeer(pub)
},
NotifyWhenOnline: s.NotifyWhenOnline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
DB: chanDB,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
NotifyWhenOnline: s.NotifyWhenOnline,
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,
},
s.identityPriv.PubKey(),
)
if err != nil {
return nil, err
}
utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB)
if err != nil {