Merge pull request #2084 from halseth/node-announcement-stale

Retransmit NodeAnnouncement on regular intervals
This commit is contained in:
Wilmer Paulino 2019-09-17 11:37:05 -07:00 committed by GitHub
commit 9e4c4c5041
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 320 additions and 77 deletions

@ -147,6 +147,12 @@ type Config struct {
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// SelfNodeAnnouncement is a function that fetches our own current node
// announcement, for use when determining whether we should update our
// peers about our presence on the network. If the refresh is true, a
// new and updated announcement will be returned.
SelfNodeAnnouncement func(refresh bool) (lnwire.NodeAnnouncement, error)
// ProofMatureDelta the number of confirmations which is needed before
// exchange the channel announcement proofs.
ProofMatureDelta uint32
@ -156,9 +162,17 @@ type Config struct {
// the last trickle tick.
TrickleDelay time.Duration
// RetransmitDelay is the period of a timer which indicates that we
// should check if we need re-broadcast any of our personal channels.
RetransmitDelay time.Duration
// RetransmitTicker is a ticker that ticks with a period which
// indicates that we should check if we need re-broadcast any of our
// personal channels.
RetransmitTicker ticker.Ticker
// RebroadcastInterval is the maximum time we wait between sending out
// channel updates for our active channels and our own node
// announcement. We do this to ensure our active presence on the
// network is known, and we are not being considered a zombie node or
// having zombie channels.
RebroadcastInterval time.Duration
// WaitingProofStore is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
@ -881,16 +895,16 @@ func (d *AuthenticatedGossiper) networkHandler() {
announcements := deDupedAnnouncements{}
announcements.Reset()
retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay)
defer retransmitTimer.Stop()
d.cfg.RetransmitTicker.Resume()
defer d.cfg.RetransmitTicker.Stop()
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop()
// To start, we'll first check to see if there are any stale channels
// that we need to re-transmit.
if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("Unable to rebroadcast stale channels: %v", err)
// To start, we'll first check to see if there are any stale channel or
// node announcements that we need to re-transmit.
if err := d.retransmitStaleAnns(time.Now()); err != nil {
log.Errorf("Unable to rebroadcast stale announcements: %v", err)
}
// We'll use this validation to ensure that we process jobs in their
@ -1101,13 +1115,14 @@ func (d *AuthenticatedGossiper) networkHandler() {
// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie"
// channels and channel advertisements that have been dropped,
// or not properly propagated through the network.
case <-retransmitTimer.C:
if err := d.retransmitStaleChannels(); err != nil {
// personal channels or node announcement. This addresses the
// case of "zombie" channels and channel advertisements that
// have been dropped, or not properly propagated through the
// network.
case tick := <-d.cfg.RetransmitTicker.Ticks():
if err := d.retransmitStaleAnns(tick); err != nil {
log.Errorf("unable to rebroadcast stale "+
"channels: %v", err)
"announcements: %v", err)
}
// The gossiper has been signalled to exit, to we exit our
@ -1155,18 +1170,23 @@ func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
}
}
// retransmitStaleChannels examines all outgoing channels that the source node
// is known to maintain to check to see if any of them are "stale". A channel
// is stale iff, the last timestamp of its rebroadcast is older then
// broadcastInterval.
func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
// retransmitStaleAnns examines all outgoing channels that the source node is
// known to maintain to check to see if any of them are "stale". A channel is
// stale iff, the last timestamp of its rebroadcast is older than the
// RebroadcastInterval. We also check if a refreshed node announcement should
// be resent.
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
// Iterate over all of our channels and check if any of them fall
// within the prune interval or re-broadcast interval.
type updateTuple struct {
info *channeldb.ChannelEdgeInfo
edge *channeldb.ChannelEdgePolicy
}
var edgesToUpdate []updateTuple
var (
havePublicChannels bool
edgesToUpdate []updateTuple
)
err := d.cfg.Router.ForAllOutgoingChannels(func(
info *channeldb.ChannelEdgeInfo,
edge *channeldb.ChannelEdgePolicy) error {
@ -1182,6 +1202,11 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
return nil
}
// We make a note that we have at least one public channel. We
// use this to determine whether we should send a node
// announcement below.
havePublicChannels = true
// If this edge has a ChannelUpdate that was created before the
// introduction of the MaxHTLC field, then we'll update this
// edge to propagate this information in the network.
@ -1193,14 +1218,12 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
return nil
}
const broadcastInterval = time.Hour * 24
timeElapsed := now.Sub(edge.LastUpdate)
timeElapsed := time.Since(edge.LastUpdate)
// If it's been a full day since we've re-broadcasted the
// channel, add the channel to the set of edges we need to
// update.
if timeElapsed >= broadcastInterval {
// If it's been longer than RebroadcastInterval since we've
// re-broadcasted the channel, add the channel to the set of
// edges we need to update.
if timeElapsed >= d.cfg.RebroadcastInterval {
edgesToUpdate = append(edgesToUpdate, updateTuple{
info: info,
edge: edge,
@ -1234,13 +1257,51 @@ func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
signedUpdates = append(signedUpdates, chanUpdate)
}
// If we don't have any channels to re-broadcast, then we'll exit
// If we don't have any public channels, we return as we don't want to
// broadcast anything that would reveal our existence.
if !havePublicChannels {
return nil
}
// We'll also check that our NodeAnnouncement is not too old.
currentNodeAnn, err := d.cfg.SelfNodeAnnouncement(false)
if err != nil {
return fmt.Errorf("unable to get current node announment: %v",
err)
}
timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
timeElapsed := now.Sub(timestamp)
// If it's been a full day since we've re-broadcasted the
// node announcement, refresh it and resend it.
nodeAnnStr := ""
if timeElapsed >= d.cfg.RebroadcastInterval {
newNodeAnn, err := d.cfg.SelfNodeAnnouncement(true)
if err != nil {
return fmt.Errorf("unable to get refreshed node "+
"announcement: %v", err)
}
signedUpdates = append(signedUpdates, &newNodeAnn)
nodeAnnStr = " and our refreshed node announcement"
// Before broadcasting the refreshed node announcement, add it
// to our own graph.
if err := d.addNode(&newNodeAnn); err != nil {
log.Errorf("Unable to add refreshed node announcement "+
"to graph: %v", err)
}
}
// If we don't have any updates to re-broadcast, then we'll exit
// early.
if len(signedUpdates) == 0 {
return nil
}
log.Infof("Retransmitting %v outgoing channels", len(edgesToUpdate))
log.Infof("Retransmitting %v outgoing channels%v",
len(edgesToUpdate), nodeAnnStr)
// With all the wire announcements properly crafted, we'll broadcast
// our known outgoing channels to all our immediate peers.
@ -1454,6 +1515,33 @@ func (d *AuthenticatedGossiper) processRejectedEdge(
return announcements, nil
}
// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement) error {
if err := routing.ValidateNodeAnn(msg); err != nil {
return fmt.Errorf("unable to validate node announcement: %v",
err)
}
timestamp := time.Unix(int64(msg.Timestamp), 0)
features := lnwire.NewFeatureVector(
msg.Features, lnwire.GlobalFeatures,
)
node := &channeldb.LightningNode{
HaveNodeAnnouncement: true,
LastUpdate: timestamp,
Addresses: msg.Addresses,
PubKeyBytes: msg.NodeID,
Alias: msg.Alias.String(),
AuthSigBytes: msg.Signature.ToSignatureBytes(),
Features: features,
Color: msg.RGBColor,
ExtraOpaqueData: msg.ExtraOpaqueData,
}
return d.cfg.Router.AddNode(node)
}
// processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
@ -1487,30 +1575,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
return nil
}
if err := routing.ValidateNodeAnn(msg); err != nil {
err := fmt.Errorf("unable to validate "+
"node announcement: %v", err)
log.Error(err)
nMsg.err <- err
return nil
}
features := lnwire.NewFeatureVector(
msg.Features, lnwire.GlobalFeatures,
)
node := &channeldb.LightningNode{
HaveNodeAnnouncement: true,
LastUpdate: timestamp,
Addresses: msg.Addresses,
PubKeyBytes: msg.NodeID,
Alias: msg.Alias.String(),
AuthSigBytes: msg.Signature.ToSignatureBytes(),
Features: features,
Color: msg.RGBColor,
ExtraOpaqueData: msg.ExtraOpaqueData,
}
if err := d.cfg.Router.AddNode(node); err != nil {
if err := d.addNode(msg); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
@ -1526,10 +1591,10 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// In order to ensure we don't leak unadvertised nodes, we'll
// make a quick check to ensure this node intends to publicly
// advertise itself to the network.
isPublic, err := d.cfg.Router.IsPublicNode(node.PubKeyBytes)
isPublic, err := d.cfg.Router.IsPublicNode(msg.NodeID)
if err != nil {
log.Errorf("Unable to determine if node %x is "+
"advertised: %v", node.PubKeyBytes, err)
"advertised: %v", msg.NodeID, err)
nMsg.err <- err
return nil
}

@ -58,6 +58,11 @@ var (
trickleDelay = time.Millisecond * 100
retransmitDelay = time.Hour * 1
proofMatureDelta uint32
// The test timestamp + rebroadcast interval makes sure messages won't
// be rebroadcasted automaticallty during the tests.
testTimestamp = uint32(1234567890)
rebroadcastInterval = time.Hour * 1000000
)
// makeTestDB creates a new instance of the ChannelDB for testing purposes. A
@ -163,10 +168,6 @@ func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
return nil
}
func (r *mockGraphSource) SelfEdges() ([]*channeldb.ChannelEdgePolicy, error) {
return nil, nil
}
func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) {
return r.bestHeight, nil
}
@ -471,7 +472,7 @@ type annBatch struct {
func createAnnouncements(blockHeight uint32) (*annBatch, error) {
var err error
var batch annBatch
timestamp := uint32(123456)
timestamp := testTimestamp
batch.nodeAnn1, err = createNodeAnnouncement(nodeKeyPriv1, timestamp)
if err != nil {
@ -739,9 +740,15 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
c := make(chan struct{})
return c
},
SelfNodeAnnouncement: func(bool) (lnwire.NodeAnnouncement, error) {
return lnwire.NodeAnnouncement{
Timestamp: testTimestamp,
}, nil
},
Router: router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: waitingProofStore,
MessageStore: newMockMessageStore(),
@ -780,8 +787,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
func TestProcessAnnouncement(t *testing.T) {
t.Parallel()
timestamp := uint32(123456)
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
@ -889,7 +895,7 @@ func TestProcessAnnouncement(t *testing.T) {
func TestPrematureAnnouncement(t *testing.T) {
t.Parallel()
timestamp := uint32(123456)
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
@ -1491,9 +1497,11 @@ func TestSignatureAnnouncementRetryAtStartup(t *testing.T) {
Broadcast: ctx.gossiper.cfg.Broadcast,
NotifyWhenOnline: ctx.gossiper.reliableSender.cfg.NotifyWhenOnline,
NotifyWhenOffline: ctx.gossiper.reliableSender.cfg.NotifyWhenOffline,
SelfNodeAnnouncement: ctx.gossiper.cfg.SelfNodeAnnouncement,
Router: ctx.gossiper.cfg.Router,
TrickleDelay: trickleDelay,
RetransmitDelay: retransmitDelay,
RetransmitTicker: ticker.NewForce(retransmitDelay),
RebroadcastInterval: rebroadcastInterval,
ProofMatureDelta: proofMatureDelta,
WaitingProofStore: ctx.gossiper.cfg.WaitingProofStore,
MessageStore: ctx.gossiper.cfg.MessageStore,
@ -1657,6 +1665,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
t.Fatal("channel update announcement was broadcast")
case <-time.After(2 * trickleDelay):
}
select {
case msg := <-sentToPeer:
assertMessage(t, batch.chanUpdAnn1, msg)
@ -1797,7 +1806,7 @@ func TestSignatureAnnouncementFullProofWhenRemoteProof(t *testing.T) {
func TestDeDuplicatedAnnouncements(t *testing.T) {
t.Parallel()
timestamp := uint32(123456)
timestamp := testTimestamp
announcements := deDupedAnnouncements{}
announcements.Reset()
@ -2667,6 +2676,7 @@ func TestExtraDataChannelAnnouncementValidation(t *testing.T) {
func TestExtraDataChannelUpdateValidation(t *testing.T) {
t.Parallel()
timestamp := testTimestamp
ctx, cleanup, err := createTestCtx(0)
if err != nil {
t.Fatalf("can't create context: %v", err)
@ -2674,7 +2684,6 @@ func TestExtraDataChannelUpdateValidation(t *testing.T) {
defer cleanup()
remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
timestamp := uint32(123456)
// In this scenario, we'll create two announcements, one regular
// channel announcement, and another channel update announcement, that
@ -2741,7 +2750,7 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
defer cleanup()
remotePeer := &mockPeer{nodeKeyPriv1.PubKey(), nil, nil}
timestamp := uint32(123456)
timestamp := testTimestamp
// We'll create a node announcement that includes a set of opaque data
// which we don't know of, but will store anyway in order to ensure
@ -2763,6 +2772,171 @@ func TestExtraDataNodeAnnouncementValidation(t *testing.T) {
}
}
// assertBroadcast checks that num messages are being broadcasted from the
// gossiper. The broadcasted messages are returned.
func assertBroadcast(t *testing.T, ctx *testCtx, num int) []lnwire.Message {
t.Helper()
var msgs []lnwire.Message
for i := 0; i < num; i++ {
select {
case msg := <-ctx.broadcastedMessage:
msgs = append(msgs, msg.msg)
case <-time.After(time.Second):
t.Fatalf("expected %d messages to be broadcast, only "+
"got %d", num, i)
}
}
// No more messages should be broadcast.
select {
case msg := <-ctx.broadcastedMessage:
t.Fatalf("unexpected message was broadcast: %T", msg.msg)
case <-time.After(2 * trickleDelay):
}
return msgs
}
// assertProcessAnnouncemnt is a helper method that checks that the result of
// processing an announcement is successful.
func assertProcessAnnouncement(t *testing.T, result chan error) {
t.Helper()
select {
case err := <-result:
if err != nil {
t.Fatalf("unable to process :%v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("did not process announcement")
}
}
// TestRetransmit checks that the expected announcements are retransmitted when
// the retransmit ticker ticks.
func TestRetransmit(t *testing.T) {
t.Parallel()
ctx, cleanup, err := createTestCtx(proofMatureDelta)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey, err := btcec.ParsePubKey(batch.nodeAnn1.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remoteKey, err := btcec.ParsePubKey(batch.nodeAnn2.NodeID[:], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pubkey: %v", err)
}
remotePeer := &mockPeer{remoteKey, nil, nil}
// Process a local channel annoucement, channel update and node
// announcement. No messages should be broadcasted yet, since no proof
// has been exchanged.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.localChanAnn, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.chanUpdAnn1, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.nodeAnn1, localKey,
),
)
assertBroadcast(t, ctx, 0)
// Add the remote channel update to the gossiper. Similarly, nothing
// should be broadcasted.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessRemoteAnnouncement(
batch.chanUpdAnn2, remotePeer,
),
)
assertBroadcast(t, ctx, 0)
// Now add the local and remote proof to the gossiper, which should
// trigger a broadcast of the announcements.
assertProcessAnnouncement(
t, ctx.gossiper.ProcessLocalAnnouncement(
batch.localProofAnn, localKey,
),
)
assertBroadcast(t, ctx, 0)
assertProcessAnnouncement(
t, ctx.gossiper.ProcessRemoteAnnouncement(
batch.remoteProofAnn, remotePeer,
),
)
// checkAnncouncments make sure the expected number of channel
// announcements + channel updates + node announcements are broadcast.
checkAnnouncements := func(t *testing.T, chanAnns, chanUpds,
nodeAnns int) {
t.Helper()
num := chanAnns + chanUpds + nodeAnns
anns := assertBroadcast(t, ctx, num)
// Count the received announcements.
var chanAnn, chanUpd, nodeAnn int
for _, msg := range anns {
switch msg.(type) {
case *lnwire.ChannelAnnouncement:
chanAnn++
case *lnwire.ChannelUpdate:
chanUpd++
case *lnwire.NodeAnnouncement:
nodeAnn++
}
}
if chanAnn != chanAnns || chanUpd != chanUpds ||
nodeAnn != nodeAnns {
t.Fatalf("unexpected number of announcements: "+
"chanAnn=%d, chanUpd=%d, nodeAnn=%d",
chanAnn, chanUpd, nodeAnn)
}
}
// All announcements should be broadcast, including the remote channel
// update.
checkAnnouncements(t, 1, 2, 1)
// Now let the retransmit ticker tick, which should trigger updates to
// be rebroadcast.
now := time.Unix(int64(testTimestamp), 0)
future := now.Add(rebroadcastInterval + 10*time.Second)
select {
case ctx.gossiper.cfg.RetransmitTicker.(*ticker.Force).Force <- future:
case <-time.After(2 * time.Second):
t.Fatalf("unable to force tick")
}
// The channel announcement + local channel update + node announcement
// should be re-broadcast.
checkAnnouncements(t, 1, 1, 1)
}
// TestNodeAnnouncementNoChannels tests that NodeAnnouncements for nodes with
// no existing channels in the graph do not get forwarded.
func TestNodeAnnouncementNoChannels(t *testing.T) {

@ -710,16 +710,20 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB,
}
s.authGossiper = discovery.New(discovery.Config{
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
Router: s.chanRouter,
Notifier: s.cc.chainNotifier,
ChainHash: *activeNetParams.GenesisHash,
Broadcast: s.BroadcastMessage,
ChanSeries: chanSeries,
NotifyWhenOnline: s.NotifyWhenOnline,
NotifyWhenOffline: s.NotifyWhenOffline,
SelfNodeAnnouncement: func(refresh bool) (lnwire.NodeAnnouncement, error) {
return s.genNodeAnnouncement(refresh)
},
ProofMatureDelta: 0,
TrickleDelay: time.Millisecond * time.Duration(cfg.TrickleDelay),
RetransmitDelay: time.Minute * 30,
RetransmitTicker: ticker.New(time.Minute * 30),
RebroadcastInterval: time.Hour * 24,
WaitingProofStore: waitingProofStore,
MessageStore: gossipMessageStore,
AnnSigner: s.nodeSigner,