discovery: make gossiper able to resend failed AnnounceSignatures

This commit makes the gossiper track the state of a local
AnnounceSignature message, such that it can retry sending
it to the remote peer if needed. It will also persist this
state in the WaitingProofStore, such that it can resume
from this state at startup.
This commit is contained in:
Johan T. Halseth 2017-11-17 19:21:50 -08:00 committed by Olaoluwa Osuntokun
parent 8a79bbf383
commit e3ae204fcb

@ -2,12 +2,14 @@ package discovery
import (
"bytes"
"encoding/binary"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/boltdb/bolt"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -20,6 +22,15 @@ import (
"github.com/roasbeef/btcd/wire"
)
// messageStoreKey names the top level bucket in the gossiper database
// that holds messages which are to be sent to peers. At present it is
// used for reliably sending AnnounceSignatures messages, by persisting
// them until a send operation has succeeded.
var messageStoreKey = []byte("message-store")
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
@ -78,6 +89,11 @@ type Config struct {
// messages to a particular peer identified by the target public key.
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
NotifyWhenOnline func(peer *btcec.PublicKey, connectedChan chan<- struct{})
// ProofMatureDelta the number of confirmations which is needed before
// exchange the channel announcement proofs.
ProofMatureDelta uint32
@ -327,6 +343,14 @@ func (d *AuthenticatedGossiper) Start() error {
}
d.bestHeight = height
// In case we had an AnnounceSignatures ready to be sent when the
// gossiper was last shut down, we must continue on our quest to
// deliver this message to our peer such that they can craft the
// full channel proof.
if err := d.resendAnnounceSignatures(); err != nil {
return err
}
d.wg.Add(1)
go d.networkHandler()
@ -526,6 +550,136 @@ func (d *deDupedAnnouncements) Emit() []lnwire.Message {
return announcements
}
// resendAnnounceSignatures inspects the messageStore database bucket for
// AnnounceSignatures messages that we previously tried to send to a peer.
// If the associated channel still does not have the full channel proof
// assembled, the message is resent. If the full proof exists, or the
// channel can no longer be found in the graph, the message is deleted
// from the messageStore.
//
// An error is returned if the database cannot be read or updated, if a
// stored entry cannot be decoded, or if queueing a message for resend
// fails.
func (d *AuthenticatedGossiper) resendAnnounceSignatures() error {
	type msgTuple struct {
		peer  *btcec.PublicKey
		msg   *lnwire.AnnounceSignatures
		dbKey []byte
	}

	// pubKeyLen is the number of leading bytes of a database key that
	// hold the peer's serialized public key.
	const pubKeyLen = 33

	// Fetch all the AnnounceSignatures messages that were added to the
	// database.
	// TODO(halseth): database access should be abstracted
	// behind interface.
	var msgsResend []msgTuple
	if err := d.cfg.DB.View(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(messageStoreKey)
		if bucket == nil {
			return nil
		}

		// Iterate over each message added to the database.
		return bucket.ForEach(func(k, v []byte) error {
			// The database value represents the encoded
			// AnnounceSignatures message.
			r := bytes.NewReader(v)
			msg := &lnwire.AnnounceSignatures{}
			if err := msg.Decode(r, 0); err != nil {
				return err
			}

			// The first 33 bytes of the database key is the
			// peer's public key. Guard against a short key so
			// a malformed entry yields an error rather than a
			// slice-bounds panic.
			if len(k) < pubKeyLen {
				return fmt.Errorf("malformed messageStore "+
					"key: got %d bytes, need at least %d",
					len(k), pubKeyLen)
			}
			peer, err := btcec.ParsePubKey(
				k[:pubKeyLen], btcec.S256(),
			)
			if err != nil {
				return err
			}

			// Byte slices handed out by bolt are only valid
			// while the transaction is open. The key is used
			// after this transaction has ended (to delete the
			// entry), so we must keep our own copy of it.
			dbKey := make([]byte, len(k))
			copy(dbKey, k)

			// Add the message to the slice, such that we can
			// resend it after the database transaction is over.
			msgsResend = append(msgsResend, msgTuple{
				peer:  peer,
				msg:   msg,
				dbKey: dbKey,
			})
			return nil
		})
	}); err != nil {
		return err
	}

	// deleteMsg removes the message associated with the passed
	// msgTuple from the messageStore.
	deleteMsg := func(t msgTuple) error {
		log.Debugf("Deleting message for chanID=%v from "+
			"messageStore", t.msg.ChannelID)
		if err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
			bucket := tx.Bucket(messageStoreKey)
			if bucket == nil {
				return fmt.Errorf("bucket " +
					"unexpectedly did not exist")
			}

			return bucket.Delete(t.dbKey)
		}); err != nil {
			return fmt.Errorf("Failed deleting message "+
				"from database: %v", err)
		}
		return nil
	}

	// We now iterate over these messages, resending those that we
	// don't have the full proof for, deleting the rest.
	for _, t := range msgsResend {
		// Check if the full channel proof exists in our graph.
		chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
			t.msg.ShortChannelID)
		if err != nil {
			// If the channel cannot be found, it is most likely
			// a leftover message for a channel that was closed.
			// In this case we delete it from the message store.
			log.Warnf("unable to fetch channel info for "+
				"chanID=%v from graph: %v. Will delete local "+
				"proof from database",
				t.msg.ChannelID, err)
			if err := deleteMsg(t); err != nil {
				return err
			}
			continue
		}

		// 1. If the full proof does not exist in the graph,
		// it means that we haven't received the remote proof
		// yet (or that we crashed before able to assemble the
		// full proof). Since the remote node might think they
		// have delivered their proof to us, we will resend
		// _our_ proof to trigger a resend on their part:
		// they will then be able to assemble and send us the
		// full proof.
		if chanInfo.AuthProof == nil {
			err := d.sendAnnSigReliably(t.msg, t.peer)
			if err != nil {
				return err
			}
			continue
		}

		// 2. If the proof does exist in the graph, we have
		// successfully received the remote proof and assembled
		// the full proof. In this case we can safely delete the
		// local proof from the database. In case the remote
		// hasn't been able to assemble the full proof yet
		// (maybe because of a crash), we will send them the full
		// proof if we notice that they retry sending their half
		// proof.
		if err := deleteMsg(t); err != nil {
			return err
		}
	}

	return nil
}
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine includes answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
@ -1010,8 +1164,78 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
// The edge will get rejected if we already
// added the same edge without AuthProof to the
// graph. If the received announcement contains
// a proof, we can add this proof to our edge.
// We can end up in this situation in the case
// where we create a channel, but for some
// reason fail to receive the remote peer's
// proof, while the remote peer is able to fully
// assemble the proof and craft the
// ChannelAnnouncement.
// TODO(halseth): the following chunk of code
// should be moved into own method, indentation
// and readability is not exactly on point.
chanInfo, e1, e2, err2 := d.cfg.Router.GetChannelByID(
msg.ShortChannelID)
if err2 != nil {
log.Errorf("Failed fetching channel "+
"edge: %v", err2)
nMsg.err <- err2
return nil
}
// If the edge already exists in the graph, but
// has no proof attached, we can add that now.
if chanInfo.AuthProof == nil && proof != nil {
chanAnn, e1Ann, e2Ann :=
createChanAnnouncement(proof,
chanInfo, e1, e2)
// Validate the assembled proof.
err := ValidateChannelAnn(chanAnn)
if err != nil {
err := errors.Errorf("assembled"+
"channel announcement "+
"proof for shortChanID=%v"+
" isn't valid: %v",
msg.ShortChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
err = d.cfg.Router.AddProof(
msg.ShortChannelID, proof)
if err != nil {
err := errors.Errorf("unable "+
"add proof to "+
"shortChanID=%v: %v",
msg.ShortChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
announcements = append(announcements,
chanAnn)
if e1Ann != nil {
announcements = append(
announcements, e1Ann)
}
if e2Ann != nil {
announcements = append(
announcements, e2Ann)
}
nMsg.err <- nil
return announcements
}
// If not, this was just an outdated edge.
log.Debugf("Router rejected channel edge: %v",
err)
} else {
log.Errorf("Router rejected channel edge: %v",
err)
@ -1283,7 +1507,8 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
// Ensure that we know of a channel with the target channel ID
// before proceeding further.
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID)
if err != nil {
// TODO(andrew.shvv) this is dangerous because remote
// node might rewrite the waiting proof.
@ -1320,6 +1545,72 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil
}
// If proof was sent by a local sub-system, then we'll
// send the announcement signature to the remote node
// so they can also reconstruct the full channel
// announcement.
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
} else {
remotePeer = chanInfo.NodeKey1
}
// Since the remote peer might not be online
// we'll call a method that will attempt to
// deliver the proof when it comes online.
if err := d.sendAnnSigReliably(msg, remotePeer); err != nil {
err := errors.Errorf("unable to send reliably "+
"to remote for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
}
// Check if we already have the full proof for this channel.
if chanInfo.AuthProof != nil {
// If we already have the fully assembled proof, then
// the peer sending us their proof has probably not
// received our local proof yet. So be kind and send
// them the full proof.
if nMsg.isRemote {
peerID := nMsg.peer.SerializeCompressed()
log.Debugf("Got AnnounceSignatures for " +
"channel with full proof.")
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Debugf("Received half proof for "+
"channel %v with existing "+
"full proof. Sending full "+
"proof to peer=%x",
msg.ChannelID,
peerID)
chanAnn, _, _ := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
err := d.cfg.SendToPeer(nMsg.peer, chanAnn)
if err != nil {
log.Errorf("Failed sending "+
"full proof to "+
"peer=%x: %v",
peerID, err)
return
}
log.Debugf("Full proof sent to peer=%x"+
" for chanID=%v", peerID, msg.ChannelID)
}()
}
log.Debugf("Already have proof for channel "+
"with chanID=%v", msg.ChannelID)
nMsg.err <- nil
return nil
}
// Check that we received the opposite proof. If so, then we're
// now able to construct the full proof, and create the channel
// announcement. If we didn't receive the opposite half of the
@ -1346,33 +1637,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil
}
// If proof was sent by a local sub-system, then we'll
// send the announcement signature to the remote node
// so they can also reconstruct the full channel
// announcement.
if !nMsg.isRemote {
// Check that first node of the channel info
// corresponds to us.
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
} else {
remotePeer = chanInfo.NodeKey1
}
err := d.cfg.SendToPeer(remotePeer, msg)
if err != nil {
log.Errorf("unable to send "+
"announcement message to peer: %x",
remotePeer.SerializeCompressed())
}
log.Infof("Sent channel announcement proof "+
"for short_chan_id=%v to remote peer: "+
"%x", shortChanID,
remotePeer.SerializeCompressed())
}
log.Infof("1/2 of channel ann proof received for "+
"short_chan_id=%v, waiting for other half",
shortChanID)
@ -1381,9 +1645,9 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
return nil
}
// If we now have both halves of the channel announcement
// proof, then we'll reconstruct the initial announcement so we
// can validate it shortly below.
// We now have both halves of the channel announcement proof,
// then we'll reconstruct the initial announcement so we can
// validate it shortly below.
var dbProof channeldb.ChannelAuthProof
if isFirstNode {
dbProof.NodeSig1 = msg.NodeSignature
@ -1450,26 +1714,6 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
announcements = append(announcements, e2Ann)
}
// If this a local announcement, then we'll send it to the
// remote side so they can reconstruct the full channel
// announcement proof.
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
} else {
remotePeer = chanInfo.NodeKey1
}
log.Debugf("Sending local AnnounceSignatures message "+
"to peer(%x)", remotePeer.SerializeCompressed())
if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
log.Errorf("unable to send announcement "+
"message to peer: %x",
remotePeer.SerializeCompressed())
}
}
nMsg.err <- nil
return announcements
@ -1479,6 +1723,90 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []l
}
}
// sendAnnSigReliably will try to send the provided local AnnounceSignatures
// to the remote peer, waiting for it to come online if necessary. This
// method returns after adding the message to persistent storage and
// launching a background goroutine that performs the actual send, so the
// caller does not block on delivery.
//
// A non-nil error is returned only if the message could not be encoded or
// written to the database; in that case no send attempt is started.
func (d *AuthenticatedGossiper) sendAnnSigReliably(
	msg *lnwire.AnnounceSignatures, remotePeer *btcec.PublicKey) error {

	// Serialize the peer's public key once; it is used both for the
	// database key and for every log line below.
	peerPub := remotePeer.SerializeCompressed()

	// We first add this message to the database, such that in case
	// we do not succeed in sending it to the peer, we'll fetch it
	// from the DB next time we start, and retry. We use the peer ID
	// + shortChannelID as key, as there possibly is more than one
	// channel opening in progress to the same peer.
	var key [41]byte
	copy(key[:33], peerPub)
	binary.BigEndian.PutUint64(key[33:], msg.ShortChannelID.ToUint64())

	err := d.cfg.DB.Update(func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(messageStoreKey)
		if err != nil {
			return err
		}

		// Encode the AnnounceSignatures message.
		var b bytes.Buffer
		if err := msg.Encode(&b, 0); err != nil {
			return err
		}

		// Add the encoded message to the database using the peer
		// + shortChanID as key.
		return bucket.Put(key[:], b.Bytes())
	})
	if err != nil {
		return err
	}

	// We have succeeded adding the message to the database. We now launch
	// a goroutine that will keep on trying sending the message to the
	// remote peer until it succeeds, or the gossiper shuts down. Note
	// that the message is NOT removed from the database here, even on a
	// successful send: stored entries are cleaned up by
	// resendAnnounceSignatures once the full proof is known (or the
	// channel can no longer be found).
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		for {
			log.Debugf("Sending AnnounceSignatures for channel "+
				"%v to remote peer %x", msg.ChannelID,
				peerPub)
			err := d.cfg.SendToPeer(remotePeer, msg)
			if err == nil {
				// Sending succeeded, we can
				// continue the flow.
				break
			}

			log.Errorf("unable to send AnnounceSignatures message "+
				"to peer(%x): %v. Will retry when online.",
				peerPub, err)

			// Ask to be notified when the peer comes online, and
			// wait for either that notification or shutdown.
			connected := make(chan struct{})
			d.cfg.NotifyWhenOnline(remotePeer, connected)

			select {
			case <-connected:
				log.Infof("peer %x reconnected. Retry sending"+
					" AnnounceSignatures.", peerPub)
				// Retry sending.
			case <-d.quit:
				log.Infof("Gossiper shutting down, did not send" +
					" AnnounceSignatures.")
				return
			}
		}

		log.Infof("Sent channel announcement proof to remote peer: %x",
			peerPub)
	}()

	// This method returns after the message has been added to the
	// database, such that the caller doesn't have to wait until the
	// message is actually delivered. The background goroutine keeps
	// retrying until it succeeds or the gossiper shuts down, and the
	// persisted copy lets a later startup retry the send.
	return nil
}
// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,