lnd.xprv/discovery/gossiper.go

2618 lines
83 KiB
Go

package discovery
import (
"bytes"
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnpeer"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/multimutex"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
)
// Sentinel errors surfaced by the gossiper. Both are delivered to callers over
// the error channel returned by the Process*Announcement methods.
var (
// ErrGossiperShuttingDown is an error that is returned if the gossiper
// is in the process of being shut down. It is sent on a message's error
// channel when the quit channel fires before the message could be
// handed to the network handler.
ErrGossiperShuttingDown = errors.New("gossiper is shutting down")
// ErrGossipSyncerNotFound signals that we were unable to find an active
// gossip syncer corresponding to a gossip query message received from
// the remote peer.
ErrGossipSyncerNotFound = errors.New("gossip syncer not found")
)
// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement. A nil field means the caller did not supply that value.
type optionalMsgFields struct {
// capacity is the channel capacity supplied through the ChannelCapacity
// option, or nil if the option was not used.
capacity *btcutil.Amount
// channelPoint is the channel outpoint supplied through the ChannelPoint
// option, or nil if the option was not used.
channelPoint *wire.OutPoint
}
// apply runs each of the supplied functional options against f, in the order
// they were passed, letting every option fill in the field it is responsible
// for. Calling it with no options leaves f untouched.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
	for i := range optionalMsgFields {
		optionalMsgFields[i](f)
	}
}
// OptionalMsgField is a functional option parameter that can be used to provide
// external information that is not included within a network message but is
// useful when processing it. Each option mutates the optionalMsgFields value
// it is handed.
type OptionalMsgField func(*optionalMsgFields)
// ChannelCapacity returns an option that records the capacity of a channel so
// the gossiper can use it while processing the accompanying message. The
// stored pointer refers to this call's own copy of the capacity argument.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
	setCapacity := func(fields *optionalMsgFields) {
		fields.capacity = &capacity
	}

	return setCapacity
}
// ChannelPoint returns an option that records the outpoint of a channel so the
// gossiper can use it while processing the accompanying message. The stored
// pointer refers to this call's own copy of the outpoint argument.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
	setChannelPoint := func(fields *optionalMsgFields) {
		fields.channelPoint = &op
	}

	return setChannelPoint
}
// networkMsg couples a routing related wire message with the peer that
// originally sent it, plus the bookkeeping needed to report the processing
// result back to whoever submitted the message.
type networkMsg struct {
// peer is the remote peer the message was received from. It is only
// populated by ProcessRemoteAnnouncement; local messages leave it nil.
peer lnpeer.Peer
// source is the public key of the message's origin: the peer's identity
// key for remote messages, or the caller-supplied key for local ones.
source *btcec.PublicKey
// msg is the wire message itself.
msg lnwire.Message
// optionalMsgFields carries extra caller-provided data. It is only set
// by ProcessLocalAnnouncement.
optionalMsgFields *optionalMsgFields
// isRemote is true when the message arrived from a remote peer and
// false when it originated locally.
isRemote bool
// err is the channel the processing result is delivered on. The
// Process*Announcement methods create it with a buffer of one.
err chan error
}
// chanPolicyUpdateRequest is a request that is sent to the gossiper when a
// caller wishes to update the channel policy (fees e.g.) for a particular set
// of channels. New ChannelUpdate messages will be crafted to be sent out during
// the next broadcast epoch and the fee updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
// targetChans is the set of channels, identified by outpoint, that the
// new policy should be applied to.
targetChans []wire.OutPoint
// newSchema is the policy to apply to the target channels.
newSchema routing.ChannelPolicy
// errResp is the channel the network handler replies on once the
// request has been processed; nil signals success.
errResp chan error
}
// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
// ChainHash is a hash that indicates the chain the
// AuthenticatedGossiper resides on. Any announcements that don't match
// this chain hash will be ignored.
//
// TODO(roasbeef): eventually make into map so can de-multiplex
// incoming announcements
// * also need to do same for Notifier
ChainHash chainhash.Hash
// Router is the subsystem which is responsible for managing the
// topology of the lightning network. After incoming channel, node, and
// channel update announcements are validated they are sent to the
// router in order to be included in the LN graph.
Router routing.ChannelGraphSource
// ChanSeries is an interface that provides access to a time series
// view of the current known channel graph. Each GossipSyncer enabled
// peer will utilize this in order to create and respond to channel
// graph time series queries.
ChanSeries ChannelGraphTimeSeries
// Notifier is used for receiving notifications of incoming blocks.
// With each new incoming block found we process previously premature
// announcements.
//
// TODO(roasbeef): could possibly just replace this with an epoch
// channel.
Notifier chainntnfs.ChainNotifier
// Broadcast broadcasts a particular set of announcements to all peers
// that the daemon is connected to. If supplied, the skips parameter
// lists the peers that should be excluded from the broadcast.
Broadcast func(skips map[route.Vertex]struct{},
msg ...lnwire.Message) error
// NotifyWhenOnline is a function that allows the gossiper to be
// notified when a certain peer comes online, allowing it to
// retry sending a peer message.
//
// NOTE: The peerChan channel must be buffered.
//
// TODO(wilmer): use [33]byte to avoid unnecessary serializations.
NotifyWhenOnline func(peer *btcec.PublicKey, peerChan chan<- lnpeer.Peer)
// NotifyWhenOffline is a function that allows the gossiper to be
// notified when a certain peer disconnects, allowing it to request a
// notification for when it reconnects.
NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}
// ProofMatureDelta is the number of confirmations needed before the
// channel announcement proofs are exchanged.
ProofMatureDelta uint32
// TrickleDelay is the period of the trickle timer which flushes to the
// network the pending batch of new announcements we've received since
// the last trickle tick.
TrickleDelay time.Duration
// RetransmitDelay is the period of a timer which indicates that we
// should check if we need to re-broadcast any of our personal channels.
RetransmitDelay time.Duration
// WaitingProofStore is a persistent storage of partial channel proof
// announcement messages. We use it to buffer half of the material
// needed to reconstruct a full authenticated channel announcement.
// Once we receive the other half of the channel proof, we'll be able to
// properly validate it and re-broadcast it out to the network.
//
// TODO(wilmer): make interface to prevent channeldb dependency.
WaitingProofStore *channeldb.WaitingProofStore
// MessageStore is a persistent storage of gossip messages which we will
// use to determine which messages need to be resent for a given peer.
MessageStore GossipMessageStore
// AnnSigner is an instance of the MessageSigner interface which will
// be used to manually sign any outgoing channel updates. The signer
// implementation should be backed by the public key of the backing
// Lightning node.
//
// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
// here?
AnnSigner lnwallet.MessageSigner
// NumActiveSyncers is the number of peers for which we should have
// active syncers. After reaching NumActiveSyncers, any future
// gossip syncers will be passive.
NumActiveSyncers int
// RotateTicker is a ticker responsible for notifying the SyncManager
// when it should rotate its active syncers. A single active syncer with
// a chansSynced state will be exchanged for a passive syncer in order
// to ensure we don't keep syncing with the same peers.
RotateTicker ticker.Ticker
// HistoricalSyncTicker is a ticker responsible for notifying the
// syncManager when it should attempt a historical sync with a gossip
// sync peer.
HistoricalSyncTicker ticker.Ticker
// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
// syncManager when it should attempt to start the next pending
// activeSyncer due to the current one not completing its state machine
// within the timeout.
//
// NOTE(review): New does not forward this ticker into the
// SyncManagerCfg it builds — confirm whether the field is still
// consumed anywhere.
ActiveSyncerTimeoutTicker ticker.Ticker
// MinimumBatchSize is the minimum size of a sub batch of announcement
// messages.
MinimumBatchSize int
// SubBatchDelay is the delay between sending sub batches of
// gossip messages.
SubBatchDelay time.Duration
}
// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to router, syncing
// lightning network with newly connected nodes, broadcasting announcements
// after validation, negotiating the channel announcement proofs exchange and
// handling the premature announcements. All outgoing announcements are
// expected to be properly signed as dictated in BOLT#7, additionally, all
// incoming messages are expected to be well formed and signed. Invalid messages
// will be rejected by this struct.
type AuthenticatedGossiper struct {
// Parameters which are needed to properly handle the start and stop of
// the service. To be used atomically.
started uint32
stopped uint32
// bestHeight is the height of the block at the tip of the main chain
// as we know it. To be used atomically.
bestHeight uint32
// quit is closed by Stop to signal shutdown to everything selecting on
// it.
quit chan struct{}
// wg tracks the goroutines spawned by the gossiper so that Stop can
// wait for them to exit.
wg sync.WaitGroup
// cfg is a copy of the configuration struct that the gossiper service
// was initialized with.
cfg *Config
// blockEpochs encapsulates a stream of block epochs that are sent at
// every new block height. It is registered in Start and cancelled in
// Stop.
blockEpochs *chainntnfs.BlockEpochEvent
// prematureAnnouncements maps a block height to a set of network
// messages which are "premature" from our PoV. A message is premature
// if it claims to be anchored in a block which is beyond the current
// main chain tip as we know it. Premature network messages will be
// processed once the chain tip as we know it extends to/past the
// premature height.
//
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// prematureChannelUpdates is a map of ChannelUpdates we have received
// that weren't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
// pChanUpdMtx presumably guards prematureChannelUpdates — its use is
// not visible in this part of the file; confirm against the handlers.
pChanUpdMtx sync.Mutex
// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
// networkHandler.
networkMsgs chan *networkMsg
// chanPolicyUpdates is a channel that requests to update the
// forwarding policy of a set of channels is sent over.
chanPolicyUpdates chan *chanPolicyUpdateRequest
// selfKey is the identity public key of the backing Lightning node.
selfKey *btcec.PublicKey
// channelMtx is used to restrict the database access to one
// goroutine per channel ID. This is done to ensure that when
// the gossiper is handling an announcement, the db state stays
// consistent between when the DB is first read until it's written.
channelMtx *multimutex.Mutex
// rejectMtx and recentRejects presumably back the isRecentlyRejectedMsg
// check performed by the network handler: a set of identifiers for
// messages that were recently rejected, guarded by the RWMutex. The
// key derivation is not visible here — confirm.
rejectMtx sync.RWMutex
recentRejects map[uint64]struct{}
// syncMgr is a subsystem responsible for managing the gossip syncers
// for peers currently connected. When a new peer is connected, the
// manager will create its accompanying gossip syncer and determine
// whether it should have an activeSync or passiveSync sync type based
// on how many other gossip syncers are currently active. Any activeSync
// gossip syncers are started in a round-robin manner to ensure we're
// not syncing with multiple peers at the same time.
syncMgr *SyncManager
// reliableSender is a subsystem responsible for handling reliable
// message send requests to peers. This should only be used for channels
// that are unadvertised at the time of handling the message since if it
// is advertised, then peers should be able to get the message from the
// network.
reliableSender *reliableSender
// The embedded mutex is a general-purpose lock on the gossiper; what
// it protects is not visible in this part of the file.
sync.Mutex
}
// New builds an AuthenticatedGossiper from the given configuration and the
// identity key of the backing node. The gossiper keeps a pointer to its own
// copy of cfg. The returned instance is not running until Start is called.
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
	// The sync manager only needs a subset of the gossiper's configuration.
	syncMgr := newSyncManager(&SyncManagerCfg{
		ChainHash:            cfg.ChainHash,
		ChanSeries:           cfg.ChanSeries,
		RotateTicker:         cfg.RotateTicker,
		HistoricalSyncTicker: cfg.HistoricalSyncTicker,
		NumActiveSyncers:     cfg.NumActiveSyncers,
	})

	g := &AuthenticatedGossiper{
		selfKey:                 selfKey,
		cfg:                     &cfg,
		networkMsgs:             make(chan *networkMsg),
		quit:                    make(chan struct{}),
		chanPolicyUpdates:       make(chan *chanPolicyUpdateRequest),
		prematureAnnouncements:  make(map[uint32][]*networkMsg),
		prematureChannelUpdates: make(map[uint64][]*networkMsg),
		channelMtx:              multimutex.NewMutex(),
		recentRejects:           make(map[uint64]struct{}),
		syncMgr:                 syncMgr,
	}

	// The reliable sender calls back into the gossiper to decide whether a
	// stored message is stale, so it can only be wired up once the gossiper
	// itself exists.
	g.reliableSender = newReliableSender(&reliableSenderCfg{
		NotifyWhenOnline:  cfg.NotifyWhenOnline,
		NotifyWhenOffline: cfg.NotifyWhenOffline,
		MessageStore:      cfg.MessageStore,
		IsMsgStale:        g.isMsgStale,
	})

	return g
}
// SynchronizeNode pushes our view of the channel graph to the given peer. It
// is meant to be used when a node connects for the first time, to provide it
// with the latest topology state. It walks every channel known to the router
// and, for each one that carries an authentication proof, queues the channel
// announcement, any available channel updates for its two directed edges, and
// the node announcement of each edge's node (at most once per node). The
// collected messages are then handed to the peer in a single lazy send.
//
// An error is returned if walking the graph fails for any reason other than
// the graph having no edges, or if the send itself fails.
func (d *AuthenticatedGossiper) SynchronizeNode(syncPeer lnpeer.Peer) error {
	// TODO(roasbeef): need to also store sig data in db
	//  * will be nice when we switch to pairing sigs would only need one ^_^

	var (
		// announceMessages accumulates, in send order, everything that
		// will be delivered to the peer.
		announceMessages []lnwire.Message

		numEdges uint32
		numNodes uint32
	)

	// nodePubsSent remembers which node announcements were already queued,
	// as a single node may be an endpoint of many channels.
	nodePubsSent := make(map[route.Vertex]struct{})

	// queueNodeAnn queues the node announcement of the node attached to
	// the given edge policy, unless it was queued before or the node has
	// no announcement available.
	queueNodeAnn := func(edge *channeldb.ChannelEdgePolicy) error {
		nodePub := edge.Node.PubKeyBytes
		if _, sent := nodePubsSent[nodePub]; sent ||
			!edge.Node.HaveNodeAnnouncement {

			return nil
		}

		nodeAnn, err := edge.Node.NodeAnnouncement(true)
		if err != nil {
			return err
		}

		announceMessages = append(announceMessages, nodeAnn)
		nodePubsSent[nodePub] = struct{}{}
		numNodes++

		return nil
	}

	// collectChannel queues all messages belonging to a single channel.
	// Peers expect the channel announcement ahead of the related updates
	// and node announcements, so that order is kept here.
	collectChannel := func(chanInfo *channeldb.ChannelEdgeInfo,
		e1, e2 *channeldb.ChannelEdgePolicy) error {

		// Without a proof there is no authenticated announcement to
		// re-create, so the channel is skipped entirely.
		if chanInfo.AuthProof == nil {
			return nil
		}

		chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(
			chanInfo.AuthProof, chanInfo, e1, e2,
		)
		if err != nil {
			return err
		}
		announceMessages = append(announceMessages, chanAnn)

		if e1Ann != nil {
			announceMessages = append(announceMessages, e1Ann)
			if err := queueNodeAnn(e1); err != nil {
				return err
			}
		}

		if e2Ann != nil {
			announceMessages = append(announceMessages, e2Ann)
			if err := queueNodeAnn(e2); err != nil {
				return err
			}
		}

		numEdges++

		return nil
	}

	err := d.cfg.Router.ForEachChannel(collectChannel)
	if err != nil && err != channeldb.ErrGraphNoEdgesFound {
		log.Errorf("unable to sync infos with peer: %v", err)
		return err
	}

	log.Infof("Syncing channel graph state with %x, sending %v "+
		"vertexes and %v edges", syncPeer.PubKey(),
		numNodes, numEdges)

	// Everything that was gathered goes out to the peer as one batch.
	return syncPeer.SendMessageLazy(false, announceMessages...)
}
// PropagateChanPolicyUpdate asks the AuthenticatedGossiper to apply a new
// forwarding policy to the given channels. If no channels are specified, the
// update will be applied to all outgoing channels from the source node. Policy
// updates are done in two stages: first, the AuthenticatedGossiper ensures the
// update has been committed by dependent sub-systems, then it signs and
// broadcasts new updates to the network.
//
// The call blocks until the network handler has answered the request, and
// returns that answer. If the gossiper shuts down before the request could be
// handed over, an error is returned instead.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
	newSchema routing.ChannelPolicy, chanPoints ...wire.OutPoint) error {

	// Buffered so the network handler never blocks when replying.
	respChan := make(chan error, 1)
	req := &chanPolicyUpdateRequest{
		targetChans: chanPoints,
		newSchema:   newSchema,
		errResp:     respChan,
	}

	select {
	case <-d.quit:
		return fmt.Errorf("AuthenticatedGossiper shutting down")

	case d.chanPolicyUpdates <- req:
	}

	return <-respChan
}
// Start brings the gossiper online: it registers for block notifications,
// records the router's current block height, starts the reliable sender and
// the sync manager, and finally launches the network handler goroutine. Only
// the first call has any effect; later calls return nil immediately.
func (d *AuthenticatedGossiper) Start() error {
	alreadyStarted := !atomic.CompareAndSwapUint32(&d.started, 0, 1)
	if alreadyStarted {
		return nil
	}

	log.Info("Authenticated Gossiper is starting")

	// Register for block notifications before anything else so that no
	// block discovered from here on is missed.
	epochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
	if err != nil {
		return err
	}
	d.blockEpochs = epochs

	tipHeight, err := d.cfg.Router.CurrentBlockHeight()
	if err != nil {
		return err
	}
	d.bestHeight = tipHeight

	// Messages that were still pending delivery when the gossiper was last
	// shut down are picked up again by the reliable sender.
	err = d.reliableSender.Start()
	if err != nil {
		return err
	}

	d.syncMgr.Start()

	d.wg.Add(1)
	go d.networkHandler()

	return nil
}
// Stop signals any active goroutines for a graceful closure and waits for
// them to exit. Only the first call performs the shutdown; later calls return
// immediately.
func (d *AuthenticatedGossiper) Stop() {
// The CAS succeeds for exactly one caller, guarding against a double
// close of the quit channel.
if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) {
return
}
log.Info("Authenticated Gossiper is stopping")
// NOTE(review): blockEpochs is only assigned in Start, so calling Stop on
// a gossiper whose Start never got that far would dereference nil here —
// confirm callers always Start successfully first.
d.blockEpochs.Cancel()
d.syncMgr.Stop()
// Closing quit signals shutdown to everything selecting on it; wg.Wait
// then blocks until every goroutine registered with the wait group has
// returned.
close(d.quit)
d.wg.Wait()
// We'll stop our reliable sender after all of the gossiper's goroutines
// have exited to ensure nothing can cause it to continue executing.
d.reliableSender.Stop()
}
// TODO(roasbeef): need method to get current gossip timestamp?
// * using mtx, check time rotate forward is needed?
// ProcessRemoteAnnouncement submits a message received from a remote peer for
// processing. Gossip query messages and gossip timestamp filters are
// dispatched straight to the peer's GossipSyncer; every other message is
// handed to the network handler, where it is processed and then added to a
// queue for batched trickled announcement to all connected peers. Remote
// channel announcements should contain the announcement proof and be fully
// validated.
//
// The returned channel is buffered and carries the processing result.
//
// NOTE: a nil channel is returned if the peer quits before the message could
// be handed to the network handler.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
	peer lnpeer.Peer) chan error {

	errChan := make(chan error, 1)

	// reply resolves the request right away with the given result.
	reply := func(err error) chan error {
		errChan <- err
		return errChan
	}

	switch m := msg.(type) {

	// Messages in the known set of channel series queries skip the main
	// processing loop and go directly to the GossipSyncer.
	case *lnwire.QueryShortChanIDs,
		*lnwire.QueryChannelRange,
		*lnwire.ReplyChannelRange,
		*lnwire.ReplyShortChanIDsEnd:

		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			return reply(ErrGossipSyncerNotFound)
		}

		// The target was found, so the message is forwarded to it.
		syncer.ProcessQueryMsg(m, peer.QuitSignal())

		return reply(nil)

	// A peer updating its current update horizon is likewise handled by
	// the matching GossipSyncer.
	case *lnwire.GossipTimestampRange:
		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			return reply(ErrGossipSyncerNotFound)
		}

		err := syncer.ApplyGossipFilter(m)
		if err != nil {
			log.Warnf("Unable to apply gossip filter for peer=%x: "+
				"%v", peer.PubKey(), err)

			return reply(err)
		}

		return reply(nil)
	}

	// Anything else is wrapped up and queued for the network handler.
	nMsg := &networkMsg{
		msg:      msg,
		isRemote: true,
		peer:     peer,
		source:   peer.IdentityKey(),
		err:      errChan,
	}

	select {
	case d.networkMsgs <- nMsg:

	// If the peer that sent us this message is quitting, then there is
	// nobody left to report a result to and we can return immediately.
	case <-peer.QuitSignal():
		return nil

	case <-d.quit:
		errChan <- ErrGossiperShuttingDown
	}

	return errChan
}
// ProcessLocalAnnouncement submits a locally originated announcement, along
// with the public key of its source, for processing. The announcement will be
// processed and then added to a queue for batched trickled announcement to all
// connected peers. Local channel announcements don't contain the announcement
// proof and will not be fully validated. Once the channel proofs are received,
// the entire channel announcement and update messages will be re-constructed
// and broadcast to the rest of the network.
//
// Any optional fields supplied are attached to the message. The returned
// channel is buffered and carries the processing result, or
// ErrGossiperShuttingDown if the gossiper quits before the message could be
// handed to the network handler.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
	source *btcec.PublicKey, optionalFields ...OptionalMsgField) chan error {

	fields := &optionalMsgFields{}
	fields.apply(optionalFields...)

	resultChan := make(chan error, 1)
	nMsg := &networkMsg{
		msg:               msg,
		optionalMsgFields: fields,
		isRemote:          false,
		source:            source,
		err:               resultChan,
	}

	select {
	case <-d.quit:
		resultChan <- ErrGossiperShuttingDown

	case d.networkMsgs <- nMsg:
	}

	return resultChan
}
// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple. It is used as the key of the channel update de-duplication map.
type channelUpdateID struct {
// channelID represents the set of data which is needed to
// retrieve all necessary data to validate the channel existence.
channelID lnwire.ShortChannelID
// flags carries the update's channel flags. Its least-significant bit
// must be set to 0 if the creating node corresponds to the first node
// in the previously sent channel announcement and 1 otherwise.
flags lnwire.ChanUpdateChanFlags
}
// msgWithSenders is a wrapper struct around a message, and the set of peers
// that originally sent us this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
// msg is the wire message itself.
msg lnwire.Message
// senders is the set of peers that sent us this message. Being a map,
// it is shared between copies of the struct.
senders map[route.Vertex]struct{}
}
// mergeSyncerMap adds every peer we have a GossipSyncer with to the message's
// set of senders. Since the sender set doubles as the broadcast skip list,
// this keeps the message from being broadcast to peers that have a gossip
// syncer.
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
	var marker struct{}
	for syncerPub := range syncers {
		m.senders[syncerPub] = marker
	}
}
// deDupedAnnouncements de-duplicates announcements that have been added to the
// batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and ensure no
// announcements are duplicated. We keep the three message types separate, such
// that we can send channel announcements first, then channel updates, and
// finally node announcements when it's time to broadcast them.
//
// The maps are nil until Reset (or reset) has been called, so a fresh value
// must be reset before messages are added to it.
type deDupedAnnouncements struct {
// channelAnnouncements are identified by the short channel id field.
channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders
// channelUpdates are identified by the channel update id field.
channelUpdates map[channelUpdateID]msgWithSenders
// nodeAnnouncements are identified by the Vertex field.
nodeAnnouncements map[route.Vertex]msgWithSenders
// The embedded mutex guards the three maps above. The exported methods
// (Reset, AddMsgs, Emit) acquire it themselves; the unexported reset
// and addMsg expect the caller to already hold it.
sync.Mutex
}
// Reset discards every stored announcement, leaving the batch empty. It takes
// the lock itself, so it must not be called while the lock is already held.
func (d *deDupedAnnouncements) Reset() {
	d.Lock()
	d.reset()
	d.Unlock()
}
// reset is the lock-free counterpart of Reset, meant for methods that already
// hold the lock. Each of the three announcement stores (channel
// announcements, channel updates, node announcements) is replaced with a
// fresh, empty map.
func (d *deDupedAnnouncements) reset() {
	d.channelAnnouncements = map[lnwire.ShortChannelID]msgWithSenders{}
	d.channelUpdates = map[channelUpdateID]msgWithSenders{}
	d.nodeAnnouncements = map[route.Vertex]msgWithSenders{}
}
// addMsg adds a new message to the current batch. If a message with the same
// identifying key is already present in the batch, the new instance replaces
// the former one (subject to the timestamp rules below), and the set of
// senders is updated to reflect which node sent us this message.
//
//   - Channel announcements are keyed by short channel ID; a repeated
//     announcement replaces the stored one and accumulates its sender.
//   - Channel updates are keyed by (short channel ID, channel flags) and node
//     announcements by node ID. For both, a message older than the stored one
//     is dropped, a newer one (or the first one seen) replaces it with a fresh
//     sender set, and one with an identical timestamp only adds its sender.
//
// Messages of any other type are ignored. The caller must hold the lock.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
	switch msg := message.msg.(type) {

	// Channel announcements are identified by the short channel id field.
	case *lnwire.ChannelAnnouncement:
		deDupKey := msg.ShortChannelID
		sender := route.NewVertex(message.source)

		mws, ok := d.channelAnnouncements[deDupKey]
		if !ok {
			mws = msgWithSenders{
				senders: make(map[route.Vertex]struct{}),
			}
		}

		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelAnnouncements[deDupKey] = mws

	// Channel updates are identified by the (short channel id,
	// channelflags) tuple.
	case *lnwire.ChannelUpdate:
		sender := route.NewVertex(message.source)
		deDupKey := channelUpdateID{
			channelID: msg.ShortChannelID,
			flags:     msg.ChannelFlags,
		}

		oldTimestamp := uint32(0)
		mws, ok := d.channelUpdates[deDupKey]
		if ok {
			// If we already have seen this message, record its
			// timestamp.
			oldTimestamp = mws.msg.(*lnwire.ChannelUpdate).Timestamp
		}

		// If we already had this message with a strictly newer
		// timestamp, then we'll just discard the message we got.
		if oldTimestamp > msg.Timestamp {
			return
		}

		// If the message we just got is newer than what we previously
		// have seen, or this is the first time we see it, then we'll
		// add it to our map of announcements. The explicit !ok check
		// matters for a first message carrying timestamp zero:
		// without it we would fall through and write into the nil
		// senders map of a zero-value entry, which panics.
		if !ok || oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				senders: make(map[route.Vertex]struct{}),
			}

			// We'll mark the sender of the message in the
			// senders map.
			mws.senders[sender] = struct{}{}

			d.channelUpdates[deDupKey] = mws
			return
		}

		// Lastly, if we had seen this exact message from before, with
		// the same timestamp, we'll add the sender to the map of
		// senders, such that we can skip sending this message back in
		// the next batch.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelUpdates[deDupKey] = mws

	// Node announcements are identified by the Vertex field. Use the
	// NodeID to create the corresponding Vertex.
	case *lnwire.NodeAnnouncement:
		sender := route.NewVertex(message.source)
		deDupKey := route.Vertex(msg.NodeID)

		// We do the same for node announcements as we did for channel
		// updates, as they also carry a timestamp.
		oldTimestamp := uint32(0)
		mws, ok := d.nodeAnnouncements[deDupKey]
		if ok {
			oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
		}

		// Discard the message if it's old.
		if oldTimestamp > msg.Timestamp {
			return
		}

		// Replace if it's newer, or store it if it's the first one we
		// see for this node (see the note on channel updates for why
		// the !ok check is required).
		if !ok || oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				senders: make(map[route.Vertex]struct{}),
			}

			mws.senders[sender] = struct{}{}

			d.nodeAnnouncements[deDupKey] = mws
			return
		}

		// Add to senders map if it's the same as we had.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.nodeAnnouncements[deDupKey] = mws
	}
}
// AddMsgs adds any number of messages to the announcement batch, in the order
// given, while holding the lock for the whole operation.
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
	d.Lock()
	defer d.Unlock()

	for i := range msgs {
		d.addMsg(msgs[i])
	}
}
// Emit drains the batch: it returns the de-duplicated announcements to be sent
// out during the next announcement epoch and resets the internal storage.
// Channel announcements come first, followed by channel updates and finally
// node announcements; the order within each group is unspecified. Every
// emitted message carries the set of peers that sent it to us, so that we
// don't waste bandwidth by re-sending a message to the peer it came from.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
	d.Lock()
	defer d.Unlock()

	// Size the result up front so that it never has to grow.
	total := len(d.channelAnnouncements) +
		len(d.channelUpdates) +
		len(d.nodeAnnouncements)
	batch := make([]msgWithSenders, 0, total)

	for _, chanAnn := range d.channelAnnouncements {
		batch = append(batch, chanAnn)
	}

	for _, chanUpdate := range d.channelUpdates {
		batch = append(batch, chanUpdate)
	}

	for _, nodeAnn := range d.nodeAnnouncements {
		batch = append(batch, nodeAnn)
	}

	// Everything has been handed out, so start over with empty storage.
	d.reset()

	return batch
}
// calculateSubBatchSize is a helper function that calculates the size to break
// down the batchSize into.
//
// The batch is meant to be spread evenly over totalDelay, with one sub batch
// going out every subBatchDelay. The sub batch size is therefore
// ceil(batchSize * subBatchDelay / totalDelay), but never less than
// minimumBatchSize. If the delays leave no room for splitting (subBatchDelay
// exceeds totalDelay, or totalDelay is not positive), the whole batchSize is
// returned so that everything is sent at once.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	// A non-positive total delay would otherwise lead to a division by
	// zero (or a meaningless negative ratio) below.
	if totalDelay <= 0 || subBatchDelay > totalDelay {
		return batchSize
	}

	// Durations are nanosecond counts, so the product is computed in 64
	// bits to stay clear of overflow on platforms with a 32-bit int.
	total := int64(totalDelay)
	subBatchSize := int(
		(int64(batchSize)*int64(subBatchDelay) + total - 1) / total,
	)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
// splitAnnouncementBatches takes an existing list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`. Every sub
// batch except possibly the last one holds exactly subBatchSize announcements;
// the last one holds the remainder. The result always contains at least one
// (possibly empty) sub batch, and the sub batches share the backing array of
// the input.
//
// A non-positive subBatchSize cannot be used to split anything, so in that
// case the whole input is returned as a single sub batch.
func splitAnnouncementBatches(subBatchSize int,
	announcementBatch []msgWithSenders) [][]msgWithSenders {

	// Without this guard a size of zero would loop forever appending empty
	// sub batches, and a negative size would panic when slicing.
	if subBatchSize <= 0 {
		return [][]msgWithSenders{announcementBatch}
	}

	splitAnnouncementBatch := make(
		[][]msgWithSenders, 0, len(announcementBatch)/subBatchSize+1,
	)

	for subBatchSize < len(announcementBatch) {
		// The three-index slice caps each sub batch at its own length,
		// so an append to one sub batch can never overwrite the next.
		splitAnnouncementBatch = append(
			splitAnnouncementBatch,
			announcementBatch[0:subBatchSize:subBatchSize],
		)
		announcementBatch = announcementBatch[subBatchSize:]
	}

	// Whatever is left over (possibly nothing) forms the final sub batch.
	splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch)

	return splitAnnouncementBatch
}
// sendBatch delivers a list of announcements to our peers. Peers with a gossip
// syncer are served through their syncer, which filters the batch itself; all
// remaining peers receive each message through the configured Broadcast
// function, skipping both the peers that sent us the message and the peers
// that have a syncer. A failed broadcast is logged and does not stop the rest
// of the batch from being sent.
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
	activeSyncers := d.syncMgr.GossipSyncers()

	// Let every gossip syncer pick what it wants to forward from the batch.
	for _, syncer := range activeSyncers {
		syncer.FilterGossipMsgs(announcementBatch...)
	}

	for i := range announcementBatch {
		entry := announcementBatch[i]

		// The syncers have been taken care of above, so they are added
		// to the skip set to avoid sending them duplicates.
		entry.mergeSyncerMap(activeSyncers)

		if err := d.cfg.Broadcast(entry.senders, entry.msg); err != nil {
			log.Errorf("Unable to send batch "+
				"announcements: %v", err)
		}
	}
}
// networkHandler is the primary goroutine that drives this service. It runs a
// single select loop that reacts to: channel policy update requests, incoming
// network messages, new block epochs (to re-process premature announcements),
// the trickle timer (to flush the pending announcement batch to peers), the
// retransmit timer (to re-broadcast stale channels), and the quit signal.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler() {
defer d.wg.Done()
// Initialize an empty deDupedAnnouncements to accumulate the batch of
// announcements that is flushed on each trickle tick.
announcements := deDupedAnnouncements{}
announcements.Reset()
retransmitTimer := time.NewTicker(d.cfg.RetransmitDelay)
defer retransmitTimer.Stop()
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop()
// To start, we'll first check to see if there are any stale channels
// that we need to re-transmit.
if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("Unable to rebroadcast stale channels: %v", err)
}
// We'll use this validation barrier to ensure that we process jobs in
// their dependency order during parallel validation.
validationBarrier := routing.NewValidationBarrier(
runtime.NumCPU()*4, d.quit,
)
for {
select {
// A new policy update has arrived. We'll commit it to the
// sub-systems below us, then craft, sign, and broadcast a new
// ChannelUpdate for the set of affected clients.
case policyUpdate := <-d.chanPolicyUpdates:
// First, we'll now create new fully signed updates for
// the affected channels and also update the underlying
// graph with the new state.
newChanUpdates, err := d.processChanPolicyUpdate(
policyUpdate,
)
if err != nil {
log.Errorf("Unable to craft policy updates: %v",
err)
policyUpdate.errResp <- err
continue
}
// Finally, with the updates committed, we'll now add
// them to the announcement batch to be flushed at the
// start of the next epoch.
announcements.AddMsgs(newChanUpdates...)
policyUpdate.errResp <- nil
// A new network message has been submitted for processing.
case announcement := <-d.networkMsgs:
switch announcement.msg.(type) {
// Channel announcement signatures are amongst the only
// messages that we'll process serially.
case *lnwire.AnnounceSignatures:
emittedAnnouncements := d.processNetworkAnnouncement(
announcement,
)
if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
)
}
// Skip the parallel validation path below and go back
// to waiting on the select.
continue
}
// If this message was recently rejected, then we won't
// attempt to re-process it.
if d.isRecentlyRejectedMsg(announcement.msg) {
announcement.err <- fmt.Errorf("recently " +
"rejected")
continue
}
// We'll set up any dependencies, and wait until a free
// slot for this job opens up. This keeps us from having
// thousands of goroutines active at once.
validationBarrier.InitJobDependencies(announcement.msg)
d.wg.Add(1)
go func() {
defer d.wg.Done()
defer validationBarrier.CompleteJob()
// If this message has an existing dependency,
// then we'll wait until that has been fully
// validated before we proceed.
err := validationBarrier.WaitForDependants(
announcement.msg,
)
if err != nil {
if err != routing.ErrVBarrierShuttingDown {
log.Warnf("unexpected error "+
"during validation "+
"barrier shutdown: %v",
err)
}
announcement.err <- err
return
}
// Process the network announcement to
// determine if this is either a new
// announcement from our PoV or an update to a
// prior vertex/edge we previously processed.
emittedAnnouncements := d.processNetworkAnnouncement(
announcement,
)
// If this message had any dependencies, then
// we can now signal them to continue.
validationBarrier.SignalDependants(
announcement.msg,
)
// If the announcement was accepted, then add
// the emitted announcements to our announce
// batch to be broadcast once the trickle timer
// ticks again.
if emittedAnnouncements != nil {
// TODO(roasbeef): exclude peer that
// sent.
announcements.AddMsgs(
emittedAnnouncements...,
)
}
}()
// A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.blockEpochs.Epochs:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
if !ok {
return
}
// Once a new block arrives, we update our record of the
// height of the chain tip.
blockHeight := uint32(newBlock.Height)
atomic.StoreUint32(&d.bestHeight, blockHeight)
// Next we check if we have any premature announcements
// for this height, if so, then we process them once
// more as normal announcements.
d.Lock()
numPremature := len(d.prematureAnnouncements[blockHeight])
d.Unlock()
// Return early if no announcement to process.
if numPremature == 0 {
continue
}
log.Infof("Re-processing %v premature announcements "+
"for height %v", numPremature, blockHeight)
// NOTE(review): the gossiper's mutex is held while
// processNetworkAnnouncement runs, and that method takes the
// same mutex when it stashes a premature message. This
// presumably relies on messages stored under blockHeight no
// longer being premature at this point -- confirm.
d.Lock()
for _, ann := range d.prematureAnnouncements[blockHeight] {
emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil {
announcements.AddMsgs(
emittedAnnouncements...,
)
}
}
delete(d.prematureAnnouncements, blockHeight)
d.Unlock()
// The trickle timer has ticked, which indicates we should
// flush to the network the pending batch of new announcements
// we've received since the last trickle tick.
case <-trickleTimer.C:
// Emit the current batch of announcements from
// deDupedAnnouncements.
announcementBatch := announcements.Emit()
// If the current announcements batch is empty, then we
// have no further work here.
if len(announcementBatch) == 0 {
continue
}
// Next, since we have new things to announce, we split
// the batch into sub batches that will be broadcast to
// all our immediately connected peers.
subBatchSize := calculateSubBatchSize(
d.cfg.TrickleDelay, d.cfg.SubBatchDelay, d.cfg.MinimumBatchSize,
len(announcementBatch),
)
splitAnnouncementBatch := splitAnnouncementBatches(
subBatchSize, announcementBatch,
)
// Send the sub batches from a separate goroutine so the
// delay between them does not stall this main loop.
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Infof("Broadcasting %v new announcements in %d sub batches",
len(announcementBatch), len(splitAnnouncementBatch))
// The loop variable shadows the outer announcementBatch
// and holds a single sub batch.
for _, announcementBatch := range splitAnnouncementBatch {
d.sendBatch(announcementBatch)
// Wait SubBatchDelay after every sub batch (the
// last one included), bailing out early on quit.
select {
case <-time.After(d.cfg.SubBatchDelay):
case <-d.quit:
return
}
}
}()
// The retransmission timer has ticked which indicates that we
// should check if we need to prune or re-broadcast any of our
// personal channels. This addresses the case of "zombie"
// channels and channel advertisements that have been dropped,
// or not properly propagated through the network.
case <-retransmitTimer.C:
if err := d.retransmitStaleChannels(); err != nil {
log.Errorf("unable to rebroadcast stale "+
"channels: %v", err)
}
// The gossiper has been signalled to exit, so we exit our
// main loop so the wait group can be decremented.
case <-d.quit:
return
}
}
}
// TODO(roasbeef): disconnect peers that send updates not on our chain.
// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
//
// This method simply forwards syncPeer to the gossiper's sync manager.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
d.syncMgr.InitSyncState(syncPeer)
}
// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
//
// This method simply forwards the peer's vertex to the gossiper's sync
// manager.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
d.syncMgr.PruneSyncState(peer)
}
// isRecentlyRejectedMsg reports whether the short channel ID carried by msg is
// present in the set of recent rejects, which lets callers skip expensive
// reprocessing. Only ChannelUpdate and ChannelAnnouncement messages are
// tracked; every other message type yields false.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
	d.rejectMtx.RLock()
	defer d.rejectMtx.RUnlock()

	// Pull the lookup key out of the message types we track.
	var key uint64
	switch m := msg.(type) {
	case *lnwire.ChannelUpdate:
		key = m.ShortChannelID.ToUint64()

	case *lnwire.ChannelAnnouncement:
		key = m.ShortChannelID.ToUint64()

	default:
		return false
	}

	_, rejected := d.recentRejects[key]
	return rejected
}
// retransmitStaleChannels walks every outgoing channel reported by the router
// and re-signs and re-broadcasts the ones considered stale. A channel with an
// AuthProof is stale when its policy lacks the MaxHTLC message flag, or when
// its last update is at least broadcastInterval old. Channels without an
// AuthProof are skipped entirely.
//
// An error is returned if the channels cannot be enumerated (other than
// channeldb.ErrGraphNoEdgesFound), if any channel fails to be updated, or if
// the final broadcast fails.
func (d *AuthenticatedGossiper) retransmitStaleChannels() error {
	// staleEdge pairs a channel's static info with the policy that needs
	// to be refreshed.
	type staleEdge struct {
		info *channeldb.ChannelEdgeInfo
		edge *channeldb.ChannelEdgePolicy
	}

	const broadcastInterval = time.Hour * 24

	var stale []staleEdge
	collectStale := func(info *channeldb.ChannelEdgeInfo,
		edge *channeldb.ChannelEdgePolicy) error {

		// No auth proof means this is a private channel that is not
		// meant to be announced, so we stay silent about it in order
		// not to leak its existence.
		if info.AuthProof == nil {
			log.Debugf("Skipping retransmission of channel "+
				"without AuthProof: %v", info.ChannelID)
			return nil
		}

		// Refresh the edge if its ChannelUpdate predates the MaxHTLC
		// field, or if a full broadcast interval has elapsed since it
		// was last updated.
		needsRefresh := !edge.MessageFlags.HasMaxHtlc() ||
			time.Since(edge.LastUpdate) >= broadcastInterval
		if needsRefresh {
			stale = append(stale, staleEdge{info: info, edge: edge})
		}

		return nil
	}

	err := d.cfg.Router.ForAllOutgoingChannels(collectStale)
	if err != nil && err != channeldb.ErrGraphNoEdgesFound {
		return fmt.Errorf("unable to retrieve outgoing channels: %v",
			err)
	}

	// Re-sign each stale channel, persist the result, and gather the wire
	// messages that should go out.
	var toBroadcast []lnwire.Message
	for _, s := range stale {
		chanAnn, chanUpdate, err := d.updateChannel(s.info, s.edge)
		if err != nil {
			return fmt.Errorf("unable to update channel: %v", err)
		}

		// When a valid announcement is available it is sent ahead of
		// the update.
		if chanAnn != nil {
			toBroadcast = append(toBroadcast, chanAnn)
		}
		toBroadcast = append(toBroadcast, chanUpdate)
	}

	// Nothing to re-broadcast, so we're done.
	if len(toBroadcast) == 0 {
		return nil
	}

	log.Infof("Retransmitting %v outgoing channels", len(stale))

	// Hand the crafted messages to all of our immediate peers.
	if err := d.cfg.Broadcast(nil, toBroadcast...); err != nil {
		return fmt.Errorf("unable to re-broadcast channels: %v", err)
	}

	return nil
}
// processChanPolicyUpdate applies the policy carried by policyUpdate to our
// outgoing channels and produces freshly signed channel updates for them. If
// the request names specific channel points only those channels are touched;
// with no channel points every outgoing channel is updated.
//
// The returned slice holds the updates for channels that have an AuthProof.
// Updates for channels without one are not returned; they are instead handed
// to the reliable sender for direct delivery to the remote party, and a
// failure to do so is only logged.
//
// TODO(roasbeef): generalize into generic for any channel update
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
	policyUpdate *chanPolicyUpdateRequest) ([]networkMsg, error) {

	// Index the requested channel points so membership checks are cheap.
	targets := make(map[wire.OutPoint]struct{})
	for _, op := range policyUpdate.targetChans {
		targets[op] = struct{}{}
	}

	haveChanFilter := len(targets) != 0
	if !haveChanFilter {
		log.Infof("Updating routing policies for all chans")
	} else {
		log.Infof("Updating routing policies for chan_points=%v",
			spew.Sdump(targets))
	}

	// policyTarget pairs a channel's static info with the policy that has
	// had the new schema written into it.
	type policyTarget struct {
		info *channeldb.ChannelEdgeInfo
		edge *channeldb.ChannelEdgePolicy
	}
	var pending []policyTarget

	// Visit each outgoing channel known to the router, skipping the ones
	// the filter excludes and rewriting the policy of the rest.
	applyPolicy := func(info *channeldb.ChannelEdgeInfo,
		edge *channeldb.ChannelEdgePolicy) error {

		if haveChanFilter {
			if _, wanted := targets[info.ChannelPoint]; !wanted {
				return nil
			}
		}

		edge.FeeBaseMSat = policyUpdate.newSchema.BaseFee
		edge.FeeProportionalMillionths = lnwire.MilliSatoshi(
			policyUpdate.newSchema.FeeRate,
		)
		edge.TimeLockDelta = uint16(policyUpdate.newSchema.TimeLockDelta)

		pending = append(pending, policyTarget{info: info, edge: edge})

		return nil
	}
	if err := d.cfg.Router.ForAllOutgoingChannels(applyPolicy); err != nil {
		return nil, err
	}

	// Re-sign every collected edge, persist it through updateChannel, and
	// route the resulting ChannelUpdate to the proper destination.
	var broadcastable []networkMsg
	for _, target := range pending {
		_, chanUpdate, err := d.updateChannel(target.info, target.edge)
		if err != nil {
			return nil, err
		}

		// Channels with a proof are public: queue the update for
		// broadcast. Using ourselves as the source signals that no
		// peer should be skipped when the message is sent.
		if target.info.AuthProof != nil {
			broadcastable = append(broadcastable, networkMsg{
				source: d.selfKey,
				msg:    chanUpdate,
			})
			continue
		}

		// Private channels must not be given away through a broadcast,
		// so the update goes straight to the remote party instead.
		remotePubKey := remotePubFromChanInfo(
			target.info, chanUpdate.ChannelFlags,
		)
		sendErr := d.reliableSender.sendMessage(chanUpdate, remotePubKey)
		if sendErr != nil {
			log.Errorf("Unable to reliably send %v for "+
				"channel=%v to peer=%x: %v",
				chanUpdate.MsgType(),
				chanUpdate.ShortChannelID,
				remotePubKey, sendErr)
		}
	}

	return broadcastable, nil
}
// processRejectedEdge inspects a channel announcement whose edge was rejected
// by the router to see whether it lets us complete a channel we already know
// of but hold no AuthProof for. This can happen when we created a channel but
// never received the remote peer's half of the proof, while the remote peer
// did manage to assemble the full ChannelAnnouncement.
//
// If the stored edge already has a proof, or no proof was supplied, nothing is
// returned. Otherwise the full announcement is assembled and validated, the
// proof is persisted, and the channel announcement plus any available edge
// updates are returned for broadcast.
func (d *AuthenticatedGossiper) processRejectedEdge(
	chanAnnMsg *lnwire.ChannelAnnouncement,
	proof *channeldb.ChannelAuthProof) ([]networkMsg, error) {

	scid := chanAnnMsg.ShortChannelID

	// Look up what the database currently knows about this channel.
	chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(scid)
	if err != nil {
		return nil, err
	}

	// There is nothing to extract if the edge is already fully proven (a
	// plain rejection), or if we weren't given a proof to attach to it.
	if chanInfo.AuthProof != nil || proof == nil {
		return nil, nil
	}

	// Assemble the complete announcement and make sure it validates
	// before anything is written to disk.
	chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(
		proof, chanInfo, e1, e2,
	)
	if err != nil {
		return nil, err
	}
	if vErr := routing.ValidateChannelAnn(chanAnn); vErr != nil {
		wrapped := fmt.Errorf("assembled channel announcement proof "+
			"for shortChanID=%v isn't valid: %v",
			scid, vErr)
		log.Error(wrapped)
		return nil, wrapped
	}

	// The proof checks out, so persist it alongside the edge.
	if aErr := d.cfg.Router.AddProof(scid, proof); aErr != nil {
		wrapped := fmt.Errorf("unable add proof to shortChanID=%v: %v",
			scid, aErr)
		log.Error(wrapped)
		return nil, wrapped
	}

	// With a complete channel announcement in hand, gather everything
	// that can now be broadcast to our peers. All messages are sourced
	// from ourselves.
	announcements := make([]networkMsg, 0, 3)
	announcements = append(announcements, networkMsg{
		source: d.selfKey,
		msg:    chanAnn,
	})
	if e1Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e1Ann,
		})
	}
	if e2Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e2Ann,
		})
	}

	return announcements, nil
}
// processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
nMsg *networkMsg) []networkMsg {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
// TODO(roasbeef) make height delta 6
// * or configurable
bestHeight := atomic.LoadUint32(&d.bestHeight)
return chanID.BlockHeight+delta > bestHeight
}
var announcements []networkMsg
switch msg := nMsg.msg.(type) {
// A new node announcement has arrived which either presents new
// information about a node in one of the channels we know about, or a
// updating previously advertised information.
case *lnwire.NodeAnnouncement:
timestamp := time.Unix(int64(msg.Timestamp), 0)
// We'll quickly ask the router if it already has a
// newer update for this node so we can skip validating
// signatures if not required.
if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) {
nMsg.err <- nil
return nil
}
if err := routing.ValidateNodeAnn(msg); err != nil {
err := fmt.Errorf("unable to validate "+
"node announcement: %v", err)
log.Error(err)
nMsg.err <- err
return nil
}
features := lnwire.NewFeatureVector(
msg.Features, lnwire.GlobalFeatures,
)
node := &channeldb.LightningNode{
HaveNodeAnnouncement: true,
LastUpdate: timestamp,
Addresses: msg.Addresses,
PubKeyBytes: msg.NodeID,
Alias: msg.Alias.String(),
AuthSigBytes: msg.Signature.ToSignatureBytes(),
Features: features,
Color: msg.RGBColor,
ExtraOpaqueData: msg.ExtraOpaqueData,
}
if err := d.cfg.Router.AddNode(node); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debug(err)
} else {
log.Error(err)
}
nMsg.err <- err
return nil
}
// In order to ensure we don't leak unadvertised nodes, we'll
// make a quick check to ensure this node intends to publicly
// advertise itself to the network.
isPublic, err := d.cfg.Router.IsPublicNode(node.PubKeyBytes)
if err != nil {
log.Errorf("Unable to determine if node %x is "+
"advertised: %v", node.PubKeyBytes, err)
nMsg.err <- err
return nil
}
// If it does, we'll add their announcement to our batch so that
// it can be broadcast to the rest of our peers.
if isPublic {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
} else {
log.Tracef("Skipping broadcasting node announcement "+
"for %x due to being unadvertised", msg.NodeID)
}
nMsg.err <- nil
// TODO(roasbeef): get rid of the above
return announcements
// A new channel announcement has arrived, this indicates the
// *creation* of a new channel within the network. This only advertises
// the existence of a channel and not yet the routing policies in
// either direction of the channel.
case *lnwire.ChannelAnnouncement:
// We'll ignore any channel announcements that target any chain
// other than the set of chains we know of.
if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
err := fmt.Errorf("Ignoring ChannelAnnouncement from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash)
log.Errorf(err.Error())
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
nMsg.err <- err
return nil
}
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only "+
"height %v is known",
msg.ShortChannelID.ToUint64(),
msg.ShortChannelID.BlockHeight,
atomic.LoadUint32(&d.bestHeight))
d.Lock()
d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
nMsg,
)
d.Unlock()
return nil
}
// At this point, we'll now ask the router if this is a
// zombie/known edge. If so we can skip all the processing
// below.
if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) {
nMsg.err <- nil
return nil
}
// If this is a remote channel announcement, then we'll validate
// all the signatures within the proof as it should be well
// formed.
var proof *channeldb.ChannelAuthProof
if nMsg.isRemote {
if err := routing.ValidateChannelAnn(msg); err != nil {
err := fmt.Errorf("unable to validate "+
"announcement: %v", err)
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
log.Error(err)
nMsg.err <- err
return nil
}
// If the proof checks out, then we'll save the proof
// itself to the database so we can fetch it later when
// gossiping with other nodes.
proof = &channeldb.ChannelAuthProof{
NodeSig1Bytes: msg.NodeSig1.ToSignatureBytes(),
NodeSig2Bytes: msg.NodeSig2.ToSignatureBytes(),
BitcoinSig1Bytes: msg.BitcoinSig1.ToSignatureBytes(),
BitcoinSig2Bytes: msg.BitcoinSig2.ToSignatureBytes(),
}
}
// With the proof validate (if necessary), we can now store it
// within the database for our path finding and syncing needs.
var featureBuf bytes.Buffer
if err := msg.Features.Encode(&featureBuf); err != nil {
log.Errorf("unable to encode features: %v", err)
nMsg.err <- err
return nil
}
edge := &channeldb.ChannelEdgeInfo{
ChannelID: msg.ShortChannelID.ToUint64(),
ChainHash: msg.ChainHash,
NodeKey1Bytes: msg.NodeID1,
NodeKey2Bytes: msg.NodeID2,
BitcoinKey1Bytes: msg.BitcoinKey1,
BitcoinKey2Bytes: msg.BitcoinKey2,
AuthProof: proof,
Features: featureBuf.Bytes(),
ExtraOpaqueData: msg.ExtraOpaqueData,
}
// If there were any optional message fields provided, we'll
// include them in its serialized disk representation now.
if nMsg.optionalMsgFields != nil {
if nMsg.optionalMsgFields.capacity != nil {
edge.Capacity = *nMsg.optionalMsgFields.capacity
}
if nMsg.optionalMsgFields.channelPoint != nil {
edge.ChannelPoint = *nMsg.optionalMsgFields.channelPoint
}
}
// We will add the edge to the channel router. If the nodes
// present in this channel are not present in the database, a
// partial node will be added to represent each node while we
// wait for a node announcement.
//
// Before we add the edge to the database, we obtain
// the mutex for this channel ID. We do this to ensure
// no other goroutine has read the database and is now
// making decisions based on this DB state, before it
// writes to the DB.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
if err := d.cfg.Router.AddEdge(edge); err != nil {
// If the edge was rejected due to already being known,
// then it may be that case that this new message has a
// fresh channel proof, so we'll check.
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
// Attempt to process the rejected message to
// see if we get any new announcements.
anns, rErr := d.processRejectedEdge(msg, proof)
if rErr != nil {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
nMsg.err <- rErr
return nil
}
// If while processing this rejected edge, we
// realized there's a set of announcements we
// could extract, then we'll return those
// directly.
if len(anns) != 0 {
nMsg.err <- nil
return anns
}
// Otherwise, this is just a regular rejected
// edge.
log.Debugf("Router rejected channel "+
"edge: %v", err)
} else {
log.Tracef("Router rejected channel "+
"edge: %v", err)
}
nMsg.err <- err
return nil
}
// If we earlier received any ChannelUpdates for this channel,
// we can now process them, as the channel is added to the
// graph.
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg
d.pChanUpdMtx.Lock()
for _, cu := range d.prematureChannelUpdates[shortChanID] {
channelUpdates = append(channelUpdates, cu)
}
// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
// Launch a new goroutine to handle each ChannelUpdate, this to
// ensure we don't block here, as we can handle only one
// announcement at a time.
for _, cu := range channelUpdates {
d.wg.Add(1)
go func(nMsg *networkMsg) {
defer d.wg.Done()
switch msg := nMsg.msg.(type) {
// Reprocess the message, making sure we return
// an error to the original caller in case the
// gossiper shuts down.
case *lnwire.ChannelUpdate:
log.Debugf("Reprocessing"+
" ChannelUpdate for "+
"shortChanID=%v",
msg.ShortChannelID.ToUint64())
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- ErrGossiperShuttingDown
}
// We don't expect any other message type than
// ChannelUpdate to be in this map.
default:
log.Errorf("Unsupported message type "+
"found among ChannelUpdates: "+
"%T", msg)
}
}(cu)
}
// Channel announcement was successfully proceeded and know it
// might be broadcast to other connected nodes if it was
// announcement with proof (remote).
if proof != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
}
nMsg.err <- nil
return announcements
// A new authenticated channel edge update has arrived. This indicates
// that the directional information for an already known channel has
// been updated.
case *lnwire.ChannelUpdate:
// We'll ignore any channel announcements that target any chain
// other than the set of chains we know of.
if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
err := fmt.Errorf("Ignoring ChannelUpdate from "+
"chain=%v, gossiper on chain=%v", msg.ChainHash,
d.cfg.ChainHash)
log.Errorf(err.Error())
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
nMsg.err <- err
return nil
}
blockHeight := msg.ShortChannelID.BlockHeight
shortChanID := msg.ShortChannelID.ToUint64()
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+
"short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known",
shortChanID, blockHeight,
atomic.LoadUint32(&d.bestHeight))
d.Lock()
d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
nMsg,
)
d.Unlock()
return nil
}
// Before we perform any of the expensive checks below, we'll
// check whether this update is stale or is for a zombie
// channel in order to quickly reject it.
timestamp := time.Unix(int64(msg.Timestamp), 0)
if d.cfg.Router.IsStaleEdgePolicy(
msg.ShortChannelID, timestamp, msg.ChannelFlags,
) {
nMsg.err <- nil
return nil
}
// Get the node pub key as far as we don't have it in channel
// update announcement message. We'll need this to properly
// verify message signature.
//
// We make sure to obtain the mutex for this channel ID
// before we access the database. This ensures the state
// we read from the database has not changed between this
// point and when we call UpdateEdge() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
switch err {
// No error, break.
case nil:
break
case channeldb.ErrZombieEdge:
// Since we've deemed the update as not stale above,
// before marking it live, we'll make sure it has been
// signed by the correct party. The least-significant
// bit in the flag on the channel update tells us which
// edge is being updated.
var pubKey *btcec.PublicKey
switch {
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
pubKey, _ = chanInfo.NodeKey1()
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
pubKey, _ = chanInfo.NodeKey2()
}
err := routing.VerifyChannelUpdateSignature(msg, pubKey)
if err != nil {
err := fmt.Errorf("unable to verify channel "+
"update signature: %v", err)
log.Error(err)
nMsg.err <- err
return nil
}
// With the signature valid, we'll proceed to mark the
// edge as live and wait for the channel announcement to
// come through again.
err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID)
if err != nil {
err := fmt.Errorf("unable to remove edge with "+
"chan_id=%v from zombie index: %v",
msg.ShortChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
log.Debugf("Removed edge with chan_id=%v from zombie "+
"index", msg.ShortChannelID)
// We'll fallthrough to ensure we stash the update until
// we receive its corresponding ChannelAnnouncement.
// This is needed to ensure the edge exists in the graph
// before applying the update.
fallthrough
case channeldb.ErrGraphNotFound:
fallthrough
case channeldb.ErrGraphNoEdgesFound:
fallthrough
case channeldb.ErrEdgeNotFound:
// If the edge corresponding to this ChannelUpdate was
// not found in the graph, this might be a channel in
// the process of being opened, and we haven't processed
// our own ChannelAnnouncement yet, hence it is not
// found in the graph. This usually gets resolved after
// the channel proofs are exchanged and the channel is
// broadcasted to the rest of the network, but in case
// this is a private channel this won't ever happen.
// This can also happen in the case of a zombie channel
// with a fresh update for which we don't have a
// ChannelAnnouncement for since we reject them. Because
// of this, we temporarily add it to a map, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
)
d.pChanUpdMtx.Unlock()
log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v), saving for "+
"reprocessing later", shortChanID)
// NOTE: We don't return anything on the error channel
// for this message, as we expect that will be done when
// this ChannelUpdate is later reprocessed.
return nil
default:
err := fmt.Errorf("unable to validate channel update "+
"short_chan_id=%v: %v", shortChanID, err)
log.Error(err)
nMsg.err <- err
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
return nil
}
// The least-significant bit in the flag on the channel update
// announcement tells us "which" side of the channels directed
// edge is being updated.
var pubKey *btcec.PublicKey
switch {
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
pubKey, _ = chanInfo.NodeKey1()
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
pubKey, _ = chanInfo.NodeKey2()
}
// Validate the channel announcement with the expected public key and
// channel capacity. In the case of an invalid channel update, we'll
// return an error to the caller and exit early.
err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, msg)
if err != nil {
rErr := fmt.Errorf("unable to validate channel "+
"update announcement for short_chan_id=%v: %v",
spew.Sdump(msg.ShortChannelID), err)
log.Error(rErr)
nMsg.err <- rErr
return nil
}
update := &channeldb.ChannelEdgePolicy{
SigBytes: msg.Signature.ToSignatureBytes(),
ChannelID: shortChanID,
LastUpdate: timestamp,
MessageFlags: msg.MessageFlags,
ChannelFlags: msg.ChannelFlags,
TimeLockDelta: msg.TimeLockDelta,
MinHTLC: msg.HtlcMinimumMsat,
MaxHTLC: msg.HtlcMaximumMsat,
FeeBaseMSat: lnwire.MilliSatoshi(msg.BaseFee),
FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
ExtraOpaqueData: msg.ExtraOpaqueData,
}
if err := d.cfg.Router.UpdateEdge(update); err != nil {
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Debug(err)
} else {
d.rejectMtx.Lock()
d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
d.rejectMtx.Unlock()
log.Error(err)
}
nMsg.err <- err
return nil
}
// If this is a local ChannelUpdate without an AuthProof, it
// means it is an update to a channel that is not (yet)
// supposed to be announced to the greater network. However,
// our channel counter party will need to be given the update,
// so we'll try sending the update directly to the remote peer.
if !nMsg.isRemote && chanInfo.AuthProof == nil {
// Get our peer's public key.
remotePubKey := remotePubFromChanInfo(
chanInfo, msg.ChannelFlags,
)
// Now, we'll attempt to send the channel update message
// reliably to the remote peer in the background, so
// that we don't block if the peer happens to be offline
// at the moment.
err := d.reliableSender.sendMessage(msg, remotePubKey)
if err != nil {
err := fmt.Errorf("unable to reliably send %v "+
"for channel=%v to peer=%x: %v",
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
}
}
// Channel update announcement was successfully processed and
// now it can be broadcast to the rest of the network. However,
// we'll only broadcast the channel update announcement if it
// has an attached authentication proof.
if chanInfo.AuthProof != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: msg,
})
}
nMsg.err <- nil
return announcements
// A new signature announcement has been received. This indicates
// willingness of nodes involved in the funding of a channel to
// announce this new channel to the rest of the world.
case *lnwire.AnnounceSignatures:
needBlockHeight := msg.ShortChannelID.BlockHeight +
d.cfg.ProofMatureDelta
shortChanID := msg.ShortChannelID.ToUint64()
prefix := "local"
if nMsg.isRemote {
prefix = "remote"
}
log.Infof("Received new %v channel announcement: %v", prefix,
spew.Sdump(msg))
// By the specification, channel announcement proofs should be
// sent after some number of confirmations after channel was
// registered in bitcoin blockchain. Therefore, we check if the
// proof is premature. If so we'll halt processing until the
// expected announcement height. This allows us to be tolerant
// to other clients if this constraint was changed.
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.Lock()
d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight],
nMsg,
)
d.Unlock()
log.Infof("Premature proof announcement, "+
"current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch",
atomic.LoadUint32(&d.bestHeight), needBlockHeight)
return nil
}
// Ensure that we know of a channel with the target channel ID
// before proceeding further.
//
// We must acquire the mutex for this channel ID before getting
// the channel from the database, to ensure what we read does
// not change before we call AddProof() later.
d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
msg.ShortChannelID)
if err != nil {
// TODO(andrew.shvv) this is dangerous because remote
// node might rewrite the waiting proof.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
log.Infof("Orphan %v proof announcement with "+
"short_chan_id=%v, adding"+
"to waiting batch", prefix, shortChanID)
nMsg.err <- nil
return nil
}
nodeID := nMsg.source.SerializeCompressed()
isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])
// Ensure that channel that was retrieved belongs to the peer
// which sent the proof announcement.
if !(isFirstNode || isSecondNode) {
err := fmt.Errorf("channel that was received not "+
"belongs to the peer which sent the proof, "+
"short_chan_id=%v", shortChanID)
log.Error(err)
nMsg.err <- err
return nil
}
// If proof was sent by a local sub-system, then we'll
// send the announcement signature to the remote node
// so they can also reconstruct the full channel
// announcement.
if !nMsg.isRemote {
var remotePubKey [33]byte
if isFirstNode {
remotePubKey = chanInfo.NodeKey2Bytes
} else {
remotePubKey = chanInfo.NodeKey1Bytes
}
// Since the remote peer might not be online
// we'll call a method that will attempt to
// deliver the proof when it comes online.
err := d.reliableSender.sendMessage(msg, remotePubKey)
if err != nil {
err := fmt.Errorf("unable to reliably send %v "+
"for channel=%v to peer=%x: %v",
msg.MsgType(), msg.ShortChannelID,
remotePubKey, err)
nMsg.err <- err
return nil
}
}
// Check if we already have the full proof for this channel.
if chanInfo.AuthProof != nil {
// If we already have the fully assembled proof, then
// the peer sending us their proof has probably not
// received our local proof yet. So be kind and send
// them the full proof.
if nMsg.isRemote {
peerID := nMsg.source.SerializeCompressed()
log.Debugf("Got AnnounceSignatures for " +
"channel with full proof.")
d.wg.Add(1)
go func() {
defer d.wg.Done()
log.Debugf("Received half proof for "+
"channel %v with existing "+
"full proof. Sending full "+
"proof to peer=%x",
msg.ChannelID,
peerID)
chanAnn, _, _, err := CreateChanAnnouncement(
chanInfo.AuthProof, chanInfo,
e1, e2,
)
if err != nil {
log.Errorf("unable to gen "+
"ann: %v", err)
return
}
err = nMsg.peer.SendMessage(
false, chanAnn,
)
if err != nil {
log.Errorf("Failed sending "+
"full proof to "+
"peer=%x: %v",
peerID, err)
return
}
log.Debugf("Full proof sent to peer=%x"+
" for chanID=%v", peerID,
msg.ChannelID)
}()
}
log.Debugf("Already have proof for channel "+
"with chanID=%v", msg.ChannelID)
nMsg.err <- nil
return nil
}
// Check that we received the opposite proof. If so, then we're
// now able to construct the full proof, and create the channel
// announcement. If we didn't receive the opposite half of the
// proof, then we should store this one and wait for the
// opposite half to be received.
proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
oppositeProof, err := d.cfg.WaitingProofStore.Get(
proof.OppositeKey(),
)
if err != nil && err != channeldb.ErrWaitingProofNotFound {
err := fmt.Errorf("unable to get "+
"the opposite proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
if err == channeldb.ErrWaitingProofNotFound {
err := d.cfg.WaitingProofStore.Add(proof)
if err != nil {
err := fmt.Errorf("unable to store "+
"the proof for short_chan_id=%v: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
log.Infof("1/2 of channel ann proof received for "+
"short_chan_id=%v, waiting for other half",
shortChanID)
nMsg.err <- nil
return nil
}
// We now have both halves of the channel announcement proof,
// then we'll reconstruct the initial announcement so we can
// validate it shortly below.
var dbProof channeldb.ChannelAuthProof
if isFirstNode {
dbProof.NodeSig1Bytes = msg.NodeSignature.ToSignatureBytes()
dbProof.NodeSig2Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
dbProof.BitcoinSig1Bytes = msg.BitcoinSignature.ToSignatureBytes()
dbProof.BitcoinSig2Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
} else {
dbProof.NodeSig1Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
dbProof.NodeSig2Bytes = msg.NodeSignature.ToSignatureBytes()
dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes()
}
chanAnn, e1Ann, e2Ann, err := CreateChanAnnouncement(
&dbProof, chanInfo, e1, e2,
)
if err != nil {
log.Error(err)
nMsg.err <- err
return nil
}
// With all the necessary components assembled validate the
// full channel announcement proof.
if err := routing.ValidateChannelAnn(chanAnn); err != nil {
err := fmt.Errorf("channel announcement proof "+
"for short_chan_id=%v isn't valid: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
}
// If the channel was returned by the router, it means that the
// existence of the funding point and the inclusion of the nodes'
// bitcoin keys in it were already checked by the router. At this
// stage we should check that the node keys attest to the bitcoin
// keys by validating the signatures of the announcement. If the
// proof is valid, then we'll populate the channel edge with it,
// so we can announce it on peer connect.
err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof)
if err != nil {
err := fmt.Errorf("unable add proof to the "+
"channel chanID=%v: %v", msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
if err != nil {
err := fmt.Errorf("unable remove opposite proof "+
"for the channel with chanID=%v: %v",
msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
// The proof was successfully created, so we can now announce the
// channel to the rest of the network.
log.Infof("Fully valid channel proof for short_chan_id=%v "+
"constructed, adding to next ann batch",
shortChanID)
// Assemble the necessary announcements to add to the next
// broadcasting batch.
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: chanAnn,
})
if e1Ann != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: e1Ann,
})
}
if e2Ann != nil {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: e2Ann,
})
}
// We'll also send along the node announcements for each channel
// participant if we know of them.
node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
if err != nil {
log.Debugf("Unable to fetch node announcement for "+
"%x: %v", chanInfo.NodeKey1Bytes, err)
} else {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: node1Ann,
})
}
node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
if err != nil {
log.Debugf("Unable to fetch node announcement for "+
"%x: %v", chanInfo.NodeKey2Bytes, err)
} else {
announcements = append(announcements, networkMsg{
peer: nMsg.peer,
source: nMsg.source,
msg: node2Ann,
})
}
nMsg.err <- nil
return announcements
default:
nMsg.err <- errors.New("wrong type of the announcement")
return nil
}
}
// fetchNodeAnn looks up the node identified by pubKey through the router and
// returns the latest signed node announcement we hold for it. Any error from
// the router lookup is returned as-is with a nil announcement.
func (d *AuthenticatedGossiper) fetchNodeAnn(
	pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {

	graphNode, lookupErr := d.cfg.Router.FetchLightningNode(pubKey)
	if lookupErr != nil {
		return nil, lookupErr
	}

	// Ask the node record for its announcement; the result (and any
	// error) is handed straight back to the caller.
	nodeAnn, annErr := graphNode.NodeAnnouncement(true)
	return nodeAnn, annErr
}
// isMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
//
// The verdict depends on the message type:
//   - AnnounceSignatures: stale if the channel is no longer in the graph
//     (ErrEdgeNotFound) or if the graph already holds the channel's AuthProof.
//   - ChannelUpdate: stale if the channel is no longer in the graph
//     (ErrEdgeNotFound) or if the policy stored for the update's direction
//     has a LastUpdate strictly after the message's timestamp.
//
// Any other lookup error, an unknown policy, or an unsupported message type
// yields false, so such messages are never reported as stale.
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
	switch msg := msg.(type) {
	case *lnwire.AnnounceSignatures:
		chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
			msg.ShortChannelID,
		)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if err == channeldb.ErrEdgeNotFound {
			return true
		}
		if err != nil {
			// Both the channel ID and the error must be passed:
			// the format string carries two verbs.
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// If the proof exists in the graph, then we have successfully
		// received the remote proof and assembled the full proof, so we
		// can safely delete the local proof from the database.
		return chanInfo.AuthProof != nil

	case *lnwire.ChannelUpdate:
		_, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if err == channeldb.ErrEdgeNotFound {
			return true
		}
		if err != nil {
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// Otherwise, we'll retrieve the correct policy that we
		// currently have stored within our graph to check if this
		// message is stale by comparing its timestamp. The direction
		// bit of the channel flags selects which of the two policies
		// the update refers to.
		var p *channeldb.ChannelEdgePolicy
		if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
			p = p1
		} else {
			p = p2
		}

		// If the policy is still unknown, then we can consider this
		// policy fresh.
		if p == nil {
			return false
		}

		// The message is stale only if the stored policy is strictly
		// newer; an equal timestamp is not considered stale.
		timestamp := time.Unix(int64(msg.Timestamp), 0)
		return p.LastUpdate.After(timestamp)

	default:
		// We'll make sure to not mark any unsupported messages as stale
		// to ensure they are not removed.
		return false
	}
}
// updateChannel produces a freshly signed ChannelUpdate for the given edge
// policy and persists the refreshed policy through the router.
//
// The edge is modified in place: the max_htlc flag and value are filled in
// when missing, LastUpdate is moved forward, and SigBytes is replaced with the
// new signature. These mutations happen before validation and persistence, so
// they remain on the edge even if a later step fails.
//
// On success the new ChannelUpdate is returned. The ChannelAnnouncement is
// only rebuilt and returned when info carries an AuthProof; otherwise it is
// nil. On any failure both messages are nil and the error is returned.
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
	edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
	*lnwire.ChannelUpdate, error) {

	// Policies that do not yet advertise max_htlc get the flag set, with
	// the channel capacity used as the maximum.
	if !edge.MessageFlags.HasMaxHtlc() {
		edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
		edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)
	}

	// The new timestamp must be strictly greater than the previous one,
	// otherwise the update would not be propagated. If the clock has not
	// advanced past the last update, step one second beyond it.
	now := time.Now().Unix()
	if prev := edge.LastUpdate.Unix(); now <= prev {
		now = prev + 1
	}
	edge.LastUpdate = time.Unix(now, 0)

	chanUpdate := &lnwire.ChannelUpdate{
		ChainHash:       info.ChainHash,
		ShortChannelID:  lnwire.NewShortChanIDFromInt(edge.ChannelID),
		Timestamp:       uint32(now),
		MessageFlags:    edge.MessageFlags,
		ChannelFlags:    edge.ChannelFlags,
		TimeLockDelta:   edge.TimeLockDelta,
		HtlcMinimumMsat: edge.MinHTLC,
		HtlcMaximumMsat: edge.MaxHTLC,
		BaseFee:         uint32(edge.FeeBaseMSat),
		FeeRate:         uint32(edge.FeeProportionalMillionths),
		ExtraOpaqueData: edge.ExtraOpaqueData,
	}

	// Seed the message with the signature currently stored on the edge; a
	// stored signature that cannot be parsed aborts the update.
	prevSig, err := lnwire.NewSigFromRawSignature(edge.SigBytes)
	if err != nil {
		return nil, nil, err
	}
	chanUpdate.Signature = prevSig

	// Sign the refreshed channel update with our own key.
	sig, err := SignAnnouncement(d.cfg.AnnSigner, d.selfKey, chanUpdate)
	if err != nil {
		return nil, nil, err
	}

	// Record the new signature on the edge first, then attach it to the
	// wire message.
	edge.SigBytes = sig.Serialize()
	if chanUpdate.Signature, err = lnwire.NewSigFromSignature(
		sig,
	); err != nil {
		return nil, nil, err
	}

	// Sanity check: verify what we just signed before it is stored or
	// handed back to the caller.
	err = routing.ValidateChannelUpdateAnn(
		d.selfKey, info.Capacity, chanUpdate,
	)
	if err != nil {
		return nil, nil, fmt.Errorf("generated invalid channel "+
			"update sig: %v", err)
	}

	// Persist the new edge policy through the router.
	if err = d.cfg.Router.UpdateEdge(edge); err != nil {
		return nil, nil, err
	}

	// Without a full authentication proof there is no channel announcement
	// to send along with the update, so only the update is returned.
	proof := info.AuthProof
	if proof == nil {
		return nil, chanUpdate, nil
	}

	// Otherwise rebuild the original channel announcement so the two
	// messages can be broadcast alongside each other if necessary.
	//
	// NOTE(review): ExtraOpaqueData is taken from the edge policy here,
	// not from the channel info — confirm this is intended.
	chanAnn := &lnwire.ChannelAnnouncement{
		ShortChannelID:  lnwire.NewShortChanIDFromInt(info.ChannelID),
		ChainHash:       info.ChainHash,
		NodeID1:         info.NodeKey1Bytes,
		NodeID2:         info.NodeKey2Bytes,
		BitcoinKey1:     info.BitcoinKey1Bytes,
		BitcoinKey2:     info.BitcoinKey2Bytes,
		Features:        lnwire.NewRawFeatureVector(),
		ExtraOpaqueData: edge.ExtraOpaqueData,
	}

	// Restore the four proof signatures from their stored byte form,
	// bailing out on the first one that fails to parse.
	if chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature(
		proof.NodeSig1Bytes,
	); err != nil {
		return nil, nil, err
	}
	if chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature(
		proof.NodeSig2Bytes,
	); err != nil {
		return nil, nil, err
	}
	if chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature(
		proof.BitcoinSig1Bytes,
	); err != nil {
		return nil, nil, err
	}
	if chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature(
		proof.BitcoinSig2Bytes,
	); err != nil {
		return nil, nil, err
	}

	return chanAnn, chanUpdate, nil
}
// SyncManager exposes the SyncManager instance held by the gossiper.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
	mgr := d.syncMgr
	return mgr
}