package discovery

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/btcsuite/btcd/btcec"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btcutil"
	"github.com/davecgh/go-spew/spew"
	"github.com/lightningnetwork/lnd/batch"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/lnpeer"
	"github.com/lightningnetwork/lnd/lnwallet"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/multimutex"
	"github.com/lightningnetwork/lnd/netann"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/lightningnetwork/lnd/routing/route"
	"github.com/lightningnetwork/lnd/ticker"
	"golang.org/x/time/rate"
)

const (
	// DefaultMaxChannelUpdateBurst is the default maximum number of updates
	// for a specific channel and direction that we'll accept over an
	// interval.
	DefaultMaxChannelUpdateBurst = 10

	// DefaultChannelUpdateInterval is the default interval we'll use to
	// determine how often we should allow a new update for a specific
	// channel and direction.
	DefaultChannelUpdateInterval = time.Minute
)

var (
	// ErrGossiperShuttingDown is an error that is returned if the gossiper
	// is in the process of being shut down.
	ErrGossiperShuttingDown = errors.New("gossiper is shutting down")

	// ErrGossipSyncerNotFound signals that we were unable to find an active
	// gossip syncer corresponding to a gossip query message received from
	// the remote peer.
	ErrGossipSyncerNotFound = errors.New("gossip syncer not found")

	// emptyPubkey is used to compare compressed pubkeys against an empty
	// byte array.
	emptyPubkey [33]byte
)

// optionalMsgFields is a set of optional message fields that external callers
// can provide that are useful when processing a specific network
// announcement.
type optionalMsgFields struct {
	capacity     *btcutil.Amount
	channelPoint *wire.OutPoint
}

// apply applies the optional fields within the functional options.
func (f *optionalMsgFields) apply(optionalMsgFields ...OptionalMsgField) {
	for _, optionalMsgField := range optionalMsgFields {
		optionalMsgField(f)
	}
}

// OptionalMsgField is a functional option parameter that can be used to
// provide external information that is not included within a network message
// but is useful when processing it.
type OptionalMsgField func(*optionalMsgFields)

// ChannelCapacity is an optional field that lets the gossiper know of the
// capacity of a channel.
func ChannelCapacity(capacity btcutil.Amount) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.capacity = &capacity
	}
}

// ChannelPoint is an optional field that lets the gossiper know of the
// outpoint of a channel.
func ChannelPoint(op wire.OutPoint) OptionalMsgField {
	return func(f *optionalMsgFields) {
		f.channelPoint = &op
	}
}
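
// For illustration, a caller submitting a local channel announcement could
// attach these optional fields like so (a hedged sketch; "ann" and
// "chanPoint" are hypothetical values owned by the caller):
//
//	errChan := gossiper.ProcessLocalAnnouncement(
//		ann,
//		ChannelCapacity(btcutil.Amount(1_000_000)),
//		ChannelPoint(chanPoint),
//	)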

// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
	peer              lnpeer.Peer
	source            *btcec.PublicKey
	msg               lnwire.Message
	optionalMsgFields *optionalMsgFields

	isRemote bool

	err chan error
}

// chanPolicyUpdateRequest is a request that is sent to the server when a
// caller wishes to update a particular set of channels. New ChannelUpdate
// messages will be crafted to be sent out during the next broadcast epoch and
// the fee updates committed to the lower layer.
type chanPolicyUpdateRequest struct {
	edgesToUpdate []EdgeWithInfo
	errChan       chan error
}

// PinnedSyncers is a set of node pubkeys for which we will maintain an active
// syncer at all times.
type PinnedSyncers map[route.Vertex]struct{}

// Config defines the configuration for the service. ALL elements within the
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
	// ChainHash is a hash that indicates the resident chain of the
	// AuthenticatedGossiper. Any announcements that don't match this
	// chain hash will be ignored.
	//
	// TODO(roasbeef): eventually make into map so can de-multiplex
	// incoming announcements
	//   * also need to do same for Notifier
	ChainHash chainhash.Hash

	// Router is the subsystem which is responsible for managing the
	// topology of the Lightning Network. After incoming channel, node,
	// and channel update announcements are validated, they are sent to
	// the router in order to be included in the LN graph.
	Router routing.ChannelGraphSource

	// ChanSeries is an interface that provides access to a time series
	// view of the current known channel graph. Each GossipSyncer enabled
	// peer will utilize this in order to create and respond to channel
	// graph time series queries.
	ChanSeries ChannelGraphTimeSeries

	// Notifier is used for receiving notifications of incoming blocks.
	// With each new incoming block found we process previously premature
	// announcements.
	//
	// TODO(roasbeef): could possibly just replace this with an epoch
	// channel.
	Notifier chainntnfs.ChainNotifier

	// Broadcast broadcasts a particular set of announcements to all peers
	// that the daemon is connected to. If supplied, the exclude parameter
	// indicates that the target peer should be excluded from the
	// broadcast.
	Broadcast func(skips map[route.Vertex]struct{},
		msg ...lnwire.Message) error

	// NotifyWhenOnline is a function that allows the gossiper to be
	// notified when a certain peer comes online, allowing it to
	// retry sending a peer message.
	//
	// NOTE: The peerChan channel must be buffered.
	NotifyWhenOnline func(peerPubKey [33]byte, peerChan chan<- lnpeer.Peer)

	// NotifyWhenOffline is a function that allows the gossiper to be
	// notified when a certain peer disconnects, allowing it to request a
	// notification for when it reconnects.
	NotifyWhenOffline func(peerPubKey [33]byte) <-chan struct{}

	// SelfNodeAnnouncement is a function that fetches our own current node
	// announcement, for use when determining whether we should update our
	// peers about our presence on the network. If refresh is true, a
	// new and updated announcement will be returned.
	SelfNodeAnnouncement func(refresh bool) (lnwire.NodeAnnouncement, error)

	// ProofMatureDelta is the number of confirmations needed before
	// exchanging the channel announcement proofs.
	ProofMatureDelta uint32

	// TrickleDelay is the period of the trickle timer which flushes to
	// the network the pending batch of new announcements we've received
	// since the last trickle tick.
	TrickleDelay time.Duration

	// RetransmitTicker is a ticker that ticks with a period which
	// indicates that we should check if we need to re-broadcast any of
	// our personal channels.
	RetransmitTicker ticker.Ticker

	// RebroadcastInterval is the maximum time we wait between sending out
	// channel updates for our active channels and our own node
	// announcement. We do this to ensure our active presence on the
	// network is known, and we are not being considered a zombie node or
	// having zombie channels.
	RebroadcastInterval time.Duration

	// WaitingProofStore is a persistent storage of partial channel proof
	// announcement messages. We use it to buffer half of the material
	// needed to reconstruct a full authenticated channel announcement.
	// Once we receive the other half of the channel proof, we'll be able
	// to properly validate it and re-broadcast it out to the network.
	//
	// TODO(wilmer): make interface to prevent channeldb dependency.
	WaitingProofStore *channeldb.WaitingProofStore

	// MessageStore is a persistent storage of gossip messages which we
	// will use to determine which messages need to be resent for a given
	// peer.
	MessageStore GossipMessageStore

	// AnnSigner is an instance of the MessageSigner interface which will
	// be used to manually sign any outgoing channel updates. The signer
	// implementation should be backed by the public key of the backing
	// Lightning node.
	//
	// TODO(roasbeef): extract ann crafting + sign from fundingMgr into
	// here?
	AnnSigner lnwallet.MessageSigner

	// NumActiveSyncers is the number of peers we should have active
	// syncers with. After reaching NumActiveSyncers, any future gossip
	// syncers will be passive.
	NumActiveSyncers int

	// RotateTicker is a ticker responsible for notifying the SyncManager
	// when it should rotate its active syncers. A single active syncer
	// with a chansSynced state will be exchanged for a passive syncer in
	// order to ensure we don't keep syncing with the same peers.
	RotateTicker ticker.Ticker

	// HistoricalSyncTicker is a ticker responsible for notifying the
	// syncManager when it should attempt a historical sync with a gossip
	// sync peer.
	HistoricalSyncTicker ticker.Ticker

	// ActiveSyncerTimeoutTicker is a ticker responsible for notifying the
	// syncManager when it should attempt to start the next pending
	// activeSyncer due to the current one not completing its state
	// machine within the timeout.
	ActiveSyncerTimeoutTicker ticker.Ticker

	// MinimumBatchSize is the minimum size of a sub batch of announcement
	// messages.
	MinimumBatchSize int

	// SubBatchDelay is the delay between sending sub batches of
	// gossip messages.
	SubBatchDelay time.Duration

	// IgnoreHistoricalFilters will prevent syncers from replying with
	// historical data when the remote peer sets a gossip_timestamp_range.
	// This prevents ranges with old start times from causing us to dump
	// the graph on connect.
	IgnoreHistoricalFilters bool

	// PinnedSyncers is a set of peers that will always transition to
	// ActiveSync upon connection. These peers will never transition to
	// PassiveSync.
	PinnedSyncers PinnedSyncers

	// MaxChannelUpdateBurst specifies the maximum number of updates for a
	// specific channel and direction that we'll accept over an interval.
	MaxChannelUpdateBurst int

	// ChannelUpdateInterval specifies the interval we'll use to determine
	// how often we should allow a new update for a specific channel and
	// direction.
	ChannelUpdateInterval time.Duration
}
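
// As a rough sketch of how a caller wires the service together (hedged: the
// surrounding daemon values such as "chanRouter", "chainNotifier" and
// "genesisHash" are hypothetical stand-ins for the real sub-systems, and the
// durations are illustrative only):
//
//	cfg := Config{
//		ChainHash:             genesisHash,
//		Router:                chanRouter,
//		Notifier:              chainNotifier,
//		TrickleDelay:          time.Millisecond * 90,
//		RetransmitTicker:      ticker.New(time.Minute * 30),
//		RebroadcastInterval:   time.Hour * 24,
//		SubBatchDelay:         time.Second * 5,
//		MinimumBatchSize:      10,
//		MaxChannelUpdateBurst: DefaultMaxChannelUpdateBurst,
//		ChannelUpdateInterval: DefaultChannelUpdateInterval,
//	}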

// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to the router,
// syncing the Lightning Network with newly connected nodes, broadcasting
// announcements after validation, negotiating the channel announcement
// proofs exchange and handling premature announcements. All outgoing
// announcements are expected to be properly signed as dictated in BOLT#7;
// additionally, all incoming messages are expected to be well formed and
// signed. Invalid messages will be rejected by this struct.
type AuthenticatedGossiper struct {
	// Parameters which are needed to properly handle the start and stop of
	// the service.
	started sync.Once
	stopped sync.Once

	// bestHeight is the height of the block at the tip of the main chain
	// as we know it. Accesses *MUST* be done with the gossiper's lock
	// held.
	bestHeight uint32

	quit chan struct{}
	wg   sync.WaitGroup

	// cfg is a copy of the configuration struct that the gossiper service
	// was initialized with.
	cfg *Config

	// blockEpochs encapsulates a stream of block epochs that are sent at
	// every new block height.
	blockEpochs *chainntnfs.BlockEpochEvent

	// prematureChannelUpdates is a map of ChannelUpdates we have received
	// that weren't associated with any channel we know about. We store
	// them temporarily, such that we can reprocess them when a
	// ChannelAnnouncement for the channel is received.
	prematureChannelUpdates map[uint64][]*networkMsg
	pChanUpdMtx             sync.Mutex

	// networkMsgs is a channel that carries new network broadcasted
	// messages from outside the gossiper service to be processed by the
	// networkHandler.
	networkMsgs chan *networkMsg

	// chanPolicyUpdates is a channel over which requests to update the
	// forwarding policy of a set of channels are sent.
	chanPolicyUpdates chan *chanPolicyUpdateRequest

	// selfKey is the identity public key of the backing Lightning node.
	selfKey *btcec.PublicKey

	// channelMtx is used to restrict the database access to one
	// goroutine per channel ID. This is done to ensure that when
	// the gossiper is handling an announcement, the db state stays
	// consistent between when the DB is first read until it's written.
	channelMtx *multimutex.Mutex

	rejectMtx     sync.RWMutex
	recentRejects map[uint64]struct{}

	// syncMgr is a subsystem responsible for managing the gossip syncers
	// for peers currently connected. When a new peer is connected, the
	// manager will create its accompanying gossip syncer and determine
	// whether it should have an activeSync or passiveSync sync type based
	// on how many other gossip syncers are currently active. Any
	// activeSync gossip syncers are started in a round-robin manner to
	// ensure we're not syncing with multiple peers at the same time.
	syncMgr *SyncManager

	// reliableSender is a subsystem responsible for handling reliable
	// message send requests to peers. This should only be used for
	// channels that are unadvertised at the time of handling the message
	// since if it is advertised, then peers should be able to get the
	// message from the network.
	reliableSender *reliableSender

	// chanUpdateRateLimiter contains rate limiters for each direction of
	// a channel update we've processed. We'll use these to determine
	// whether we should accept a new update for a specific channel and
	// direction.
	//
	// NOTE: This map must be synchronized with the main
	// AuthenticatedGossiper lock.
	chanUpdateRateLimiter map[uint64][2]*rate.Limiter

	sync.Mutex
}

// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration parameters.
func New(cfg Config, selfKey *btcec.PublicKey) *AuthenticatedGossiper {
	gossiper := &AuthenticatedGossiper{
		selfKey:                 selfKey,
		cfg:                     &cfg,
		networkMsgs:             make(chan *networkMsg),
		quit:                    make(chan struct{}),
		chanPolicyUpdates:       make(chan *chanPolicyUpdateRequest),
		prematureChannelUpdates: make(map[uint64][]*networkMsg),
		channelMtx:              multimutex.NewMutex(),
		recentRejects:           make(map[uint64]struct{}),
		chanUpdateRateLimiter:   make(map[uint64][2]*rate.Limiter),
	}

	gossiper.syncMgr = newSyncManager(&SyncManagerCfg{
		ChainHash:               cfg.ChainHash,
		ChanSeries:              cfg.ChanSeries,
		RotateTicker:            cfg.RotateTicker,
		HistoricalSyncTicker:    cfg.HistoricalSyncTicker,
		NumActiveSyncers:        cfg.NumActiveSyncers,
		IgnoreHistoricalFilters: cfg.IgnoreHistoricalFilters,
		BestHeight:              gossiper.latestHeight,
		PinnedSyncers:           cfg.PinnedSyncers,
	})

	gossiper.reliableSender = newReliableSender(&reliableSenderCfg{
		NotifyWhenOnline:  cfg.NotifyWhenOnline,
		NotifyWhenOffline: cfg.NotifyWhenOffline,
		MessageStore:      cfg.MessageStore,
		IsMsgStale:        gossiper.isMsgStale,
	})

	return gossiper
}
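
// A minimal lifecycle sketch, assuming a fully populated cfg and our node's
// identity key selfKey:
//
//	gossiper := New(cfg, selfKey)
//	if err := gossiper.Start(); err != nil {
//		// handle startup failure
//	}
//	defer func() { _ = gossiper.Stop() }()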

// EdgeWithInfo contains the information that is required to update an edge.
type EdgeWithInfo struct {
	// Info describes the channel.
	Info *channeldb.ChannelEdgeInfo

	// Edge describes the policy in one direction of the channel.
	Edge *channeldb.ChannelEdgePolicy
}

// PropagateChanPolicyUpdate signals the AuthenticatedGossiper to perform the
// specified edge updates. Updates are done in two stages: first, the
// AuthenticatedGossiper ensures the update has been committed by dependent
// sub-systems, then it signs and broadcasts new updates to the network.
func (d *AuthenticatedGossiper) PropagateChanPolicyUpdate(
	edgesToUpdate []EdgeWithInfo) error {

	errChan := make(chan error, 1)
	policyUpdate := &chanPolicyUpdateRequest{
		edgesToUpdate: edgesToUpdate,
		errChan:       errChan,
	}

	select {
	case d.chanPolicyUpdates <- policyUpdate:
		err := <-errChan
		return err
	case <-d.quit:
		return fmt.Errorf("AuthenticatedGossiper shutting down")
	}
}
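
// For illustration, a fee-update handler could hand the gossiper the edges
// whose policies it has already modified (a hedged sketch; "info" and
// "policy" are hypothetical values fetched from the channel graph):
//
//	err := gossiper.PropagateChanPolicyUpdate([]EdgeWithInfo{
//		{Info: info, Edge: policy},
//	})
//	if err != nil {
//		// the update was not committed or broadcast
//	}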

// Start spawns the network messages handler goroutine and registers for new
// block notifications in order to properly handle premature announcements.
func (d *AuthenticatedGossiper) Start() error {
	var err error
	d.started.Do(func() {
		err = d.start()
	})
	return err
}

func (d *AuthenticatedGossiper) start() error {
	log.Info("Authenticated Gossiper is starting")

	// First we register for new notifications of newly discovered blocks.
	// We do this immediately so we'll later be able to consume any/all
	// blocks which were discovered.
	blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn(nil)
	if err != nil {
		return err
	}
	d.blockEpochs = blockEpochs

	height, err := d.cfg.Router.CurrentBlockHeight()
	if err != nil {
		return err
	}
	d.bestHeight = height

	// Start the reliable sender. In case we had any pending messages ready
	// to be sent when the gossiper was last shut down, we must continue on
	// our quest to deliver them to their respective peers.
	if err := d.reliableSender.Start(); err != nil {
		return err
	}

	d.syncMgr.Start()

	d.wg.Add(1)
	go d.networkHandler()

	return nil
}

// Stop signals any active goroutines for a graceful closure.
func (d *AuthenticatedGossiper) Stop() error {
	d.stopped.Do(d.stop)
	return nil
}

func (d *AuthenticatedGossiper) stop() {
	log.Info("Authenticated Gossiper is stopping")

	d.blockEpochs.Cancel()

	d.syncMgr.Stop()

	close(d.quit)
	d.wg.Wait()

	// We'll stop our reliable sender after all of the gossiper's
	// goroutines have exited to ensure nothing can cause it to continue
	// executing.
	d.reliableSender.Stop()
}

// TODO(roasbeef): need method to get current gossip timestamp?
//  * using mtx, check time rotate forward is needed?

// ProcessRemoteAnnouncement sends a new remote announcement message along
// with the peer that sent the routing message. The announcement will be
// processed then added to a queue for batched trickled announcements to all
// connected peers. Remote channel announcements should contain the
// announcement proof and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
	peer lnpeer.Peer) chan error {

	errChan := make(chan error, 1)

	// For messages in the known set of channel series queries, we'll
	// dispatch the message directly to the GossipSyncer, and skip the main
	// processing loop.
	switch m := msg.(type) {
	case *lnwire.QueryShortChanIDs,
		*lnwire.QueryChannelRange,
		*lnwire.ReplyChannelRange,
		*lnwire.ReplyShortChanIDsEnd:

		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			errChan <- ErrGossipSyncerNotFound
			return errChan
		}

		// If we've found the message target, then we'll dispatch the
		// message directly to it.
		syncer.ProcessQueryMsg(m, peer.QuitSignal())

		errChan <- nil
		return errChan

	// If a peer is updating its current update horizon, then we'll
	// dispatch that directly to the proper GossipSyncer.
	case *lnwire.GossipTimestampRange:
		syncer, ok := d.syncMgr.GossipSyncer(peer.PubKey())
		if !ok {
			log.Warnf("Gossip syncer for peer=%x not found",
				peer.PubKey())

			errChan <- ErrGossipSyncerNotFound
			return errChan
		}

		// If we've found the message target, then we'll dispatch the
		// message directly to it.
		if err := syncer.ApplyGossipFilter(m); err != nil {
			log.Warnf("Unable to apply gossip filter for peer=%x: "+
				"%v", peer.PubKey(), err)

			errChan <- err
			return errChan
		}

		errChan <- nil
		return errChan

	// To avoid inserting edges in the graph for our own channels that we
	// have already closed, we ignore such channel announcements coming
	// from the remote.
	case *lnwire.ChannelAnnouncement:
		ownKey := d.selfKey.SerializeCompressed()
		ownErr := fmt.Errorf("ignoring remote ChannelAnnouncement " +
			"for own channel")

		if bytes.Equal(m.NodeID1[:], ownKey) ||
			bytes.Equal(m.NodeID2[:], ownKey) {

			log.Warn(ownErr)
			errChan <- ownErr
			return errChan
		}
	}

	nMsg := &networkMsg{
		msg:      msg,
		isRemote: true,
		peer:     peer,
		source:   peer.IdentityKey(),
		err:      errChan,
	}

	select {
	case d.networkMsgs <- nMsg:

	// If the peer that sent us this message is quitting, then we don't
	// need to send back an error and can return immediately.
	case <-peer.QuitSignal():
		return nil
	case <-d.quit:
		nMsg.err <- ErrGossiperShuttingDown
	}

	return nMsg.err
}
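
// For illustration, a peer's read loop could consume the returned error
// channel asynchronously so slow validation never blocks message reads
// (hedged sketch; "p" is a hypothetical lnpeer.Peer). Note the method may
// return nil when the peer is quitting, so callers must check for that:
//
//	errChan := gossiper.ProcessRemoteAnnouncement(msg, p)
//	if errChan != nil {
//		go func() {
//			if err := <-errChan; err != nil {
//				log.Errorf("gossip msg rejected: %v", err)
//			}
//		}()
//	}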

// ProcessLocalAnnouncement sends a new local announcement message to the
// gossiper for processing. The announcement will be processed then added to a
// queue for batched trickled announcements to all connected peers. Local
// channel announcements don't contain the announcement proof and will not be
// fully validated. Once the channel proofs are received, the entire channel
// announcement and update messages will be re-constructed and broadcast to
// the rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
	optionalFields ...OptionalMsgField) chan error {

	optionalMsgFields := &optionalMsgFields{}
	optionalMsgFields.apply(optionalFields...)

	nMsg := &networkMsg{
		msg:               msg,
		optionalMsgFields: optionalMsgFields,
		isRemote:          false,
		source:            d.selfKey,
		err:               make(chan error, 1),
	}

	select {
	case d.networkMsgs <- nMsg:
	case <-d.quit:
		nMsg.err <- ErrGossiperShuttingDown
	}

	return nMsg.err
}

// channelUpdateID is a unique identifier for ChannelUpdate messages, as
// channel updates can be identified by the (ShortChannelID, ChannelFlags)
// tuple.
type channelUpdateID struct {
	// channelID represents the set of data which is needed to
	// retrieve all necessary data to validate the channel existence.
	channelID lnwire.ShortChannelID

	// flags' least-significant bit must be set to 0 if the creating node
	// corresponds to the first node in the previously sent channel
	// announcement and 1 otherwise.
	flags lnwire.ChanUpdateChanFlags
}

// msgWithSenders is a wrapper struct around a message, and the set of peers
// that originally sent us this message. Using this struct, we can ensure that
// we don't re-send a message to the peer that sent it to us in the first
// place.
type msgWithSenders struct {
	// msg is the wire message itself.
	msg lnwire.Message

	// senders is the set of peers that sent us this message.
	senders map[route.Vertex]struct{}
}

// mergeSyncerMap is used to merge the set of senders of a particular message
// with peers that we have an active GossipSyncer with. We do this to ensure
// that we don't broadcast messages to any peers that we have active gossip
// syncers for.
func (m *msgWithSenders) mergeSyncerMap(syncers map[route.Vertex]*GossipSyncer) {
	for peerPub := range syncers {
		m.senders[peerPub] = struct{}{}
	}
}

// deDupedAnnouncements de-duplicates announcements that have been added to
// the batch. Internally, announcements are stored in three maps
// (one each for channel announcements, channel updates, and node
// announcements). These maps keep track of unique announcements and ensure no
// announcements are duplicated. We keep the three message types separate,
// such that we can send channel announcements first, then channel updates,
// and finally node announcements when it's time to broadcast them.
type deDupedAnnouncements struct {
	// channelAnnouncements are identified by the short channel id field.
	channelAnnouncements map[lnwire.ShortChannelID]msgWithSenders

	// channelUpdates are identified by the channel update id field.
	channelUpdates map[channelUpdateID]msgWithSenders

	// nodeAnnouncements are identified by the Vertex field.
	nodeAnnouncements map[route.Vertex]msgWithSenders

	sync.Mutex
}

// Reset operates on deDupedAnnouncements to reset the storage of
// announcements.
func (d *deDupedAnnouncements) Reset() {
	d.Lock()
	defer d.Unlock()

	d.reset()
}

// reset is the private version of the Reset method. We have this so we can
// call this method within methods that are already holding the lock.
func (d *deDupedAnnouncements) reset() {
	// Storage of each type of announcement (channel announcements, channel
	// updates, node announcements) is set to an empty map where the
	// appropriate key points to the corresponding lnwire.Message.
	d.channelAnnouncements = make(map[lnwire.ShortChannelID]msgWithSenders)
	d.channelUpdates = make(map[channelUpdateID]msgWithSenders)
	d.nodeAnnouncements = make(map[route.Vertex]msgWithSenders)
}

// addMsg adds a new message to the current batch. If the message is already
// present in the current batch, then this new instance replaces the latter,
// and the set of senders is updated to reflect which node sent us this
// message.
func (d *deDupedAnnouncements) addMsg(message networkMsg) {
	// Depending on the message type (channel announcement, channel update,
	// or node announcement), the message is added to the corresponding map
	// in deDupedAnnouncements. Because each identifying key can have at
	// most one value, the announcements are de-duplicated, with newer ones
	// replacing older ones.
	switch msg := message.msg.(type) {

	// Channel announcements are identified by the short channel id field.
	case *lnwire.ChannelAnnouncement:
		deDupKey := msg.ShortChannelID
		sender := route.NewVertex(message.source)

		mws, ok := d.channelAnnouncements[deDupKey]
		if !ok {
			mws = msgWithSenders{
				msg:     msg,
				senders: make(map[route.Vertex]struct{}),
			}
			mws.senders[sender] = struct{}{}

			d.channelAnnouncements[deDupKey] = mws

			return
		}

		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelAnnouncements[deDupKey] = mws

	// Channel updates are identified by the (short channel id,
	// channel flags) tuple.
	case *lnwire.ChannelUpdate:
		sender := route.NewVertex(message.source)
		deDupKey := channelUpdateID{
			msg.ShortChannelID,
			msg.ChannelFlags,
		}

		oldTimestamp := uint32(0)
		mws, ok := d.channelUpdates[deDupKey]
		if ok {
			// If we already have seen this message, record its
			// timestamp.
			oldTimestamp = mws.msg.(*lnwire.ChannelUpdate).Timestamp
		}

		// If we already had this message with a strictly newer
		// timestamp, then we'll just discard the message we got.
		if oldTimestamp > msg.Timestamp {
			return
		}

		// If the message we just got is newer than what we previously
		// have seen, or this is the first time we see it, then we'll
		// add it to our map of announcements.
		if oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				senders: make(map[route.Vertex]struct{}),
			}

			// We'll mark the sender of the message in the
			// senders map.
			mws.senders[sender] = struct{}{}

			d.channelUpdates[deDupKey] = mws

			return
		}

		// Lastly, if we had seen this exact message from before, with
		// the same timestamp, we'll add the sender to the map of
		// senders, such that we can skip sending this message back in
		// the next batch.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.channelUpdates[deDupKey] = mws

	// Node announcements are identified by the Vertex field. Use the
	// NodeID to create the corresponding Vertex.
	case *lnwire.NodeAnnouncement:
		sender := route.NewVertex(message.source)
		deDupKey := route.Vertex(msg.NodeID)

		// We do the same for node announcements as we did for channel
		// updates, as they also carry a timestamp.
		oldTimestamp := uint32(0)
		mws, ok := d.nodeAnnouncements[deDupKey]
		if ok {
			oldTimestamp = mws.msg.(*lnwire.NodeAnnouncement).Timestamp
		}

		// Discard the message if it's old.
		if oldTimestamp > msg.Timestamp {
			return
		}

		// Replace if it's newer.
		if oldTimestamp < msg.Timestamp {
			mws = msgWithSenders{
				msg:     msg,
				senders: make(map[route.Vertex]struct{}),
			}

			mws.senders[sender] = struct{}{}

			d.nodeAnnouncements[deDupKey] = mws

			return
		}

		// Add to senders map if it's the same as we had.
		mws.msg = msg
		mws.senders[sender] = struct{}{}
		d.nodeAnnouncements[deDupKey] = mws
	}
}

// AddMsgs is a helper method to add multiple messages to the announcement
// batch.
func (d *deDupedAnnouncements) AddMsgs(msgs ...networkMsg) {
	d.Lock()
	defer d.Unlock()

	for _, msg := range msgs {
		d.addMsg(msg)
	}
}

// Emit returns the set of de-duplicated announcements to be sent out during
// the next announcement epoch, in the order of channel announcements, channel
// updates, and node announcements. Each message emitted contains the set of
// peers that sent us the message. This way, we can ensure that we don't waste
// bandwidth by re-sending a message to the peer that sent it to us in the
// first place. Additionally, the set of stored messages is reset.
func (d *deDupedAnnouncements) Emit() []msgWithSenders {
	d.Lock()
	defer d.Unlock()

	// Get the total number of announcements.
	numAnnouncements := len(d.channelAnnouncements) + len(d.channelUpdates) +
		len(d.nodeAnnouncements)

	// Create an empty array of lnwire.Messages with a length equal to
	// the total number of announcements.
	msgs := make([]msgWithSenders, 0, numAnnouncements)

	// Add the channel announcements to the array first.
	for _, message := range d.channelAnnouncements {
		msgs = append(msgs, message)
	}

	// Then add the channel updates.
	for _, message := range d.channelUpdates {
		msgs = append(msgs, message)
	}

	// Finally add the node announcements.
	for _, message := range d.nodeAnnouncements {
		msgs = append(msgs, message)
	}

	d.reset()

	// Return the array of lnwire.messages.
	return msgs
}
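
// To make the de-duplication rules concrete: for two ChannelUpdates sharing
// the same (ShortChannelID, ChannelFlags) key, a higher timestamp replaces
// the stored message and resets its sender set, an equal timestamp merely
// adds the new sender, and a lower timestamp is dropped. A hedged sketch of
// the resulting behavior ("peerA"/"updT100" etc. are hypothetical values):
//
//	batch := deDupedAnnouncements{}
//	batch.Reset()
//	batch.AddMsgs(networkMsg{source: peerA, msg: updT100}) // stored
//	batch.AddMsgs(networkMsg{source: peerB, msg: updT100}) // senders={A,B}
//	batch.AddMsgs(networkMsg{source: peerC, msg: updT200}) // replaced, senders={C}
//	msgs := batch.Emit()                                   // one update; batch reset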

// calculateSubBatchSize is a helper function that calculates the size to
// break the batchSize down into.
func calculateSubBatchSize(totalDelay, subBatchDelay time.Duration,
	minimumBatchSize, batchSize int) int {

	if subBatchDelay > totalDelay {
		return batchSize
	}

	subBatchSize := (batchSize*int(subBatchDelay) + int(totalDelay) - 1) /
		int(totalDelay)

	if subBatchSize < minimumBatchSize {
		return minimumBatchSize
	}

	return subBatchSize
}
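
// The division above is a ceiling, i.e.
// subBatchSize = ceil(batchSize * subBatchDelay / totalDelay), so the whole
// batch drains over roughly totalDelay. Worked example: with totalDelay =
// 90s, subBatchDelay = 5s and a batch of 450 messages, ceil(450*5/90) = 25
// messages go out every 5 seconds, finishing in about 90s. With a batch of
// only 30, the raw quotient ceil(30*5/90) = 2 would be bumped up to
// minimumBatchSize whenever that is larger.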

// splitAnnouncementBatches takes an existing list of announcements and
// decomposes it into sub batches controlled by the `subBatchSize`.
func splitAnnouncementBatches(subBatchSize int,
	announcementBatch []msgWithSenders) [][]msgWithSenders {

	var splitAnnouncementBatch [][]msgWithSenders

	for subBatchSize < len(announcementBatch) {
		// For slicing with minimal allocation
		// https://github.com/golang/go/wiki/SliceTricks
		announcementBatch, splitAnnouncementBatch =
			announcementBatch[subBatchSize:],
			append(splitAnnouncementBatch,
				announcementBatch[0:subBatchSize:subBatchSize])
	}
	splitAnnouncementBatch = append(splitAnnouncementBatch, announcementBatch)

	return splitAnnouncementBatch
}
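
// For example, a batch of 5 announcements split with subBatchSize = 2 yields
// sub batches of sizes [2, 2, 1], while a batch of 4 yields [2, 2]: the loop
// peels off full-size chunks and the remainder is appended as the final sub
// batch. The three-index slice expression caps each chunk's capacity so a
// later append to one sub batch cannot clobber its neighbor's backing array.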

// sendBatch broadcasts a list of announcements to our peers.
func (d *AuthenticatedGossiper) sendBatch(announcementBatch []msgWithSenders) {
	syncerPeers := d.syncMgr.GossipSyncers()

	// We'll first attempt to filter out this new message for all peers
	// that have active gossip syncers.
	for _, syncer := range syncerPeers {
		syncer.FilterGossipMsgs(announcementBatch...)
	}

	for _, msgChunk := range announcementBatch {
		// With the syncers taken care of, we'll merge the sender map
		// with the set of syncers, so we don't send out duplicate
		// messages.
		msgChunk.mergeSyncerMap(syncerPeers)

		err := d.cfg.Broadcast(
			msgChunk.senders, msgChunk.msg,
		)
		if err != nil {
			log.Errorf("Unable to send batch "+
				"announcements: %v", err)
			continue
		}
	}
}

// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine include answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (d *AuthenticatedGossiper) networkHandler() {
	defer d.wg.Done()

	// Initialize empty deDupedAnnouncements to store announcement batch.
	announcements := deDupedAnnouncements{}
	announcements.Reset()

	d.cfg.RetransmitTicker.Resume()
	defer d.cfg.RetransmitTicker.Stop()

	trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
	defer trickleTimer.Stop()

	// To start, we'll first check to see if there are any stale channel or
	// node announcements that we need to re-transmit.
	if err := d.retransmitStaleAnns(time.Now()); err != nil {
		log.Errorf("Unable to rebroadcast stale announcements: %v", err)
	}

	// We'll use this validation barrier to ensure that we process jobs in
	// their dependency order during parallel validation.
	validationBarrier := routing.NewValidationBarrier(1000, d.quit)

	for {
		select {
		// A new policy update has arrived. We'll commit it to the
		// sub-systems below us, then craft, sign, and broadcast a new
		// ChannelUpdate for the set of affected clients.
		case policyUpdate := <-d.chanPolicyUpdates:
			// First, we'll now create new fully signed updates for
			// the affected channels and also update the underlying
			// graph with the new state.
			newChanUpdates, err := d.processChanPolicyUpdate(
				policyUpdate.edgesToUpdate,
			)
			policyUpdate.errChan <- err
			if err != nil {
				log.Errorf("Unable to craft policy updates: %v",
					err)
				continue
			}

			// Finally, with the updates committed, we'll now add
			// them to the announcement batch to be flushed at the
			// start of the next epoch.
			announcements.AddMsgs(newChanUpdates...)

		case announcement := <-d.networkMsgs:
			// We should only broadcast this message forward if it
			// originated from us or it wasn't received as part of
			// our initial historical sync.
			shouldBroadcast := !announcement.isRemote ||
				d.syncMgr.IsGraphSynced()

			switch announcement.msg.(type) {
			// Channel announcement signatures are amongst the only
			// messages that we'll process serially.
			case *lnwire.AnnounceSignatures:
				emittedAnnouncements, _ := d.processNetworkAnnouncement(
					announcement,
				)
				if emittedAnnouncements != nil {
					announcements.AddMsgs(
						emittedAnnouncements...,
					)
				}
				continue
			}

			// If this message was recently rejected, then we won't
			// attempt to re-process it.
			if d.isRecentlyRejectedMsg(announcement.msg) {
				announcement.err <- fmt.Errorf("recently " +
					"rejected")
				continue
			}

			// We'll set up any dependents, and wait until a free
			// slot for this job opens up; this allows us to not
			// have thousands of goroutines active.
			validationBarrier.InitJobDependencies(announcement.msg)

			d.wg.Add(1)
			go func() {
				defer d.wg.Done()
				defer validationBarrier.CompleteJob()

				// If this message has an existing dependency,
				// then we'll wait until that has been fully
				// validated before we proceed.
				err := validationBarrier.WaitForDependants(
					announcement.msg,
				)
				if err != nil {
					if err != routing.ErrVBarrierShuttingDown &&
						err != routing.ErrParentValidationFailed {

						log.Warnf("unexpected error "+
							"during validation "+
							"barrier shutdown: %v",
							err)
					}
					announcement.err <- err
					return
				}

				// Process the network announcement to
				// determine if this is either a new
				// announcement from our PoV or an update to a
				// prior vertex/edge we previously accepted.
				emittedAnnouncements, allowDependents := d.processNetworkAnnouncement(
					announcement,
				)

				// If this message had any dependencies, then
				// we can now signal them to continue.
				validationBarrier.SignalDependants(
					announcement.msg, allowDependents,
				)

				// If the announcement was accepted, then add
				// the emitted announcements to our announce
				// batch to be broadcast once the trickle timer
				// ticks again.
				if emittedAnnouncements != nil && shouldBroadcast {
					// TODO(roasbeef): exclude peer that
					// sent.
					announcements.AddMsgs(
						emittedAnnouncements...,
					)
				} else if emittedAnnouncements != nil {
					log.Trace("Skipping broadcast of " +
						"announcements received " +
						"during initial graph sync")
				}
			}()

		// A new block has arrived, so we can re-process the previously
		// premature announcements.
		case newBlock, ok := <-d.blockEpochs.Epochs:
			// If the channel has been closed, then this indicates
			// the daemon is shutting down, so we exit ourselves.
			if !ok {
				return
			}

			// Once a new block arrives, we update our running
			// track of the height of the chain tip.
			d.Lock()
			blockHeight := uint32(newBlock.Height)
			d.bestHeight = blockHeight
			d.Unlock()

			log.Debugf("New block: height=%d, hash=%s", blockHeight,
				newBlock.Hash)

		// The trickle timer has ticked, which indicates we should
		// flush to the network the pending batch of new announcements
		// we've received since the last trickle tick.
		case <-trickleTimer.C:
			// Emit the current batch of announcements from
			// deDupedAnnouncements.
			announcementBatch := announcements.Emit()

			// If the current announcement batch is empty, then we
			// have no further work here.
			if len(announcementBatch) == 0 {
				continue
			}

			// Next, if we have new things to announce then
			// broadcast them to all our immediately connected
			// peers.
			subBatchSize := calculateSubBatchSize(
				d.cfg.TrickleDelay, d.cfg.SubBatchDelay,
				d.cfg.MinimumBatchSize, len(announcementBatch),
			)

			splitAnnouncementBatch := splitAnnouncementBatches(
				subBatchSize, announcementBatch,
			)

			d.wg.Add(1)
			go func() {
				defer d.wg.Done()

				log.Infof("Broadcasting %v new announcements "+
					"in %d sub batches",
					len(announcementBatch),
					len(splitAnnouncementBatch))

				for _, announcementBatch := range splitAnnouncementBatch {
					d.sendBatch(announcementBatch)
					select {
					case <-time.After(d.cfg.SubBatchDelay):
					case <-d.quit:
						return
					}
				}
			}()

		// The retransmission timer has ticked which indicates that we
		// should check if we need to prune or re-broadcast any of our
		// personal channels or node announcement. This addresses the
		// case of "zombie" channels and channel advertisements that
		// have been dropped, or not properly propagated through the
		// network.
		case tick := <-d.cfg.RetransmitTicker.Ticks():
			if err := d.retransmitStaleAnns(tick); err != nil {
				log.Errorf("unable to rebroadcast stale "+
					"announcements: %v", err)
			}

		// The gossiper has been signalled to exit, so we exit our
		// main loop so the wait group can be decremented.
		case <-d.quit:
			return
		}
	}
}

// TODO(roasbeef): d/c peers that send updates not on our chain

// InitSyncState is called by outside sub-systems when a connection is
// established to a new peer that understands how to perform channel range
// queries. We'll allocate a new gossip syncer for it, and start any goroutines
// needed to handle new queries.
func (d *AuthenticatedGossiper) InitSyncState(syncPeer lnpeer.Peer) {
	d.syncMgr.InitSyncState(syncPeer)
}

// PruneSyncState is called by outside sub-systems once a peer that we were
// previously connected to has been disconnected. In this case we can stop the
// existing GossipSyncer assigned to the peer and free up resources.
func (d *AuthenticatedGossiper) PruneSyncState(peer route.Vertex) {
	d.syncMgr.PruneSyncState(peer)
}

// isRecentlyRejectedMsg returns true if we recently rejected a message, and
// false otherwise. This avoids expensive reprocessing of the message.
func (d *AuthenticatedGossiper) isRecentlyRejectedMsg(msg lnwire.Message) bool {
	d.rejectMtx.RLock()
	defer d.rejectMtx.RUnlock()

	switch m := msg.(type) {
	case *lnwire.ChannelUpdate:
		_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
		return ok

	case *lnwire.ChannelAnnouncement:
		_, ok := d.recentRejects[m.ShortChannelID.ToUint64()]
		return ok

	default:
		return false
	}
}
// retransmitStaleAnns examines all outgoing channels that the source node is
// known to maintain to check to see if any of them are "stale". A channel is
// stale iff the last timestamp of its rebroadcast is older than the
// RebroadcastInterval. We also check if a refreshed node announcement should
// be resent.
func (d *AuthenticatedGossiper) retransmitStaleAnns(now time.Time) error {
	// Iterate over all of our channels and check if any of them fall
	// within the prune interval or re-broadcast interval.
	type updateTuple struct {
		info *channeldb.ChannelEdgeInfo
		edge *channeldb.ChannelEdgePolicy
	}

	var (
		havePublicChannels bool
		edgesToUpdate      []updateTuple
	)
	err := d.cfg.Router.ForAllOutgoingChannels(func(
		info *channeldb.ChannelEdgeInfo,
		edge *channeldb.ChannelEdgePolicy) error {

		// If there's no auth proof attached to this edge, it means
		// that it is a private channel not meant to be announced to
		// the greater network, so avoid sending channel updates for
		// this channel to not leak its existence.
		if info.AuthProof == nil {
			log.Debugf("Skipping retransmission of channel "+
				"without AuthProof: %v", info.ChannelID)
			return nil
		}

		// We make a note that we have at least one public channel. We
		// use this to determine whether we should send a node
		// announcement below.
		havePublicChannels = true

		// If this edge has a ChannelUpdate that was created before the
		// introduction of the MaxHTLC field, then we'll update this
		// edge to propagate this information in the network.
		if !edge.MessageFlags.HasMaxHtlc() {
			// We'll make sure we support the new max_htlc field if
			// not already present.
			edge.MessageFlags |= lnwire.ChanUpdateOptionMaxHtlc
			edge.MaxHTLC = lnwire.NewMSatFromSatoshis(info.Capacity)

			edgesToUpdate = append(edgesToUpdate, updateTuple{
				info: info,
				edge: edge,
			})
			return nil
		}

		timeElapsed := now.Sub(edge.LastUpdate)

		// If it's been longer than RebroadcastInterval since we've
		// re-broadcasted the channel, add the channel to the set of
		// edges we need to update.
		if timeElapsed >= d.cfg.RebroadcastInterval {
			edgesToUpdate = append(edgesToUpdate, updateTuple{
				info: info,
				edge: edge,
			})
		}

		return nil
	})
	if err != nil && err != channeldb.ErrGraphNoEdgesFound {
		return fmt.Errorf("unable to retrieve outgoing channels: %v",
			err)
	}

	var signedUpdates []lnwire.Message
	for _, chanToUpdate := range edgesToUpdate {
		// Re-sign and update the channel on disk and retrieve our
		// ChannelUpdate to broadcast.
		chanAnn, chanUpdate, err := d.updateChannel(
			chanToUpdate.info, chanToUpdate.edge,
		)
		if err != nil {
			return fmt.Errorf("unable to update channel: %v", err)
		}

		// If we have a valid announcement to transmit, then we'll send
		// that along with the update.
		if chanAnn != nil {
			signedUpdates = append(signedUpdates, chanAnn)
		}

		signedUpdates = append(signedUpdates, chanUpdate)
	}

	// If we don't have any public channels, we return as we don't want to
	// broadcast anything that would reveal our existence.
	if !havePublicChannels {
		return nil
	}

	// We'll also check that our NodeAnnouncement is not too old.
	currentNodeAnn, err := d.cfg.SelfNodeAnnouncement(false)
	if err != nil {
		return fmt.Errorf("unable to get current node announcement: %v",
			err)
	}

	timestamp := time.Unix(int64(currentNodeAnn.Timestamp), 0)
	timeElapsed := now.Sub(timestamp)

	// If it's been longer than RebroadcastInterval since we've
	// re-broadcasted the node announcement, refresh it and resend it.
	nodeAnnStr := ""
	if timeElapsed >= d.cfg.RebroadcastInterval {
		newNodeAnn, err := d.cfg.SelfNodeAnnouncement(true)
		if err != nil {
			return fmt.Errorf("unable to get refreshed node "+
				"announcement: %v", err)
		}

		signedUpdates = append(signedUpdates, &newNodeAnn)
		nodeAnnStr = " and our refreshed node announcement"

		// Before broadcasting the refreshed node announcement, add it
		// to our own graph.
		if err := d.addNode(&newNodeAnn); err != nil {
			log.Errorf("Unable to add refreshed node announcement "+
				"to graph: %v", err)
		}
	}

	// If we don't have any updates to re-broadcast, then we'll exit
	// early.
	if len(signedUpdates) == 0 {
		return nil
	}

	log.Infof("Retransmitting %v outgoing channels%v",
		len(edgesToUpdate), nodeAnnStr)

	// With all the wire announcements properly crafted, we'll broadcast
	// our known outgoing channels to all our immediate peers.
	if err := d.cfg.Broadcast(nil, signedUpdates...); err != nil {
		return fmt.Errorf("unable to re-broadcast channels: %v", err)
	}

	return nil
}

// processChanPolicyUpdate generates a new set of channel updates for the
// provided list of edges and updates the backing ChannelGraphSource.
func (d *AuthenticatedGossiper) processChanPolicyUpdate(
	edgesToUpdate []EdgeWithInfo) ([]networkMsg, error) {

	var chanUpdates []networkMsg
	for _, edgeInfo := range edgesToUpdate {
		// Now that we've collected all the channels we need to update,
		// we'll re-sign and update the backing ChannelGraphSource, and
		// retrieve our ChannelUpdate to broadcast.
		_, chanUpdate, err := d.updateChannel(
			edgeInfo.Info, edgeInfo.Edge,
		)
		if err != nil {
			return nil, err
		}

		// We'll avoid broadcasting any updates for private channels to
		// avoid directly giving away their existence. Instead, we'll
		// send the update directly to the remote party.
		if edgeInfo.Info.AuthProof == nil {
			remotePubKey := remotePubFromChanInfo(
				edgeInfo.Info, chanUpdate.ChannelFlags,
			)
			err := d.reliableSender.sendMessage(
				chanUpdate, remotePubKey,
			)
			if err != nil {
				log.Errorf("Unable to reliably send %v for "+
					"channel=%v to peer=%x: %v",
					chanUpdate.MsgType(),
					chanUpdate.ShortChannelID,
					remotePubKey, err)
			}
			continue
		}

		// We set ourselves as the source of this message to indicate
		// that we shouldn't skip any peers when sending this message.
		chanUpdates = append(chanUpdates, networkMsg{
			source: d.selfKey,
			msg:    chanUpdate,
		})
	}

	return chanUpdates, nil
}

// remotePubFromChanInfo returns the public key of the remote peer given a
// ChannelEdgeInfo that describes a channel we have with them.
func remotePubFromChanInfo(chanInfo *channeldb.ChannelEdgeInfo,
	chanFlags lnwire.ChanUpdateChanFlags) [33]byte {

	var remotePubKey [33]byte
	switch {
	case chanFlags&lnwire.ChanUpdateDirection == 0:
		remotePubKey = chanInfo.NodeKey2Bytes
	case chanFlags&lnwire.ChanUpdateDirection == 1:
		remotePubKey = chanInfo.NodeKey1Bytes
	}

	return remotePubKey
}
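
// The direction bit maps an update to the node that produced it: an update
// with the direction bit unset was signed by node 1, so the *remote* party is
// node 2, and vice versa. For example, if we are NodeKey1 in the channel and
// sign an update (ChannelFlags&ChanUpdateDirection == 0),
// remotePubFromChanInfo hands the reliable sender NodeKey2Bytes as the
// delivery target.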

// processRejectedEdge examines a rejected edge to see if we can extract any
// new announcements from it. An edge will get rejected if we already added
// the same edge without AuthProof to the graph. If the received announcement
// contains a proof, we can add this proof to our edge. We can end up in this
// situation in the case where we create a channel, but for some reason fail
// to receive the remote peer's proof, while the remote peer is able to fully
// assemble the proof and craft the ChannelAnnouncement.
func (d *AuthenticatedGossiper) processRejectedEdge(
	chanAnnMsg *lnwire.ChannelAnnouncement,
	proof *channeldb.ChannelAuthProof) ([]networkMsg, error) {

	// First, we'll fetch the state of the channel as we know it from the
	// database.
	chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
		chanAnnMsg.ShortChannelID,
	)
	if err != nil {
		return nil, err
	}

	// If the edge is in the graph and already has a proof attached, then
	// we'll just reject it as normal.
	if chanInfo.AuthProof != nil {
		return nil, nil
	}

	// Otherwise, this means that the edge is within the graph, but it
	// doesn't yet have a proper proof attached. If we did not receive
	// the proof such that we now can add it, there's nothing more we
	// can do.
	if proof == nil {
		return nil, nil
	}

	// We'll then create and validate the new fully assembled
	// announcement.
	chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
		proof, chanInfo, e1, e2,
	)
	if err != nil {
		return nil, err
	}
	err = routing.ValidateChannelAnn(chanAnn)
	if err != nil {
		err := fmt.Errorf("assembled channel announcement proof "+
			"for shortChanID=%v isn't valid: %v",
			chanAnnMsg.ShortChannelID, err)
		log.Error(err)
		return nil, err
	}

	// If everything checks out, then we'll add the fully assembled proof
	// to the database.
	err = d.cfg.Router.AddProof(chanAnnMsg.ShortChannelID, proof)
	if err != nil {
		err := fmt.Errorf("unable to add proof to shortChanID=%v: %v",
			chanAnnMsg.ShortChannelID, err)
		log.Error(err)
		return nil, err
	}

	// As we now have a complete channel announcement for this channel,
	// we'll construct the announcements so they can be broadcast out to
	// all our peers.
	announcements := make([]networkMsg, 0, 3)
	announcements = append(announcements, networkMsg{
		source: d.selfKey,
		msg:    chanAnn,
	})
	if e1Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e1Ann,
		})
	}
	if e2Ann != nil {
		announcements = append(announcements, networkMsg{
			source: d.selfKey,
			msg:    e2Ann,
		})
	}

	return announcements, nil
}

// addNode processes the given node announcement, and adds it to our channel
// graph.
func (d *AuthenticatedGossiper) addNode(msg *lnwire.NodeAnnouncement,
	op ...batch.SchedulerOption) error {

	if err := routing.ValidateNodeAnn(msg); err != nil {
		return fmt.Errorf("unable to validate node announcement: %v",
			err)
	}

	timestamp := time.Unix(int64(msg.Timestamp), 0)
	features := lnwire.NewFeatureVector(msg.Features, lnwire.Features)
	node := &channeldb.LightningNode{
		HaveNodeAnnouncement: true,
		LastUpdate:           timestamp,
		Addresses:            msg.Addresses,
		PubKeyBytes:          msg.NodeID,
		Alias:                msg.Alias.String(),
		AuthSigBytes:         msg.Signature.ToSignatureBytes(),
		Features:             features,
		Color:                msg.RGBColor,
		ExtraOpaqueData:      msg.ExtraOpaqueData,
	}

	return d.cfg.Router.AddNode(node, op...)
}

// processNetworkAnnouncement processes a new network related authenticated
// channel or node announcement, or announcement proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements
// will be returned which should be broadcast to the rest of the network. The
// boolean returned indicates whether any dependents of the announcement
// should attempt to be processed as well.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(
	nMsg *networkMsg) ([]networkMsg, bool) {

	isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
		// TODO(roasbeef) make height delta 6
		//  * or configurable
		return chanID.BlockHeight+delta > d.bestHeight
	}
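
	// To make the check concrete: if our best known height is 700000, an
	// announcement whose ShortChannelID encodes block 700003 is premature
	// with delta=0 (700003 > 700000) and is deferred until the chain tip
	// catches up, while one at block 699999 passes.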
// If this is a remote update, we set the scheduler option to lazily |
|
// add it to the graph. |
|
var schedulerOp []batch.SchedulerOption |
|
if nMsg.isRemote { |
|
schedulerOp = append(schedulerOp, batch.LazyAdd()) |
|
} |
|
|
|
var announcements []networkMsg |
|
|
|
switch msg := nMsg.msg.(type) { |
|
|
|
// A new node announcement has arrived which either presents new |
|
// information about a node in one of the channels we know about, or a |
|
// updating previously advertised information. |
|
case *lnwire.NodeAnnouncement: |
|
timestamp := time.Unix(int64(msg.Timestamp), 0) |
|
|
|
// We'll quickly ask the router if it already has a |
|
// newer update for this node so we can skip validating |
|
// signatures if not required. |
|
if d.cfg.Router.IsStaleNode(msg.NodeID, timestamp) { |
|
nMsg.err <- nil |
|
return nil, true |
|
} |
|
|
|
if err := d.addNode(msg, schedulerOp...); err != nil { |
|
if routing.IsError(err, routing.ErrOutdated, |
|
routing.ErrIgnored) { |
|
|
|
log.Debug(err) |
|
} else if err != routing.ErrVBarrierShuttingDown { |
|
log.Error(err) |
|
} |
|
|
|
nMsg.err <- err |
|
return nil, false |
|
} |
|
|
|
// In order to ensure we don't leak unadvertised nodes, we'll |
|
// make a quick check to ensure this node intends to publicly |
|
// advertise itself to the network. |
|
isPublic, err := d.cfg.Router.IsPublicNode(msg.NodeID) |
|
if err != nil { |
|
log.Errorf("Unable to determine if node %x is "+ |
|
"advertised: %v", msg.NodeID, err) |
|
nMsg.err <- err |
|
return nil, false |
|
} |
|
|
|
// If it does, we'll add their announcement to our batch so that |
|
// it can be broadcast to the rest of our peers. |
|
if isPublic { |
|
announcements = append(announcements, networkMsg{ |
|
peer: nMsg.peer, |
|
source: nMsg.source, |
|
msg: msg, |
|
}) |
|
} else { |
|
log.Tracef("Skipping broadcasting node announcement "+ |
|
"for %x due to being unadvertised", msg.NodeID) |
|
} |
|
|
|
nMsg.err <- nil |
|
// TODO(roasbeef): get rid of the above |
|
return announcements, true |
|
|
|
	// A new channel announcement has arrived; this indicates the
	// *creation* of a new channel within the network. This only advertises
	// the existence of a channel and not yet the routing policies in
	// either direction of the channel.
	case *lnwire.ChannelAnnouncement:
		// We'll ignore any channel announcements that target any chain
		// other than the set of chains we know of.
		if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
			err := fmt.Errorf("ignoring ChannelAnnouncement from "+
				"chain=%v, gossiper on chain=%v", msg.ChainHash,
				d.cfg.ChainHash)
			log.Errorf(err.Error())

			d.rejectMtx.Lock()
			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
			d.rejectMtx.Unlock()

			nMsg.err <- err
			return nil, false
		}

		// If the advertised inclusionary block is beyond our knowledge
		// of the chain tip, then we'll ignore it for now.
		d.Lock()
		if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
			log.Infof("Announcement for chan_id=(%v), is "+
				"premature: advertises height %v, only "+
				"height %v is known",
				msg.ShortChannelID.ToUint64(),
				msg.ShortChannelID.BlockHeight,
				d.bestHeight)
			d.Unlock()
			nMsg.err <- nil
			return nil, false
		}
		d.Unlock()

		// At this point, we'll now ask the router if this is a
		// zombie/known edge. If so we can skip all the processing
		// below.
		if d.cfg.Router.IsKnownEdge(msg.ShortChannelID) {
			nMsg.err <- nil
			return nil, true
		}

		// If this is a remote channel announcement, then we'll
		// validate all the signatures within the proof as it should be
		// well formed.
		var proof *channeldb.ChannelAuthProof
		if nMsg.isRemote {
			if err := routing.ValidateChannelAnn(msg); err != nil {
				err := fmt.Errorf("unable to validate "+
					"announcement: %v", err)
				d.rejectMtx.Lock()
				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
				d.rejectMtx.Unlock()

				log.Error(err)
				nMsg.err <- err
				return nil, false
			}

			// If the proof checks out, then we'll save the proof
			// itself to the database so we can fetch it later when
			// gossiping with other nodes.
			proof = &channeldb.ChannelAuthProof{
				NodeSig1Bytes:    msg.NodeSig1.ToSignatureBytes(),
				NodeSig2Bytes:    msg.NodeSig2.ToSignatureBytes(),
				BitcoinSig1Bytes: msg.BitcoinSig1.ToSignatureBytes(),
				BitcoinSig2Bytes: msg.BitcoinSig2.ToSignatureBytes(),
			}
		}

		// With the proof validated (if necessary), we can now store it
		// within the database for our path finding and syncing needs.
		var featureBuf bytes.Buffer
		if err := msg.Features.Encode(&featureBuf); err != nil {
			log.Errorf("unable to encode features: %v", err)
			nMsg.err <- err
			return nil, false
		}

		edge := &channeldb.ChannelEdgeInfo{
			ChannelID:        msg.ShortChannelID.ToUint64(),
			ChainHash:        msg.ChainHash,
			NodeKey1Bytes:    msg.NodeID1,
			NodeKey2Bytes:    msg.NodeID2,
			BitcoinKey1Bytes: msg.BitcoinKey1,
			BitcoinKey2Bytes: msg.BitcoinKey2,
			AuthProof:        proof,
			Features:         featureBuf.Bytes(),
			ExtraOpaqueData:  msg.ExtraOpaqueData,
		}

		// If there were any optional message fields provided, we'll
		// include them in its serialized disk representation now.
		if nMsg.optionalMsgFields != nil {
			if nMsg.optionalMsgFields.capacity != nil {
				edge.Capacity = *nMsg.optionalMsgFields.capacity
			}
			if nMsg.optionalMsgFields.channelPoint != nil {
				edge.ChannelPoint = *nMsg.optionalMsgFields.channelPoint
			}
		}

		// We will add the edge to the channel router. If the nodes
		// present in this channel are not present in the database, a
		// partial node will be added to represent each node while we
		// wait for a node announcement.
		//
		// Before we add the edge to the database, we obtain
		// the mutex for this channel ID. We do this to ensure
		// no other goroutine has read the database and is now
		// making decisions based on this DB state, before it
		// writes to the DB.
		d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
		defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
		if err := d.cfg.Router.AddEdge(edge, schedulerOp...); err != nil {
			// If the edge was rejected due to already being known,
			// then it may be the case that this new message has a
			// fresh channel proof, so we'll check.
			if routing.IsError(err, routing.ErrIgnored) {
				// Attempt to process the rejected message to
				// see if we get any new announcements.
				anns, rErr := d.processRejectedEdge(msg, proof)
				if rErr != nil {
					d.rejectMtx.Lock()
					d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
					d.rejectMtx.Unlock()
					nMsg.err <- rErr
					return nil, false
				}

				// If while processing this rejected edge, we
				// realized there's a set of announcements we
				// could extract, then we'll return those
				// directly.
				if len(anns) != 0 {
					nMsg.err <- nil
					return anns, true
				}

				// Otherwise, this is just a regular rejected
				// edge.
				log.Debugf("Router rejected channel "+
					"edge: %v", err)
			} else {
				log.Tracef("Router rejected channel "+
					"edge: %v", err)

				d.rejectMtx.Lock()
				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
				d.rejectMtx.Unlock()
			}

			nMsg.err <- err
			return nil, false
		}

		// If we earlier received any ChannelUpdates for this channel,
		// we can now process them, as the channel is added to the
		// graph.
		shortChanID := msg.ShortChannelID.ToUint64()
		var channelUpdates []*networkMsg

		d.pChanUpdMtx.Lock()
		channelUpdates = append(
			channelUpdates, d.prematureChannelUpdates[shortChanID]...,
		)

		// Now delete the premature ChannelUpdates, since we added them
		// all to the queue of network messages.
		delete(d.prematureChannelUpdates, shortChanID)
		d.pChanUpdMtx.Unlock()

		// Launch a new goroutine to handle each ChannelUpdate; this is
		// to ensure we don't block here, as we can handle only one
		// announcement at a time.
		for _, cu := range channelUpdates {
			d.wg.Add(1)
			go func(nMsg *networkMsg) {
				defer d.wg.Done()

				switch msg := nMsg.msg.(type) {

				// Reprocess the message, making sure we return
				// an error to the original caller in case the
				// gossiper shuts down.
				case *lnwire.ChannelUpdate:
					log.Debugf("Reprocessing"+
						" ChannelUpdate for "+
						"shortChanID=%v",
						msg.ShortChannelID.ToUint64())

					select {
					case d.networkMsgs <- nMsg:
					case <-d.quit:
						nMsg.err <- ErrGossiperShuttingDown
					}

				// We don't expect any other message type than
				// ChannelUpdate to be in this map.
				default:
					log.Errorf("Unsupported message type "+
						"found among ChannelUpdates: "+
						"%T", msg)
				}
			}(cu)
		}

		// The channel announcement was successfully processed, and it
		// will now be broadcast to other connected nodes if it was an
		// announcement with a proof (remote).
		if proof != nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nMsg.source,
				msg:    msg,
			})
		}

		nMsg.err <- nil
		return announcements, true

	// A new authenticated channel edge update has arrived. This indicates
	// that the directional information for an already known channel has
	// been updated.
	case *lnwire.ChannelUpdate:
		// We'll ignore any channel updates that target any chain other
		// than the set of chains we know of.
		if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
			err := fmt.Errorf("ignoring ChannelUpdate from "+
				"chain=%v, gossiper on chain=%v", msg.ChainHash,
				d.cfg.ChainHash)
			log.Errorf(err.Error())

			d.rejectMtx.Lock()
			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
			d.rejectMtx.Unlock()

			nMsg.err <- err
			return nil, false
		}

		blockHeight := msg.ShortChannelID.BlockHeight
		shortChanID := msg.ShortChannelID.ToUint64()

		// If the advertised inclusionary block is beyond our knowledge
		// of the chain tip, then we'll put the announcement in limbo
		// to be fully verified once we advance forward in the chain.
		d.Lock()
		if nMsg.isRemote && isPremature(msg.ShortChannelID, 0) {
			log.Infof("Update announcement for "+
				"short_chan_id(%v), is premature: advertises "+
				"height %v, only height %v is known",
				shortChanID, blockHeight,
				d.bestHeight)
			d.Unlock()
			nMsg.err <- nil
			return nil, false
		}
		d.Unlock()

		// Before we perform any of the expensive checks below, we'll
		// check whether this update is stale or is for a zombie
		// channel in order to quickly reject it.
		timestamp := time.Unix(int64(msg.Timestamp), 0)
		if d.cfg.Router.IsStaleEdgePolicy(
			msg.ShortChannelID, timestamp, msg.ChannelFlags,
		) {
			nMsg.err <- nil
			return nil, true
		}

		// Fetch the node public key, since we don't have it in the
		// channel update announcement message. We'll need it to
		// properly verify the message signature.
		//
		// We make sure to obtain the mutex for this channel ID
		// before we access the database. This ensures the state
		// we read from the database has not changed between this
		// point and when we call UpdateEdge() later.
		d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
		defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())
		chanInfo, edge1, edge2, err := d.cfg.Router.GetChannelByID(
			msg.ShortChannelID,
		)
		switch err {
		// No error, break.
		case nil:
			break

		case channeldb.ErrZombieEdge:
			err = d.processZombieUpdate(chanInfo, msg)
			if err != nil {
				log.Debug(err)
				nMsg.err <- err
				return nil, false
			}

			// We'll fallthrough to ensure we stash the update until
			// we receive its corresponding ChannelAnnouncement.
			// This is needed to ensure the edge exists in the graph
			// before applying the update.
			fallthrough
		case channeldb.ErrGraphNotFound:
			fallthrough
		case channeldb.ErrGraphNoEdgesFound:
			fallthrough
		case channeldb.ErrEdgeNotFound:
			// If the edge corresponding to this ChannelUpdate was
			// not found in the graph, this might be a channel in
			// the process of being opened, and we haven't processed
			// our own ChannelAnnouncement yet, hence it is not
			// found in the graph. This usually gets resolved after
			// the channel proofs are exchanged and the channel is
			// broadcasted to the rest of the network, but in case
			// this is a private channel this won't ever happen.
			// This can also happen in the case of a zombie channel
			// with a fresh update for which we don't have a
			// ChannelAnnouncement for since we reject them. Because
			// of this, we temporarily add it to a map, and
			// reprocess it after our own ChannelAnnouncement has
			// been processed.
			d.pChanUpdMtx.Lock()
			d.prematureChannelUpdates[shortChanID] = append(
				d.prematureChannelUpdates[shortChanID], nMsg,
			)
			d.pChanUpdMtx.Unlock()

			log.Debugf("Got ChannelUpdate for edge not found in "+
				"graph(shortChanID=%v), saving for "+
				"reprocessing later", shortChanID)

			// NOTE: We don't return anything on the error channel
			// for this message, as we expect that will be done when
			// this ChannelUpdate is later reprocessed.
			return nil, false

		default:
			err := fmt.Errorf("unable to validate channel update "+
				"short_chan_id=%v: %v", shortChanID, err)
			log.Error(err)
			nMsg.err <- err

			d.rejectMtx.Lock()
			d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
			d.rejectMtx.Unlock()
			return nil, false
		}

		// The least-significant bit in the flag on the channel update
		// announcement tells us "which" side of the channel's directed
		// edge is being updated.
		var (
			pubKey       *btcec.PublicKey
			edgeToUpdate *channeldb.ChannelEdgePolicy
		)
		direction := msg.ChannelFlags & lnwire.ChanUpdateDirection
		switch direction {
		case 0:
			pubKey, _ = chanInfo.NodeKey1()
			edgeToUpdate = edge1
		case 1:
			pubKey, _ = chanInfo.NodeKey2()
			edgeToUpdate = edge2
		}

		// If we have a previous version of the edge being updated,
		// we'll want to rate limit its updates to prevent spam
		// throughout the network.
		if nMsg.isRemote && edgeToUpdate != nil {
			// If it's a keep-alive update, we'll only propagate one
			// if it's been a day since the previous. This follows
			// our own heuristic of sending keep-alive updates after
			// the same duration (see retransmitStaleAnns).
			timeSinceLastUpdate := timestamp.Sub(edgeToUpdate.LastUpdate)
			if IsKeepAliveUpdate(msg, edgeToUpdate) {
				if timeSinceLastUpdate < d.cfg.RebroadcastInterval {
					log.Debugf("Ignoring keep alive update "+
						"not within %v period for "+
						"channel %v",
						d.cfg.RebroadcastInterval,
						shortChanID)
					nMsg.err <- nil
					return nil, false
				}
			} else {
				// If it's not, we'll allow an update per minute
				// with a maximum burst of 10. If we haven't
				// seen an update for this channel before, we'll
				// need to initialize a rate limiter for each
				// direction.
				d.Lock()
				rateLimiters, ok := d.chanUpdateRateLimiter[shortChanID]
				if !ok {
					r := rate.Every(d.cfg.ChannelUpdateInterval)
					b := d.cfg.MaxChannelUpdateBurst
					rateLimiters = [2]*rate.Limiter{
						rate.NewLimiter(r, b),
						rate.NewLimiter(r, b),
					}
					d.chanUpdateRateLimiter[shortChanID] = rateLimiters
				}
				d.Unlock()

				if !rateLimiters[direction].Allow() {
					log.Debugf("Rate limiting update for "+
						"channel %v from direction %x",
						shortChanID,
						pubKey.SerializeCompressed())
					nMsg.err <- nil
					return nil, false
				}
			}
		}
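
		// NOTE: with the default config values
		// (DefaultMaxChannelUpdateBurst=10 and
		// DefaultChannelUpdateInterval=1m), the token buckets above
		// admit up to 10 back-to-back policy changes per direction,
		// after which further non keep-alive updates are accepted at
		// roughly one per minute.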

		// Validate the channel update announcement with the expected
		// public key and channel capacity. In the case of an invalid
		// channel update, we'll return an error to the caller and exit
		// early.
		err = routing.ValidateChannelUpdateAnn(pubKey, chanInfo.Capacity, msg)
		if err != nil {
			rErr := fmt.Errorf("unable to validate channel "+
				"update announcement for short_chan_id=%v: %v",
				spew.Sdump(msg.ShortChannelID), err)

			log.Error(rErr)
			nMsg.err <- rErr
			return nil, false
		}

		update := &channeldb.ChannelEdgePolicy{
			SigBytes:                  msg.Signature.ToSignatureBytes(),
			ChannelID:                 shortChanID,
			LastUpdate:                timestamp,
			MessageFlags:              msg.MessageFlags,
			ChannelFlags:              msg.ChannelFlags,
			TimeLockDelta:             msg.TimeLockDelta,
			MinHTLC:                   msg.HtlcMinimumMsat,
			MaxHTLC:                   msg.HtlcMaximumMsat,
			FeeBaseMSat:               lnwire.MilliSatoshi(msg.BaseFee),
			FeeProportionalMillionths: lnwire.MilliSatoshi(msg.FeeRate),
			ExtraOpaqueData:           msg.ExtraOpaqueData,
		}

		if err := d.cfg.Router.UpdateEdge(update, schedulerOp...); err != nil {
			if routing.IsError(err, routing.ErrOutdated,
				routing.ErrIgnored) {
				log.Debug(err)
			} else if err != routing.ErrVBarrierShuttingDown {
				d.rejectMtx.Lock()
				d.recentRejects[msg.ShortChannelID.ToUint64()] = struct{}{}
				d.rejectMtx.Unlock()
				log.Error(err)
			}

			nMsg.err <- err
			return nil, false
		}

		// If this is a local ChannelUpdate without an AuthProof, it
		// means it is an update to a channel that is not (yet)
		// supposed to be announced to the greater network. However,
		// our channel counter party will need to be given the update,
		// so we'll try sending the update directly to the remote peer.
		if !nMsg.isRemote && chanInfo.AuthProof == nil {
			// Get our peer's public key.
			remotePubKey := remotePubFromChanInfo(
				chanInfo, msg.ChannelFlags,
			)

			// Now, we'll attempt to send the channel update message
			// reliably to the remote peer in the background, so
			// that we don't block if the peer happens to be offline
			// at the moment.
			err := d.reliableSender.sendMessage(msg, remotePubKey)
			if err != nil {
				err := fmt.Errorf("unable to reliably send %v "+
					"for channel=%v to peer=%x: %v",
					msg.MsgType(), msg.ShortChannelID,
					remotePubKey, err)
				nMsg.err <- err
				return nil, false
			}
		}

		// The channel update announcement was successfully processed
		// and now it can be broadcast to the rest of the network.
		// However, we'll only broadcast the channel update
		// announcement if it has an attached authentication proof.
		if chanInfo.AuthProof != nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: nMsg.source,
				msg:    msg,
			})
		}

		nMsg.err <- nil
		return announcements, true

	// A new signature announcement has been received. This indicates
	// willingness of nodes involved in the funding of a channel to
	// announce this new channel to the rest of the world.
	case *lnwire.AnnounceSignatures:
		needBlockHeight := msg.ShortChannelID.BlockHeight +
			d.cfg.ProofMatureDelta
		shortChanID := msg.ShortChannelID.ToUint64()

		prefix := "local"
		if nMsg.isRemote {
			prefix = "remote"
		}

		log.Infof("Received new %v channel announcement for %v", prefix,
			msg.ShortChannelID)

		// By the specification, channel announcement proofs should
		// only be sent after some number of confirmations after the
		// channel was registered in the bitcoin blockchain. Therefore,
		// we check if the proof is premature.
		d.Lock()
		if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
			log.Infof("Premature proof announcement, current "+
				"block height lower than needed: %v < %v",
				d.bestHeight, needBlockHeight)
			d.Unlock()
			nMsg.err <- nil
			return nil, false
		}
		d.Unlock()

		// Ensure that we know of a channel with the target channel ID
		// before proceeding further.
		//
		// We must acquire the mutex for this channel ID before getting
		// the channel from the database, to ensure what we read does
		// not change before we call AddProof() later.
		d.channelMtx.Lock(msg.ShortChannelID.ToUint64())
		defer d.channelMtx.Unlock(msg.ShortChannelID.ToUint64())

		chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(
			msg.ShortChannelID)
		if err != nil {
			// TODO(andrew.shvv) this is dangerous because remote
			// node might rewrite the waiting proof.
			proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
			err := d.cfg.WaitingProofStore.Add(proof)
			if err != nil {
				err := fmt.Errorf("unable to store "+
					"the proof for short_chan_id=%v: %v",
					shortChanID, err)
				log.Error(err)
				nMsg.err <- err
				return nil, false
			}

			log.Infof("Orphan %v proof announcement with "+
				"short_chan_id=%v, adding "+
				"to waiting batch", prefix, shortChanID)
			nMsg.err <- nil
			return nil, false
		}

		nodeID := nMsg.source.SerializeCompressed()
		isFirstNode := bytes.Equal(nodeID, chanInfo.NodeKey1Bytes[:])
		isSecondNode := bytes.Equal(nodeID, chanInfo.NodeKey2Bytes[:])

		// Ensure that the channel that was retrieved belongs to the
		// peer which sent the proof announcement.
		if !(isFirstNode || isSecondNode) {
			err := fmt.Errorf("channel that was received does "+
				"not belong to the peer which sent the proof, "+
				"short_chan_id=%v", shortChanID)
			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		// If the proof was sent by a local sub-system, then we'll
		// send the announcement signature to the remote node
		// so they can also reconstruct the full channel
		// announcement.
		if !nMsg.isRemote {
			var remotePubKey [33]byte
			if isFirstNode {
				remotePubKey = chanInfo.NodeKey2Bytes
			} else {
				remotePubKey = chanInfo.NodeKey1Bytes
			}
			// Since the remote peer might not be online
			// we'll call a method that will attempt to
			// deliver the proof when it comes online.
			err := d.reliableSender.sendMessage(msg, remotePubKey)
			if err != nil {
				err := fmt.Errorf("unable to reliably send %v "+
					"for channel=%v to peer=%x: %v",
					msg.MsgType(), msg.ShortChannelID,
					remotePubKey, err)
				nMsg.err <- err
				return nil, false
			}
		}

		// Check if we already have the full proof for this channel.
		if chanInfo.AuthProof != nil {
			// If we already have the fully assembled proof, then
			// the peer sending us their proof has probably not
			// received our local proof yet. So be kind and send
			// them the full proof.
			if nMsg.isRemote {
				peerID := nMsg.source.SerializeCompressed()
				log.Debugf("Got AnnounceSignatures for " +
					"channel with full proof.")

				d.wg.Add(1)
				go func() {
					defer d.wg.Done()
					log.Debugf("Received half proof for "+
						"channel %v with existing "+
						"full proof. Sending full "+
						"proof to peer=%x",
						msg.ChannelID,
						peerID)

					chanAnn, _, _, err := netann.CreateChanAnnouncement(
						chanInfo.AuthProof, chanInfo,
						e1, e2,
					)
					if err != nil {
						log.Errorf("unable to gen "+
							"ann: %v", err)
						return
					}
					err = nMsg.peer.SendMessage(
						false, chanAnn,
					)
					if err != nil {
						log.Errorf("Failed sending "+
							"full proof to "+
							"peer=%x: %v",
							peerID, err)
						return
					}
					log.Debugf("Full proof sent to peer=%x"+
						" for chanID=%v", peerID,
						msg.ChannelID)
				}()
			}

			log.Debugf("Already have proof for channel "+
				"with chanID=%v", msg.ChannelID)
			nMsg.err <- nil
			return nil, true
		}

		// Check that we received the opposite proof. If so, then we're
		// now able to construct the full proof, and create the channel
		// announcement. If we didn't receive the opposite half of the
		// proof, then we should store this one, and wait for the
		// opposite half to be received.
		proof := channeldb.NewWaitingProof(nMsg.isRemote, msg)
		oppositeProof, err := d.cfg.WaitingProofStore.Get(
			proof.OppositeKey(),
		)
		if err != nil && err != channeldb.ErrWaitingProofNotFound {
			err := fmt.Errorf("unable to get "+
				"the opposite proof for short_chan_id=%v: %v",
				shortChanID, err)
			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		if err == channeldb.ErrWaitingProofNotFound {
			err := d.cfg.WaitingProofStore.Add(proof)
			if err != nil {
				err := fmt.Errorf("unable to store "+
					"the proof for short_chan_id=%v: %v",
					shortChanID, err)
				log.Error(err)
				nMsg.err <- err
				return nil, false
			}

			log.Infof("1/2 of channel ann proof received for "+
				"short_chan_id=%v, waiting for other half",
				shortChanID)

			nMsg.err <- nil
			return nil, false
		}

		// Now that we have both halves of the channel announcement
		// proof, we'll reconstruct the initial announcement so we can
		// validate it shortly below.
		var dbProof channeldb.ChannelAuthProof
		if isFirstNode {
			dbProof.NodeSig1Bytes = msg.NodeSignature.ToSignatureBytes()
			dbProof.NodeSig2Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
			dbProof.BitcoinSig1Bytes = msg.BitcoinSignature.ToSignatureBytes()
			dbProof.BitcoinSig2Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
		} else {
			dbProof.NodeSig1Bytes = oppositeProof.NodeSignature.ToSignatureBytes()
			dbProof.NodeSig2Bytes = msg.NodeSignature.ToSignatureBytes()
			dbProof.BitcoinSig1Bytes = oppositeProof.BitcoinSignature.ToSignatureBytes()
			dbProof.BitcoinSig2Bytes = msg.BitcoinSignature.ToSignatureBytes()
		}
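
		// Note that the "1" signature slots always correspond to the
		// node with the numerically lesser compressed public key
		// (NodeKey1Bytes), matching the node_1/node_2 ordering that
		// the channel announcement itself uses.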
		chanAnn, e1Ann, e2Ann, err := netann.CreateChanAnnouncement(
			&dbProof, chanInfo, e1, e2,
		)
		if err != nil {
			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		// With all the necessary components assembled, validate the
		// full channel announcement proof.
		if err := routing.ValidateChannelAnn(chanAnn); err != nil {
			err := fmt.Errorf("channel announcement proof "+
				"for short_chan_id=%v isn't valid: %v",
				shortChanID, err)

			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		// If the channel was returned by the router, it means that the
		// existence of the funding point and the inclusion of the
		// nodes' bitcoin keys were already checked by the router. At
		// this stage we check that the node keys attest to the bitcoin
		// keys by validating the signatures of the announcement. If
		// the proof is valid, then we'll populate the channel edge
		// with it, so we can announce it on peer connect.
		err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof)
		if err != nil {
			err := fmt.Errorf("unable to add proof to the "+
				"channel chanID=%v: %v", msg.ChannelID, err)
			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		err = d.cfg.WaitingProofStore.Remove(proof.OppositeKey())
		if err != nil {
			err := fmt.Errorf("unable to remove opposite proof "+
				"for the channel with chanID=%v: %v",
				msg.ChannelID, err)
			log.Error(err)
			nMsg.err <- err
			return nil, false
		}

		// The proof was successfully assembled, so we can now announce
		// the channel to the rest of the network.
		log.Infof("Fully valid channel proof for short_chan_id=%v "+
			"constructed, adding to next ann batch",
			shortChanID)

		// Assemble the necessary announcements to add to the next
		// broadcasting batch.
		announcements = append(announcements, networkMsg{
			peer:   nMsg.peer,
			source: nMsg.source,
			msg:    chanAnn,
		})
		if src, err := chanInfo.NodeKey1(); err == nil && e1Ann != nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: src,
				msg:    e1Ann,
			})
		}
		if src, err := chanInfo.NodeKey2(); err == nil && e2Ann != nil {
			announcements = append(announcements, networkMsg{
				peer:   nMsg.peer,
				source: src,
				msg:    e2Ann,
			})
		}

		// We'll also send along the node announcements for each
		// channel participant if we know of them. To ensure our node
		// announcement propagates to our channel counterparty, we'll
		// set the source for each announcement to the node it belongs
		// to, otherwise we won't send it since the source gets skipped.
		// This isn't necessary for channel updates and announcement
		// signatures since we send those directly to our channel
		// counterparty through the gossiper's reliable sender.
		node1Ann, err := d.fetchNodeAnn(chanInfo.NodeKey1Bytes)
		if err != nil {
			log.Debugf("Unable to fetch node announcement for "+
				"%x: %v", chanInfo.NodeKey1Bytes, err)
		} else {
			if nodeKey1, err := chanInfo.NodeKey1(); err == nil {
				announcements = append(announcements, networkMsg{
					peer:   nMsg.peer,
					source: nodeKey1,
					msg:    node1Ann,
				})
			}
		}
		node2Ann, err := d.fetchNodeAnn(chanInfo.NodeKey2Bytes)
		if err != nil {
			log.Debugf("Unable to fetch node announcement for "+
				"%x: %v", chanInfo.NodeKey2Bytes, err)
		} else {
			if nodeKey2, err := chanInfo.NodeKey2(); err == nil {
				announcements = append(announcements, networkMsg{
					peer:   nMsg.peer,
					source: nodeKey2,
					msg:    node2Ann,
				})
			}
		}

		nMsg.err <- nil
		return announcements, true

	default:
		err := errors.New("wrong type of the announcement")
		nMsg.err <- err
		return nil, false
	}
}
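
// exampleChanUpdateRateLimiter is an illustrative sketch, not called anywhere
// in this package, of the per-direction token buckets that
// processNetworkAnnouncement builds for remote, non keep-alive channel
// updates: each direction refills one token per update interval and tolerates
// bursts of up to the configured maximum number of updates.
func exampleChanUpdateRateLimiter() [2]*rate.Limiter {
	// One token per DefaultChannelUpdateInterval, with a bucket size of
	// DefaultMaxChannelUpdateBurst tokens for each direction.
	r := rate.Every(DefaultChannelUpdateInterval)
	b := DefaultMaxChannelUpdateBurst
	return [2]*rate.Limiter{
		rate.NewLimiter(r, b),
		rate.NewLimiter(r, b),
	}
}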

// processZombieUpdate determines whether the provided channel update should
// resurrect a given zombie edge.
func (d *AuthenticatedGossiper) processZombieUpdate(
	chanInfo *channeldb.ChannelEdgeInfo, msg *lnwire.ChannelUpdate) error {

	// The least-significant bit in the flag on the channel update tells us
	// which edge is being updated.
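	// For example, an update with ChannelFlags&1 == 0 must have been
	// signed by the node behind NodeKey1Bytes, while an update with the
	// bit set must have been signed by the node behind NodeKey2Bytes.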
	isNode1 := msg.ChannelFlags&lnwire.ChanUpdateDirection == 0

	// Since we've deemed the update as not stale above, before marking it
	// live, we'll make sure it has been signed by the correct party. If we
	// have both pubkeys, either party can resurrect the channel. If we've
	// already marked this with the stricter, single-sided resurrection we
	// will only have the pubkey of the node with the oldest timestamp.
	var pubKey *btcec.PublicKey
	switch {
	case isNode1 && chanInfo.NodeKey1Bytes != emptyPubkey:
		pubKey, _ = chanInfo.NodeKey1()
	case !isNode1 && chanInfo.NodeKey2Bytes != emptyPubkey:
		pubKey, _ = chanInfo.NodeKey2()
	}
	if pubKey == nil {
		return fmt.Errorf("incorrect pubkey to resurrect zombie "+
			"with chan_id=%v", msg.ShortChannelID)
	}

	err := routing.VerifyChannelUpdateSignature(msg, pubKey)
	if err != nil {
		return fmt.Errorf("unable to verify channel "+
			"update signature: %v", err)
	}

	// With the signature valid, we'll proceed to mark the
	// edge as live and wait for the channel announcement to
	// come through again.
	err = d.cfg.Router.MarkEdgeLive(msg.ShortChannelID)
	if err != nil {
		return fmt.Errorf("unable to remove edge with "+
			"chan_id=%v from zombie index: %v",
			msg.ShortChannelID, err)
	}

	log.Debugf("Removed edge with chan_id=%v from zombie "+
		"index", msg.ShortChannelID)

	return nil
}

// fetchNodeAnn fetches the latest signed node announcement from our point of
// view for the node with the given public key.
func (d *AuthenticatedGossiper) fetchNodeAnn(
	pubKey [33]byte) (*lnwire.NodeAnnouncement, error) {

	node, err := d.cfg.Router.FetchLightningNode(pubKey)
	if err != nil {
		return nil, err
	}

	return node.NodeAnnouncement(true)
}

// isMsgStale determines whether a message retrieved from the backing
// MessageStore is seen as stale by the current graph.
func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool {
	switch msg := msg.(type) {
	case *lnwire.AnnounceSignatures:
		chanInfo, _, _, err := d.cfg.Router.GetChannelByID(
			msg.ShortChannelID,
		)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if err == channeldb.ErrEdgeNotFound {
			return true
		}
		if err != nil {
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// If the proof exists in the graph, then we have successfully
		// received the remote proof and assembled the full proof, so
		// we can safely delete the local proof from the database.
		return chanInfo.AuthProof != nil

	case *lnwire.ChannelUpdate:
		_, p1, p2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)

		// If the channel cannot be found, it is most likely a leftover
		// message for a channel that was closed, so we can consider it
		// stale.
		if err == channeldb.ErrEdgeNotFound {
			return true
		}
		if err != nil {
			log.Debugf("Unable to retrieve channel=%v from graph: "+
				"%v", msg.ShortChannelID, err)
			return false
		}

		// Otherwise, we'll retrieve the correct policy that we
		// currently have stored within our graph to check if this
		// message is stale by comparing its timestamp.
		var p *channeldb.ChannelEdgePolicy
		if msg.ChannelFlags&lnwire.ChanUpdateDirection == 0 {
			p = p1
		} else {
			p = p2
		}

		// If the policy is still unknown, then we can consider this
		// policy fresh.
		if p == nil {
			return false
		}

		timestamp := time.Unix(int64(msg.Timestamp), 0)
		return p.LastUpdate.After(timestamp)

	default:
		// We'll make sure to not mark any unsupported messages as
		// stale to ensure they are not removed.
		return false
	}
}

// updateChannel creates a new fully signed update for the channel, and updates
// the underlying graph with the new state.
func (d *AuthenticatedGossiper) updateChannel(info *channeldb.ChannelEdgeInfo,
	edge *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
	*lnwire.ChannelUpdate, error) {

	// Parse the unsigned edge into a channel update.
	chanUpdate := netann.UnsignedChannelUpdateFromEdge(info, edge)

	// We'll generate a new signature over a digest of the channel
	// announcement itself and update the timestamp to ensure it
	// propagates.
	err := netann.SignChannelUpdate(
		d.cfg.AnnSigner, d.selfKey, chanUpdate,
		netann.ChanUpdSetTimestamp,
	)
	if err != nil {
		return nil, nil, err
	}

	// Next, we'll set the new signature in place, and update the reference
	// in the backing slice.
	edge.LastUpdate = time.Unix(int64(chanUpdate.Timestamp), 0)
	edge.SigBytes = chanUpdate.Signature.ToSignatureBytes()

	// To ensure that our signature is valid, we'll verify it ourselves
	// before committing it to the slice returned.
	err = routing.ValidateChannelUpdateAnn(d.selfKey, info.Capacity, chanUpdate)
	if err != nil {
		return nil, nil, fmt.Errorf("generated invalid channel "+
			"update sig: %v", err)
	}

	// Finally, we'll write the new edge policy to disk.
	if err := d.cfg.Router.UpdateEdge(edge); err != nil {
		return nil, nil, err
	}

	// We'll also create the original channel announcement so the two can
	// be broadcast alongside each other (if necessary), but only if we
	// have a full channel announcement for this channel.
	var chanAnn *lnwire.ChannelAnnouncement
	if info.AuthProof != nil {
		chanID := lnwire.NewShortChanIDFromInt(info.ChannelID)
		chanAnn = &lnwire.ChannelAnnouncement{
			ShortChannelID:  chanID,
			NodeID1:         info.NodeKey1Bytes,
			NodeID2:         info.NodeKey2Bytes,
			ChainHash:       info.ChainHash,
			BitcoinKey1:     info.BitcoinKey1Bytes,
			Features:        lnwire.NewRawFeatureVector(),
			BitcoinKey2:     info.BitcoinKey2Bytes,
			ExtraOpaqueData: edge.ExtraOpaqueData,
		}
		chanAnn.NodeSig1, err = lnwire.NewSigFromRawSignature(
			info.AuthProof.NodeSig1Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.NodeSig2, err = lnwire.NewSigFromRawSignature(
			info.AuthProof.NodeSig2Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.BitcoinSig1, err = lnwire.NewSigFromRawSignature(
			info.AuthProof.BitcoinSig1Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
		chanAnn.BitcoinSig2, err = lnwire.NewSigFromRawSignature(
			info.AuthProof.BitcoinSig2Bytes,
		)
		if err != nil {
			return nil, nil, err
		}
	}

	return chanAnn, chanUpdate, err
}

// SyncManager returns the gossiper's SyncManager instance.
func (d *AuthenticatedGossiper) SyncManager() *SyncManager {
	return d.syncMgr
}

// IsKeepAliveUpdate determines whether this channel update is considered a
// keep-alive update based on the previous channel update processed for the
// same direction.
func IsKeepAliveUpdate(update *lnwire.ChannelUpdate,
	prev *channeldb.ChannelEdgePolicy) bool {

	// Both updates should be from the same direction.
	if update.ChannelFlags&lnwire.ChanUpdateDirection !=
		prev.ChannelFlags&lnwire.ChanUpdateDirection {
		return false
	}

	// The timestamp should always increase for a keep-alive update.
	timestamp := time.Unix(int64(update.Timestamp), 0)
	if !timestamp.After(prev.LastUpdate) {
		return false
	}

	// None of the remaining fields should change for a keep-alive update.
	if update.ChannelFlags.IsDisabled() != prev.ChannelFlags.IsDisabled() {
		return false
	}
	if lnwire.MilliSatoshi(update.BaseFee) != prev.FeeBaseMSat {
		return false
	}
	if lnwire.MilliSatoshi(update.FeeRate) != prev.FeeProportionalMillionths {
		return false
	}
	if update.TimeLockDelta != prev.TimeLockDelta {
		return false
	}
	if update.HtlcMinimumMsat != prev.MinHTLC {
		return false
	}
	if update.MessageFlags.HasMaxHtlc() && !prev.MessageFlags.HasMaxHtlc() {
		return false
	}
	if update.HtlcMaximumMsat != prev.MaxHTLC {
		return false
	}
	if !bytes.Equal(update.ExtraOpaqueData, prev.ExtraOpaqueData) {
		return false
	}
	return true
}
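
// exampleKeepAliveClassification is an illustrative sketch, not used by the
// gossiper itself, showing how IsKeepAliveUpdate classifies updates: an
// update that only bumps the timestamp of the previous policy counts as a
// keep-alive, while one that also changes a forwarding parameter (here the
// base fee) does not.
func exampleKeepAliveClassification() (bool, bool) {
	prev := &channeldb.ChannelEdgePolicy{
		LastUpdate:  time.Unix(1000, 0),
		FeeBaseMSat: 1000,
	}
	keepAlive := &lnwire.ChannelUpdate{
		Timestamp: 2000,
		BaseFee:   1000,
	}
	feeChange := &lnwire.ChannelUpdate{
		Timestamp: 2000,
		BaseFee:   2000,
	}

	// The first result is true (timestamp-only bump), the second is false
	// (the base fee changed as well).
	return IsKeepAliveUpdate(keepAlive, prev),
		IsKeepAliveUpdate(feeChange, prev)
}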

// latestHeight returns the gossiper's latest known height of the chain.
func (d *AuthenticatedGossiper) latestHeight() uint32 {
	d.Lock()
	defer d.Unlock()
	return d.bestHeight
}