From 73b4bc4b6810b76e71d1095e2e1dab901daa9ed2 Mon Sep 17 00:00:00 2001 From: Wilmer Paulino Date: Tue, 5 Feb 2019 17:18:34 -0800 Subject: [PATCH] server+discovery: remove channeldb.DB reference within the gossiper Now that we've replaced the built-in messageStore with the channeldb.GossipMessageStore, the reference to channeldb.DB is no longer needed. --- discovery/gossiper.go | 40 +++++++++++-------------- discovery/gossiper_test.go | 61 ++++++++++++++++++++------------------ server.go | 23 +++++++------- 3 files changed, 62 insertions(+), 62 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f1195d45..d11f5299 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -124,9 +124,14 @@ type Config struct { // should check if we need re-broadcast any of our personal channels. RetransmitDelay time.Duration - // DB is a global boltdb instance which is needed to pass it in waiting - // proof storage to make waiting proofs persistent. - DB *channeldb.DB + // WaitingProofStore is a persistent storage of partial channel proof + // announcement messages. We use it to buffer half of the material + // needed to reconstruct a full authenticated channel announcement. + // Once we receive the other half the channel proof, we'll be able to + // properly validate it and re-broadcast it out to the network. + // + // TODO(wilmer): make interface to prevent channeldb dependency. + WaitingProofStore *channeldb.WaitingProofStore // MessageStore is a persistent storage of gossip messages which we will // use to determine which messages need to be resent for a given peer. @@ -188,13 +193,6 @@ type AuthenticatedGossiper struct { prematureChannelUpdates map[uint64][]*networkMsg pChanUpdMtx sync.Mutex - // waitingProofs is a persistent storage of partial channel proof - // announcement messages. We use it to buffer half of the material - // needed to reconstruct a full authenticated channel announcement. - // Once we receive the other half the channel proof, we'll be able to - // properly validate it and re-broadcast it out to the network. - waitingProofs *channeldb.WaitingProofStore - // networkMsgs is a channel that carries new network broadcasted // message from outside the gossiper service to be processed by the // networkHandler. @@ -229,12 +227,7 @@ type AuthenticatedGossiper struct { // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration parameters. -func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { - storage, err := channeldb.NewWaitingProofStore(cfg.DB) - if err != nil { - return nil, err - } - +func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper { return &AuthenticatedGossiper{ selfKey: selfKey, cfg: &cfg, @@ -243,11 +236,10 @@ func New(cfg Config, selfKey *btcec.PublicKey) (*AuthenticatedGossiper, error) { chanPolicyUpdates: make(chan *chanPolicyUpdateRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), prematureChannelUpdates: make(map[uint64][]*networkMsg), - waitingProofs: storage, channelMtx: multimutex.NewMutex(), recentRejects: make(map[uint64]struct{}), peerSyncers: make(map[routing.Vertex]*gossipSyncer), - }, nil + } } // SynchronizeNode sends a message to the service indicating it should @@ -2084,7 +2076,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2198,7 +2191,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( // proof than we should store it this one, and wait for // opposite to be received. proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) - oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey()) + oppositeProof, err := d.cfg.WaitingProofStore.Get( + proof.OppositeKey(), + ) if err != nil && err != channeldb.ErrWaitingProofNotFound { err := fmt.Errorf("unable to get "+ "the opposite proof for short_chan_id=%v: %v", @@ -2209,7 +2204,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( } if err == channeldb.ErrWaitingProofNotFound { - if err := d.waitingProofs.Add(proof); err != nil { + err := d.cfg.WaitingProofStore.Add(proof) + if err != nil { err := fmt.Errorf("unable to store "+ "the proof for short_chan_id=%v: %v", shortChanID, err) @@ -2278,7 +2274,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement( return nil } - err = d.waitingProofs.Remove(proof.OppositeKey()) + err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey()) if err != nil { err := fmt.Errorf("unable remove opposite proof "+ "for the channel with chanID=%v: %v", diff --git a/discovery/gossiper_test.go b/discovery/gossiper_test.go index d32ca0af..0e2e51f4 100644 --- a/discovery/gossiper_test.go +++ b/discovery/gossiper_test.go @@ -593,8 +593,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { return nil, nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(db) + if err != nil { + cleanUpDb() + return nil, nil, err + } + broadcastedMessage := make(chan msgWithSenders, 10) - gossiper, err := New(Config{ + gossiper := New(Config{ Notifier: notifier, Broadcast: func(senders map[routing.Vertex]struct{}, msgs ...lnwire.Message) error { @@ -614,17 +620,14 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { FindPeer: func(target *btcec.PublicKey) (lnpeer.Peer, error) { return &mockPeer{target, nil, nil}, nil }, - Router: router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: db, - MessageStore: newMockMessageStore(), + Router: router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: waitingProofStore, + MessageStore: newMockMessageStore(), }, nodeKeyPub1) - if err != nil { - cleanUpDb() - return nil, nil, fmt.Errorf("unable to create router %v", err) - } + if err := gossiper.Start(); err != nil { cleanUpDb() return nil, nil, fmt.Errorf("unable to start router: %v", err) @@ -993,7 +996,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1026,7 +1029,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1096,7 +1099,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1236,7 +1239,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(p *channeldb.WaitingProof) error { number++ return nil @@ -1406,7 +1409,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1461,7 +1464,7 @@ func TestSignatureAnnouncementRetry(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1624,7 +1627,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1640,7 +1643,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { // Shut down gossiper, and restart. This should trigger a new attempt // to send the message to the peer. ctx.gossiper.Stop() - gossiper, err := New(Config{ + gossiper := New(Config{ Notifier: ctx.gossiper.cfg.Notifier, Broadcast: ctx.gossiper.cfg.Broadcast, SendToPeer: func(target *btcec.PublicKey, @@ -1651,12 +1654,12 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { connectedChan chan<- lnpeer.Peer) { notifyPeers <- connectedChan }, - Router: ctx.gossiper.cfg.Router, - TrickleDelay: trickleDelay, - RetransmitDelay: retransmitDelay, - ProofMatureDelta: proofMatureDelta, - DB: ctx.gossiper.cfg.DB, - MessageStore: ctx.gossiper.cfg.MessageStore, + Router: ctx.gossiper.cfg.Router, + TrickleDelay: trickleDelay, + RetransmitDelay: retransmitDelay, + ProofMatureDelta: proofMatureDelta, + WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore, + MessageStore: ctx.gossiper.cfg.MessageStore, }, ctx.gossiper.selfKey) if err != nil { t.Fatalf("unable to recreate gossiper: %v", err) @@ -1723,7 +1726,7 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -1912,7 +1915,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2494,7 +2497,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number := 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil @@ -2523,7 +2526,7 @@ func TestReceiveRemoteChannelUpdateFirst(t *testing.T) { } number = 0 - if err := ctx.gossiper.waitingProofs.ForAll( + if err := ctx.gossiper.cfg.WaitingProofStore.ForAll( func(*channeldb.WaitingProof) error { number++ return nil diff --git a/server.go b/server.go index 13210c6e..89b8d39c 100644 --- a/server.go +++ b/server.go @@ -587,8 +587,12 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, if err != nil { return nil, err } + waitingProofStore, err := channeldb.NewWaitingProofStore(s.chanDB) + if err != nil { + return nil, err + } - s.authGossiper, err = discovery.New(discovery.Config{ + s.authGossiper = discovery.New(discovery.Config{ Router: s.chanRouter, Notifier: s.cc.chainNotifier, ChainHash: *activeNetParams.GenesisHash, @@ -598,19 +602,16 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, FindPeer: func(pub *btcec.PublicKey) (lnpeer.Peer, error) { return s.FindPeer(pub) }, - NotifyWhenOnline: s.NotifyWhenOnline, - ProofMatureDelta: 0, - TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), - RetransmitDelay: time.Minute * 30, - DB: chanDB, - MessageStore: gossipMessageStore, - AnnSigner: s.nodeSigner, + NotifyWhenOnline: s.NotifyWhenOnline, + ProofMatureDelta: 0, + TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay), + RetransmitDelay: time.Minute * 30, + WaitingProofStore: waitingProofStore, + MessageStore: gossipMessageStore, + AnnSigner: s.nodeSigner, }, s.identityPriv.PubKey(), ) - if err != nil { - return nil, err - } utxnStore, err := newNurseryStore(activeNetParams.GenesisHash, chanDB) if err != nil {