package discovery import ( "sync" "sync/atomic" "time" "encoding/hex" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcutil" ) // networkMsg couples a routing related wire message with the peer that // originally sent it. type networkMsg struct { msg lnwire.Message isRemote bool peer *btcec.PublicKey } // syncRequest represents a request from an outside subsystem to the wallet to // sync a new node to the latest graph state. type syncRequest struct { node *btcec.PublicKey } // Config defines the configuration for the service. ALL elements within the // configuration MUST be non-nil for the service to carry out its duties. type Config struct { // Router is the subsystem which is responsible for managing the // topology of lightning network. After incoming channel, node, // channel updates announcements are validated they are sent to the // router in order to be included in the LN graph. Router routing.ChannelGraphSource // Notifier is used for receiving notifications of incoming blocks. // With each new incoming block found we process previously premature // announcements. // TODO(roasbeef): could possibly just replace this with an epoch // channel. Notifier chainntnfs.ChainNotifier // Broadcast broadcasts a particular set of announcements to all peers // that the daemon is connected to. If supplied, the exclude parameter // indicates that the target peer should be excluded from the broadcast. Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error // SendMessages is a function which allows the service to send a set of // messages to a particular peer identified by the target public // key. SendMessages func(target *btcec.PublicKey, msg ...lnwire.Message) error } // New create new discovery service structure. func New(cfg Config) (*Discovery, error) { // TODO(roasbeef): remove this place holder after sigs are properly // stored in the graph. s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" + "1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" + "ad154c03be696a292d24ae" fakeSigHex, err := hex.DecodeString(s) if err != nil { return nil, err } fakeSig, err := btcec.ParseSignature(fakeSigHex, btcec.S256()) if err != nil { return nil, err } return &Discovery{ cfg: &cfg, networkMsgs: make(chan *networkMsg), quit: make(chan bool), syncRequests: make(chan *syncRequest), prematureAnnouncements: make(map[uint32][]*networkMsg), fakeSig: fakeSig, }, nil } // Discovery is a subsystem which is responsible for receiving announcements // validate them and apply the changes to router, syncing lightning network // with newly connected nodes, broadcasting announcements after validation, // negotiating the channel announcement proofs exchange and handling the // premature announcements. type Discovery struct { // Parameters which are needed to properly handle the start and stop // of the service. started uint32 stopped uint32 quit chan bool wg sync.WaitGroup // cfg is a copy of the configuration struct that the discovery service // was initialized with. cfg *Config // newBlocks is a channel in which new blocks connected to the end of // the main chain are sent over. newBlocks <-chan *chainntnfs.BlockEpoch // prematureAnnouncements maps a blockheight to a set of announcements // which are "premature" from our PoV. An message is premature if // it claims to be anchored in a block which is beyond the current main // chain tip as we know it. Premature network messages will be processed // once the chain tip as we know it extends to/past the premature // height. // // TODO(roasbeef): limit premature networkMsgs to N prematureAnnouncements map[uint32][]*networkMsg // networkMsgs is a channel that carries new network broadcasted // message from outside the discovery service to be processed by the // networkHandler. networkMsgs chan *networkMsg // syncRequests is a channel that carries requests to synchronize newly // connected peers to the state of the lightning network topology from // our PoV. syncRequests chan *syncRequest // bestHeight is the height of the block at the tip of the main chain // as we know it. bestHeight uint32 fakeSig *btcec.Signature } // ProcessRemoteAnnouncement sends a new remote announcement message along with // the peer that sent the routing message. The announcement will be processed then // added to a queue for batched trickled announcement to all connected peers. // Remote channel announcements should contain the announcement proof and be // fully validated. func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message, src *btcec.PublicKey) error { aMsg := &networkMsg{ msg: msg, isRemote: true, peer: src, } select { case d.networkMsgs <- aMsg: return nil case <-d.quit: return errors.New("discovery has been shutted down") } } // ProcessLocalAnnouncement sends a new remote announcement message along with // the peer that sent the routing message. The announcement will be processed then // added to a queue for batched trickled announcement to all connected peers. // Local channel announcements not contain the announcement proof and should be // fully validated. The channels proofs will be included farther if nodes agreed // to announce this channel to the rest of the network. func (d *Discovery) ProcessLocalAnnouncement(msg lnwire.Message, src *btcec.PublicKey) error { aMsg := &networkMsg{ msg: msg, isRemote: false, peer: src, } select { case d.networkMsgs <- aMsg: return nil case <-d.quit: return errors.New("discovery has been shutted down") } } // SynchronizeNode sends a message to the service indicating it should // synchronize lightning topology state with the target node. This method // is to be utilized when a node connections for the first time to provide it // with the latest topology update state. func (d *Discovery) SynchronizeNode(pub *btcec.PublicKey) { select { case d.syncRequests <- &syncRequest{ node: pub, }: case <-d.quit: return } } // Start spawns network messages handler goroutine and registers on new block // notifications in order to properly handle the premature announcements. func (d *Discovery) Start() error { if !atomic.CompareAndSwapUint32(&d.started, 0, 1) { return nil } // First we register for new notifications of newly discovered blocks. // We do this immediately so we'll later be able to consume any/all // blocks which were discovered. blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn() if err != nil { return err } d.newBlocks = blockEpochs.Epochs height, err := d.cfg.Router.CurrentBlockHeight() if err != nil { return err } d.bestHeight = height d.wg.Add(1) go d.networkHandler() log.Info("Discovery service is started") return nil } // Stop signals any active goroutines for a graceful closure. func (d *Discovery) Stop() { if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) { return } close(d.quit) d.wg.Wait() log.Info("Discovery service is stoped.") } // networkHandler is the primary goroutine. The roles of this goroutine include // answering queries related to the state of the network, syncing up newly // connected peers, and also periodically broadcasting our latest topology state // to all connected peers. // // NOTE: This MUST be run as a goroutine. func (d *Discovery) networkHandler() { defer d.wg.Done() var announcementBatch []lnwire.Message // TODO(roasbeef): parametrize the above retransmitTimer := time.NewTicker(time.Minute * 30) defer retransmitTimer.Stop() // TODO(roasbeef): parametrize the above trickleTimer := time.NewTicker(time.Millisecond * 300) defer trickleTimer.Stop() for { select { case announcement := <-d.networkMsgs: // Process the network announcement to determine if // this is either a new announcement from our PoV or an // updates to a prior vertex/edge we previously // accepted. accepted := d.processNetworkAnnouncement(announcement) // If the updates was accepted, then add it to our next // announcement batch to be broadcast once the trickle // timer ticks gain. if accepted { // TODO(roasbeef): exclude peer that sent announcementBatch = append( announcementBatch, announcement.msg, ) } // A new block has arrived, so we can re-process the // previously premature announcements. case newBlock, ok := <-d.newBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. if !ok { return } // Once a new block arrives, we updates our running // track of the height of the chain tip. blockHeight := uint32(newBlock.Height) d.bestHeight = blockHeight // Next we check if we have any premature announcements // for this height, if so, then we process them once // more as normal announcements. prematureAnns := d.prematureAnnouncements[uint32(newBlock.Height)] if len(prematureAnns) != 0 { log.Infof("Re-processing %v premature "+ "announcements for height %v", len(prematureAnns), blockHeight) } for _, ann := range prematureAnns { accepted := d.processNetworkAnnouncement(ann) if accepted { announcementBatch = append( announcementBatch, ann.msg, ) } } delete(d.prematureAnnouncements, blockHeight) // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements // we've received since the last trickle tick. case <-trickleTimer.C: // If the current announcement batch is nil, then we // have no further work here. if len(announcementBatch) == 0 { continue } log.Infof("Broadcasting batch of %v new announcements", len(announcementBatch)) // If we have new things to announce then broadcast // them to all our immediately connected peers. err := d.cfg.Broadcast(nil, announcementBatch...) if err != nil { log.Errorf("unable to send batch announcement: %v", err) continue } // If we're able to broadcast the current batch // successfully, then we reset the batch for a new // round of announcements. announcementBatch = nil // The retransmission timer has ticked which indicates that we // should broadcast our personal channels to the network. This // addresses the case of channel advertisements whether being // dropped, or not properly propagated through the network. case <-retransmitTimer.C: var selfChans []lnwire.Message // Iterate over our channels and construct the // announcements array. err := d.cfg.Router.ForAllOutgoingChannels( func(p *channeldb.ChannelEdgePolicy) error { c := &lnwire.ChannelUpdateAnnouncement{ Signature: d.fakeSig, ChannelID: lnwire.NewChanIDFromInt(p.ChannelID), Timestamp: uint32(p.LastUpdate.Unix()), Flags: p.Flags, TimeLockDelta: p.TimeLockDelta, HtlcMinimumMsat: uint32(p.MinHTLC), FeeBaseMsat: uint32(p.FeeBaseMSat), FeeProportionalMillionths: uint32(p.FeeProportionalMillionths), } selfChans = append(selfChans, c) return nil }) if err != nil { log.Errorf("unable to iterate over chann"+ "els: %v", err) continue } else if len(selfChans) == 0 { continue } log.Debugf("Retransmitting %v outgoing channels", len(selfChans)) // With all the wire announcements properly crafted, // we'll broadcast our known outgoing channel to all our // immediate peers. if err := d.cfg.Broadcast(nil, selfChans...); err != nil { log.Errorf("unable to re-broadcast "+ "channels: %v", err) } // We've just received a new request to synchronize a peer with // our latest lightning network topology state. This indicates // that a peer has just connected for the first time, so for now // we dump our entire network graph and allow them to sift // through the (subjectively) new information on their own. case syncReq := <-d.syncRequests: nodePub := syncReq.node.SerializeCompressed() log.Infof("Synchronizing channel graph with %x", nodePub) if err := d.synchronize(syncReq); err != nil { log.Errorf("unable to sync graph state with %x: %v", nodePub, err) } // The discovery has been signalled to exit, to we exit our main // loop so the wait group can be decremented. case <-d.quit: return } } } // processNetworkAnnouncement processes a new network relate authenticated // channel or node announcement. If the updates didn't affect the internal state // of the draft due to either being out of date, invalid, or redundant, then // false is returned. Otherwise, true is returned indicating that the caller // may want to batch this request to be broadcast to immediate peers during the // next announcement epoch. func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { isPremature := func(chanID *lnwire.ChannelID) bool { return chanID.BlockHeight > d.bestHeight } switch msg := aMsg.msg.(type) { // A new node announcement has arrived which either presents a new // node, or a node updating previously advertised information. case *lnwire.NodeAnnouncement: if aMsg.isRemote { // TODO(andrew.shvv) Add node validation } node := &channeldb.LightningNode{ LastUpdate: time.Unix(int64(msg.Timestamp), 0), Addresses: msg.Addresses, PubKey: msg.NodeID, Alias: msg.Alias.String(), AuthSig: msg.Signature, Features: msg.Features, } if err := d.cfg.Router.AddNode(node); err != nil { log.Errorf("unable to add node: %v", err) return false } // A new channel announcement has arrived, this indicates the // *creation* of a new channel within the network. This only advertises // the existence of a channel and not yet the routing policies in // either direction of the channel. case *lnwire.ChannelAnnouncement: // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. if isPremature(&msg.ChannelID) { blockHeight := msg.ChannelID.BlockHeight log.Infof("Announcement for chan_id=(%v), is "+ "premature: advertises height %v, only height "+ "%v is known", msg.ChannelID, msg.ChannelID.BlockHeight, d.bestHeight) d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], aMsg, ) return false } var proof *channeldb.ChannelAuthProof if aMsg.isRemote { // TODO(andrew.shvv) Add channel validation } proof = &channeldb.ChannelAuthProof{ NodeSig1: msg.FirstNodeSig, NodeSig2: msg.SecondNodeSig, BitcoinSig1: msg.FirstBitcoinSig, BitcoinSig2: msg.SecondBitcoinSig, } edge := &channeldb.ChannelEdgeInfo{ ChannelID: msg.ChannelID.ToUint64(), NodeKey1: msg.FirstNodeID, NodeKey2: msg.SecondNodeID, BitcoinKey1: msg.FirstBitcoinKey, BitcoinKey2: msg.SecondBitcoinKey, AuthProof: proof, } if err := d.cfg.Router.AddEdge(edge); err != nil { if !routing.IsError(err, routing.ErrOutdated) { log.Errorf("unable to add edge: %v", err) } else { log.Info("Unable to add edge: %v", err) } return false } // A new authenticated channel updates has arrived, this indicates // that the directional information for an already known channel has // been updated. case *lnwire.ChannelUpdateAnnouncement: chanID := msg.ChannelID.ToUint64() // If the advertised inclusionary block is beyond our knowledge // of the chain tip, then we'll put the announcement in limbo // to be fully verified once we advance forward in the chain. if isPremature(&msg.ChannelID) { blockHeight := msg.ChannelID.BlockHeight log.Infof("Update announcement for chan_id=(%v), is "+ "premature: advertises height %v, only height "+ "%v is known", chanID, blockHeight, d.bestHeight) d.prematureAnnouncements[blockHeight] = append( d.prematureAnnouncements[blockHeight], aMsg, ) return false } if aMsg.isRemote { // TODO(andrew.shvv) Add update channel validation } // TODO(roasbeef): should be msat here update := &channeldb.ChannelEdgePolicy{ ChannelID: chanID, LastUpdate: time.Unix(int64(msg.Timestamp), 0), Flags: msg.Flags, TimeLockDelta: msg.TimeLockDelta, MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), FeeBaseMSat: btcutil.Amount(msg.FeeBaseMsat), FeeProportionalMillionths: btcutil.Amount(msg.FeeProportionalMillionths), } if err := d.cfg.Router.UpdateEdge(update); err != nil { log.Errorf("unable to update edge: %v", err) return false } } return true } // synchronize attempts to synchronize the target node in the syncReq to // the latest channel graph state. In order to accomplish this, (currently) the // entire network graph is read from disk, then serialized to the format // defined within the current wire protocol. This cache of graph data is then // sent directly to the target node. func (d *Discovery) synchronize(syncReq *syncRequest) error { targetNode := syncReq.node // TODO(roasbeef): need to also store sig data in db // * will be nice when we switch to pairing sigs would only need one ^_^ // We'll collate all the gathered routing messages into a single slice // containing all the messages to be sent to the target peer. var announceMessages []lnwire.Message // First run through all the vertexes in the graph, retrieving the data // for the announcement we originally retrieved. var numNodes uint32 if err := d.cfg.Router.ForEachNode(func(node *channeldb.LightningNode) error { alias, err := lnwire.NewAlias(node.Alias) if err != nil { return err } ann := &lnwire.NodeAnnouncement{ Signature: d.fakeSig, Timestamp: uint32(node.LastUpdate.Unix()), Addresses: node.Addresses, NodeID: node.PubKey, Alias: alias, Features: node.Features, } announceMessages = append(announceMessages, ann) numNodes++ return nil }); err != nil { return err } // With the vertexes gathered, we'll no retrieve the initial // announcement, as well as the latest channel update announcement for // both of the directed edges that make up the channel. var numEdges uint32 if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, e1, e2 *channeldb.ChannelEdgePolicy) error { chanID := lnwire.NewChanIDFromInt(chanInfo.ChannelID) // First, using the parameters of the channel, along with the // channel authentication proof, we'll create re-create the // original authenticated channel announcement. // TODO(andrew.shvv) skip if proof is nil authProof := chanInfo.AuthProof chanAnn := &lnwire.ChannelAnnouncement{ FirstNodeSig: authProof.NodeSig1, SecondNodeSig: authProof.NodeSig2, ChannelID: chanID, FirstBitcoinSig: authProof.BitcoinSig1, SecondBitcoinSig: authProof.BitcoinSig2, FirstNodeID: chanInfo.NodeKey1, SecondNodeID: chanInfo.NodeKey2, FirstBitcoinKey: chanInfo.BitcoinKey1, SecondBitcoinKey: chanInfo.BitcoinKey2, } announceMessages = append(announceMessages, chanAnn) // Since it's up to a node's policy as to whether they // advertise the edge in dire direction, we don't create an // advertisement if the edge is nil. if e1 != nil { announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ Signature: d.fakeSig, ChannelID: chanID, Timestamp: uint32(e1.LastUpdate.Unix()), Flags: 0, TimeLockDelta: e1.TimeLockDelta, HtlcMinimumMsat: uint32(e1.MinHTLC), FeeBaseMsat: uint32(e1.FeeBaseMSat), FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths), }) } if e2 != nil { announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ Signature: d.fakeSig, ChannelID: chanID, Timestamp: uint32(e2.LastUpdate.Unix()), Flags: 1, TimeLockDelta: e2.TimeLockDelta, HtlcMinimumMsat: uint32(e2.MinHTLC), FeeBaseMsat: uint32(e2.FeeBaseMSat), FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths), }) } numEdges++ return nil }); err != nil && err != channeldb.ErrGraphNoEdgesFound { log.Errorf("unable to sync edges w/ peer: %v", err) return err } log.Infof("Syncing channel graph state with %x, sending %v "+ "nodes and %v edges", targetNode.SerializeCompressed(), numNodes, numEdges) // With all the announcement messages gathered, send them all in a // single batch to the target peer. return d.cfg.SendMessages(targetNode, announceMessages...) }