diff --git a/discovery/service.go b/discovery/service.go index 9625dd96..5397dcb7 100644 --- a/discovery/service.go +++ b/discovery/service.go @@ -16,17 +16,6 @@ import ( "github.com/roasbeef/btcutil" ) -// waitingProofKey is the proof key which uniquely identifies the announcement -// signature message. The goal of this key is distinguish the local and remote -// proof for the same channel id. -// -// TODO(andrew.shvv) move to the channeldb package after waiting proof map -// becomes persistent. -type waitingProofKey struct { - chanID uint64 - isRemote bool -} - // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { @@ -79,18 +68,27 @@ type Config struct { // network the pending batch of new announcements we've received since // the last trickle tick. TrickleDelay time.Duration + + // DB is a global boltdb instance which is needed to pass it in + // waiting proof storage to make waiting proofs persistent. + DB *channeldb.DB } // New creates a new AuthenticatedGossiper instance, initialized with the // passed configuration paramters. func New(cfg Config) (*AuthenticatedGossiper, error) { + storage, err := channeldb.NewWaitingProofStore(cfg.DB) + if err != nil { + return nil, err + } + return &AuthenticatedGossiper{ cfg: &cfg, networkMsgs: make(chan *networkMsg), quit: make(chan struct{}), syncRequests: make(chan *syncRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), - waitingProofs: make(map[waitingProofKey]*lnwire.AnnounceSignatures), + waitingProofs: storage, }, nil } @@ -110,7 +108,7 @@ type AuthenticatedGossiper struct { quit chan struct{} wg sync.WaitGroup - // cfg is a copy of the configuration struct that the discovery service + // cfg is a copy of the configuration struct that the gossiper service // was initialized with. cfg *Config @@ -128,17 +126,15 @@ type AuthenticatedGossiper struct { // TODO(roasbeef): limit premature networkMsgs to N prematureAnnouncements map[uint32][]*networkMsg - // waitingProofs is a map of partial channel proof announcement - // messages. We use this map to buffer half of the material needed to - // reconstruct a full authenticated channel announcement. Once we - // receive the other half the channel proof, we'll be able to properly - // validate it an re-broadcast it out to the network. - // - // TODO(andrew.shvv) make this map persistent. - waitingProofs map[waitingProofKey]*lnwire.AnnounceSignatures + // waitingProofs is a persistent storage of partial channel proof + // announcement messages. We use it to buffer half of the material + // needed to reconstruct a full authenticated channel announcement. Once + // we receive the other half the channel proof, we'll be able to + // properly validate it an re-broadcast it out to the network. + waitingProofs *channeldb.WaitingProofStore // networkMsgs is a channel that carries new network broadcasted - // message from outside the discovery service to be processed by the + // message from outside the gossiper service to be processed by the // networkHandler. networkMsgs chan *networkMsg @@ -415,7 +411,7 @@ func (d *AuthenticatedGossiper) networkHandler() { nodePub, err) } - // The discovery has been signalled to exit, to we exit our + // The gossiper has been signalled to exit, to we exit our // main loop so the wait group can be decremented. case <-d.quit: return @@ -689,8 +685,15 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l if err != nil { // TODO(andrew.shvv) this is dangerous because remote // node might rewrite the waiting proof. - key := newProofKey(shortChanID, nMsg.isRemote) - d.waitingProofs[key] = msg + proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) + if err := d.waitingProofs.Add(proof); err != nil { + err := errors.Errorf("unable to store "+ + "the proof for short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } log.Infof("Orphan %v proof announcement with "+ "short_chan_id=%v, adding"+ @@ -720,11 +723,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l // announcement. If we didn't receive the opposite half of the // proof than we should store it this one, and wait for // opposite to be received. - oppositeKey := newProofKey(chanInfo.ChannelID, !nMsg.isRemote) - oppositeProof, ok := d.waitingProofs[oppositeKey] - if !ok { - key := newProofKey(chanInfo.ChannelID, nMsg.isRemote) - d.waitingProofs[key] = msg + proof := channeldb.NewWaitingProof(nMsg.isRemote, msg) + oppositeProof, err := d.waitingProofs.Get(proof.OppositeKey()) + if err != nil && err != channeldb.ErrWaitingProofNotFound { + err := errors.Errorf("unable to get "+ + "the opposite proof for short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } + + if err == channeldb.ErrWaitingProofNotFound { + if err := d.waitingProofs.Add(proof); err != nil { + err := errors.Errorf("unable to store "+ + "the proof for short_chan_id=%v: %v", + shortChanID, err) + log.Error(err) + nMsg.err <- err + return nil + } // If proof was sent by a local sub-system, then we'll // send the announcement signature to the remote node @@ -805,7 +823,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l nMsg.err <- err return nil } - delete(d.waitingProofs, oppositeKey) + + if err := d.waitingProofs.Remove(proof.OppositeKey()); err != nil { + err := errors.Errorf("unable remove opposite proof "+ + "for the channel with chanID=%v: %v", msg.ChannelID, err) + log.Error(err) + nMsg.err <- err + return nil + } // Proof was successfully created and now can announce the // channel to the remain network. diff --git a/discovery/service_test.go b/discovery/service_test.go index 0a01e8c0..f4f7e682 100644 --- a/discovery/service_test.go +++ b/discovery/service_test.go @@ -13,6 +13,9 @@ import ( "time" + "io/ioutil" + "os" + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -55,6 +58,31 @@ var ( proofMatureDelta uint32 ) +// makeTestDB creates a new instance of the ChannelDB for testing purposes. A +// callback which cleans up the created temporary directories is also returned +// and intended to be executed after the test completes. +func makeTestDB() (*channeldb.DB, func(), error) { + // First, create a temporary directory to be used for the duration of + // this test. + tempDirName, err := ioutil.TempDir("", "channeldb") + if err != nil { + return nil, nil, err + } + + // Next, create channeldb for the first time. + cdb, err := channeldb.Open(tempDirName) + if err != nil { + return nil, nil, err + } + + cleanUp := func() { + cdb.Close() + os.RemoveAll(tempDirName) + } + + return cdb, cleanUp, nil +} + type mockSigner struct { privKey *btcec.PrivateKey } @@ -369,7 +397,7 @@ func createRemoteChannelAnnouncement(blockHeight uint32) (*lnwire.ChannelAnnounc } type testCtx struct { - discovery *AuthenticatedGossiper + gossiper *AuthenticatedGossiper router *mockGraphSource notifier *mockNotifier broadcastedMessage chan lnwire.Message @@ -383,8 +411,13 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { notifier := newMockNotifier() router := newMockRouter(startHeight) + db, cleanUpDb, err := makeTestDB() + if err != nil { + return nil, nil, err + } + broadcastedMessage := make(chan lnwire.Message, 10) - discovery, err := New(Config{ + gossiper, err := New(Config{ Notifier: notifier, Broadcast: func(_ *btcec.PublicKey, msgs ...lnwire.Message) error { for _, msg := range msgs { @@ -398,22 +431,26 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) { Router: router, TrickleDelay: trickleDelay, ProofMatureDelta: proofMatureDelta, + DB: db, }) if err != nil { + cleanUpDb() return nil, nil, fmt.Errorf("unable to create router %v", err) } - if err := discovery.Start(); err != nil { + if err := gossiper.Start(); err != nil { + cleanUpDb() return nil, nil, fmt.Errorf("unable to start router: %v", err) } cleanUp := func() { - discovery.Stop() + gossiper.Stop() + cleanUpDb() } return &testCtx{ router: router, notifier: notifier, - discovery: discovery, + gossiper: gossiper, broadcastedMessage: broadcastedMessage, }, cleanUp, nil } @@ -428,7 +465,7 @@ func TestProcessAnnouncement(t *testing.T) { defer cleanup() // Create node valid, signed announcement, process it with with - // discovery service, check that valid announcement have been + // gossiper service, check that valid announcement have been // propagated farther into the lightning network, and check that we // added new node into router. na, err := createNodeAnnouncement(nodeKeyPriv1) @@ -436,7 +473,7 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create node announcement: %v", err) } - err = <-ctx.discovery.ProcessRemoteAnnouncement(na, na.NodeID) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(na, na.NodeID) if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -459,7 +496,7 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create channel announcement: %v", err) } - err = <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID) if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -482,7 +519,7 @@ func TestProcessAnnouncement(t *testing.T) { t.Fatalf("can't create update announcement: %v", err) } - err = <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID) if err != nil { t.Fatalf("can't process remote announcement: %v", err) } @@ -523,7 +560,7 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID): + case <-ctx.gossiper.ProcessRemoteAnnouncement(ca, na.NodeID): t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } @@ -542,7 +579,7 @@ func TestPrematureAnnouncement(t *testing.T) { } select { - case <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID): + case <-ctx.gossiper.ProcessRemoteAnnouncement(ua, na.NodeID): t.Fatal("announcement was proceeded") case <-time.After(100 * time.Millisecond): } @@ -596,7 +633,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -606,7 +643,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -616,7 +653,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -628,7 +665,7 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { // Pretending that we receive local channel announcement from funding // manager, thereby kick off the announcement exchange process. - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -639,11 +676,21 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { case <-time.After(2 * trickleDelay): } - if len(ctx.discovery.waitingProofs) != 1 { - t.Fatal("local proof announcement should be stored") + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) } - err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + if number != 1 { + t.Fatal("wrong number of objects in storage") + } + + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -655,6 +702,20 @@ func TestSignatureAnnouncementLocalFirst(t *testing.T) { t.Fatal("announcement wasn't broadcast") } } + + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil && err != channeldb.ErrWaitingProofNotFound { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("waiting proof should be removed from storage") + } } // TestOrphanSignatureAnnouncement ensures that the gossiper properly @@ -678,18 +739,28 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // manager, thereby kick off the announcement exchange process, in // this case the announcement should be added in the orphan batch // because we haven't announce the channel yet. - err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey) if err != nil { t.Fatalf("unable to proceed announcement: %v", err) } - if len(ctx.discovery.waitingProofs) != 1 { - t.Fatal("local proof announcement should be stored") + number := 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 1 { + t.Fatal("wrong number of objects in storage") } // Recreate lightning network topology. Initialize router with channel // between two nodes. - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localChanAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -700,7 +771,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -710,7 +781,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { case <-time.After(2 * trickleDelay): } - err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey) + err = <-ctx.gossiper.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -722,7 +793,7 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { // After that we process local announcement, and waiting to receive // the channel announcement. - err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey) + err = <-ctx.gossiper.ProcessLocalAnnouncement(batch.localProofAnn, localKey) if err != nil { t.Fatalf("unable to process :%v", err) } @@ -735,7 +806,17 @@ func TestOrphanSignatureAnnouncement(t *testing.T) { } } - if len(ctx.discovery.waitingProofs) != 0 { - t.Fatal("index should be removed") + number = 0 + if err := ctx.gossiper.waitingProofs.ForAll( + func(*channeldb.WaitingProof) error { + number++ + return nil + }, + ); err != nil { + t.Fatalf("unable to retrieve objects from store: %v", err) + } + + if number != 0 { + t.Fatal("wrong number of objects in storage") } } diff --git a/discovery/utils.go b/discovery/utils.go index 214531a4..d50c5dbd 100644 --- a/discovery/utils.go +++ b/discovery/utils.go @@ -1,8 +1,6 @@ package discovery import ( - "encoding/binary" - "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwallet" @@ -10,31 +8,6 @@ import ( "github.com/roasbeef/btcd/btcec" ) -// newProofKey constructs a new announcement signature message key. -func newProofKey(chanID uint64, isRemote bool) waitingProofKey { - return waitingProofKey{ - chanID: chanID, - isRemote: isRemote, - } -} - -// ToBytes returns a serialized representation of the key. -func (k waitingProofKey) ToBytes() []byte { - var key [9]byte - - var b uint8 - if k.isRemote { - b = 0 - } else { - b = 1 - } - - binary.BigEndian.PutUint64(key[:8], k.chanID) - key[8] = b - - return key[:] -} - // createChanAnnouncement is a helper function which creates all channel // announcements given the necessary channel related database items. This // function is used to transform out databse structs into the coresponding wire diff --git a/server.go b/server.go index e1e8326b..d79fb04a 100644 --- a/server.go +++ b/server.go @@ -254,6 +254,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, SendToPeer: s.sendToPeer, TrickleDelay: time.Millisecond * 300, ProofMatureDelta: 0, + DB: chanDB, }) if err != nil { return nil, err