diff --git a/discovery/log.go b/discovery/log.go new file mode 100644 index 00000000..08fa7e07 --- /dev/null +++ b/discovery/log.go @@ -0,0 +1,72 @@ +package discovery + +import ( + "errors" + "io" + + "github.com/btcsuite/btclog" + btcwallet "github.com/roasbeef/btcwallet/wallet" +) + +// log is a logger that is initialized with no output filters. This +// means the package will not perform any logging by default until the caller +// requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + DisableLog() +} + +// DisableLog disables all library log output. Logging output is disabled +// by default until either UseLogger or SetLogWriter are called. +func DisableLog() { + log = btclog.Disabled +} + +// UseLogger uses a specified Logger to output package logging info. +// This should be used in preference to SetLogWriter if the caller is also +// using btclog. +func UseLogger(logger btclog.Logger) { + log = logger + btcwallet.UseLogger(logger) +} + +// SetLogWriter uses a specified io.Writer to output package logging info. +// This allows a caller to direct package logging output without needing a +// dependency on seelog. If the caller is also using btclog, UseLogger should +// be used instead. +func SetLogWriter(w io.Writer, level string) error { + if w == nil { + return errors.New("nil writer") + } + + lvl, ok := btclog.LogLevelFromString(level) + if !ok { + return errors.New("invalid log level") + } + + l, err := btclog.NewLoggerFromWriter(w, lvl) + if err != nil { + return err + } + + UseLogger(l) + return nil +} + +// logClosure is used to provide a closure over expensive logging operations +// so don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/discovery/service.go b/discovery/service.go new file mode 100644 index 00000000..44f83067 --- /dev/null +++ b/discovery/service.go @@ -0,0 +1,643 @@ +package discovery + +import ( + "sync" + "sync/atomic" + "time" + + "encoding/hex" + + "github.com/go-errors/errors" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcutil" +) + +// networkMsg couples a routing related wire message with the peer that +// originally sent it. +type networkMsg struct { + msg lnwire.Message + isRemote bool + peer *btcec.PublicKey +} + +// syncRequest represents a request from an outside subsystem to the wallet to +// sync a new node to the latest graph state. +type syncRequest struct { + node *btcec.PublicKey +} + +// Config defines the configuration for the service. ALL elements within the +// configuration MUST be non-nil for the service to carry out its duties. +type Config struct { + // Router is the subsystem which is responsible for managing the + // topology of lightning network. After incoming channel, node, + // channel updates announcements are validated they are sent to the + // router in order to be included in the LN graph. 
+ Router routing.ChannelGraphSource + + // Notifier is used for receiving notifications of incoming blocks. + // With each new incoming block found we process previously premature + // announcements. + // TODO(roasbeef): could possibly just replace this with an epoch + // channel. + Notifier chainntnfs.ChainNotifier + + // Broadcast broadcasts a particular set of announcements to all peers + // that the daemon is connected to. If supplied, the exclude parameter + // indicates that the target peer should be excluded from the broadcast. + Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error + + // SendMessages is a function which allows the service to send a set of + // messages to a particular peer identified by the target public + // key. + SendMessages func(target *btcec.PublicKey, msg ...lnwire.Message) error +} + +// New creates a new discovery service. +func New(cfg Config) (*Discovery, error) { + // TODO(roasbeef): remove this place holder after sigs are properly + // stored in the graph. + s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" + + "1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" + + "ad154c03be696a292d24ae" + fakeSigHex, err := hex.DecodeString(s) + if err != nil { + return nil, err + } + fakeSig, err := btcec.ParseSignature(fakeSigHex, btcec.S256()) + if err != nil { + return nil, err + } + + return &Discovery{ + cfg: &cfg, + networkMsgs: make(chan *networkMsg), + quit: make(chan bool), + syncRequests: make(chan *syncRequest), + prematureAnnouncements: make(map[uint32][]*networkMsg), + fakeSig: fakeSig, + }, nil +} + +// Discovery is a subsystem which is responsible for receiving announcements, +// validating them, and applying the changes to the router, as well as syncing +// the lightning network state with newly connected nodes, broadcasting +// announcements after validation, negotiating the channel announcement proof +// exchange, and handling premature announcements. +type Discovery struct { + // Parameters which are needed to properly handle the start and stop + // of the service. + started uint32 + stopped uint32 + quit chan bool + wg sync.WaitGroup + + // cfg is a copy of the configuration struct that the discovery service + // was initialized with. + cfg *Config + + // newBlocks is a channel in which new blocks connected to the end of + // the main chain are sent over. + newBlocks <-chan *chainntnfs.BlockEpoch + + // prematureAnnouncements maps a blockheight to a set of announcements + // which are "premature" from our PoV. A message is premature if + // it claims to be anchored in a block which is beyond the current main + // chain tip as we know it. Premature network messages will be processed + // once the chain tip as we know it extends to/past the premature + // height. + // + // TODO(roasbeef): limit premature networkMsgs to N + prematureAnnouncements map[uint32][]*networkMsg + + // networkMsgs is a channel that carries new network broadcast + // messages from outside the discovery service to be processed by the + // networkHandler. + networkMsgs chan *networkMsg + + // syncRequests is a channel that carries requests to synchronize newly + // connected peers to the state of the lightning network topology from + // our PoV. + syncRequests chan *syncRequest + + // bestHeight is the height of the block at the tip of the main chain + // as we know it. + bestHeight uint32 + + fakeSig *btcec.Signature +} + +// ProcessRemoteAnnouncement sends a new remote announcement message along with +// the peer that sent the routing message.
The announcement will be processed and then +// added to a queue for batched, trickled announcement to all connected peers. +// Remote channel announcements should contain the announcement proof and be +// fully validated. +func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message, + src *btcec.PublicKey) error { + + aMsg := &networkMsg{ + msg: msg, + isRemote: true, + peer: src, + } + + select { + case d.networkMsgs <- aMsg: + return nil + case <-d.quit: + return errors.New("discovery has been shut down") + } +} + +// ProcessLocalAnnouncement sends a new local announcement message along with +// the peer that sent the routing message. The announcement will be processed and then +// added to a queue for batched, trickled announcement to all connected peers. +// Local channel announcements do not contain the announcement proof and are not +// fully validated. The channel proofs will be included later if the nodes agree +// to announce this channel to the rest of the network. +func (d *Discovery) ProcessLocalAnnouncement(msg lnwire.Message, + src *btcec.PublicKey) error { + + aMsg := &networkMsg{ + msg: msg, + isRemote: false, + peer: src, + } + + select { + case d.networkMsgs <- aMsg: + return nil + case <-d.quit: + return errors.New("discovery has been shut down") + } +} + +// SynchronizeNode sends a message to the service indicating it should +// synchronize lightning topology state with the target node. This method +// is to be utilized when a node connects for the first time to provide it +// with the latest topology update state. +func (d *Discovery) SynchronizeNode(pub *btcec.PublicKey) { + select { + case d.syncRequests <- &syncRequest{ + node: pub, + }: + case <-d.quit: + return + } +} + +// Start spawns the network message handler goroutine and registers for new +// block notifications in order to properly handle premature announcements. +func (d *Discovery) Start() error { + if !atomic.CompareAndSwapUint32(&d.started, 0, 1) { + return nil + } + + // First we register for new notifications of newly discovered blocks. + // We do this immediately so we'll later be able to consume any/all + // blocks which were discovered. + blockEpochs, err := d.cfg.Notifier.RegisterBlockEpochNtfn() + if err != nil { + return err + } + d.newBlocks = blockEpochs.Epochs + + height, err := d.cfg.Router.CurrentBlockHeight() + if err != nil { + return err + } + d.bestHeight = height + + d.wg.Add(1) + go d.networkHandler() + + log.Info("Discovery service is started") + return nil +} + +// Stop signals any active goroutines for a graceful closure. +func (d *Discovery) Stop() { + if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) { + return + } + + close(d.quit) + d.wg.Wait() + log.Info("Discovery service is stopped.") +} + +// networkHandler is the primary goroutine. The roles of this goroutine include +// answering queries related to the state of the network, syncing up newly +// connected peers, and also periodically broadcasting our latest topology state +// to all connected peers. +// +// NOTE: This MUST be run as a goroutine.
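For orientation before the networkHandler implementation below, the following sketch (not part of this patch) shows how a caller such as the daemon's server code might wire the exported API together. The package name and the surrounding values (chanRouter, notifier, the broadcast/send closures, peerKey, msgs) are hypothetical placeholders; only the discovery.Config fields and Discovery methods come from this change.

package sketch

import (
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/discovery"
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing"
	"github.com/roasbeef/btcd/btcec"
)

// startDiscovery wires the service to its dependencies and starts it. The
// router is passed as a routing.ChannelGraphSource, which the ChannelRouter
// satisfies after this change.
func startDiscovery(chanRouter routing.ChannelGraphSource,
	notifier chainntnfs.ChainNotifier,
	broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error,
	send func(target *btcec.PublicKey, msg ...lnwire.Message) error) (*discovery.Discovery, error) {

	disc, err := discovery.New(discovery.Config{
		Router:       chanRouter,
		Notifier:     notifier,
		Broadcast:    broadcast,
		SendMessages: send,
	})
	if err != nil {
		return nil, err
	}
	if err := disc.Start(); err != nil {
		return nil, err
	}

	return disc, nil
}

// onPeerConnected is a hypothetical hook run once a peer finishes its
// handshake: the peer is synced with our graph view, and its incoming routing
// messages are handed to the discovery service rather than to the router.
func onPeerConnected(disc *discovery.Discovery, peerKey *btcec.PublicKey,
	msgs <-chan lnwire.Message) {

	disc.SynchronizeNode(peerKey)

	go func() {
		for msg := range msgs {
			if err := disc.ProcessRemoteAnnouncement(msg, peerKey); err != nil {
				// The service is shutting down; stop forwarding.
				return
			}
		}
	}()
}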
+func (d *Discovery) networkHandler() { + defer d.wg.Done() + + var announcementBatch []lnwire.Message + + // TODO(roasbeef): parametrize the above + retransmitTimer := time.NewTicker(time.Minute * 30) + defer retransmitTimer.Stop() + + // TODO(roasbeef): parametrize the above + trickleTimer := time.NewTicker(time.Millisecond * 300) + defer trickleTimer.Stop() + + for { + select { + case announcement := <-d.networkMsgs: + // Process the network announcement to determine if + // this is either a new announcement from our PoV or an + // update to a prior vertex/edge we previously + // accepted. + accepted := d.processNetworkAnnouncement(announcement) + + // If the update was accepted, then add it to our next + // announcement batch to be broadcast once the trickle + // timer ticks again. + if accepted { + // TODO(roasbeef): exclude peer that sent + announcementBatch = append( + announcementBatch, + announcement.msg, + ) + } + + // A new block has arrived, so we can re-process the + // previously premature announcements. + case newBlock, ok := <-d.newBlocks: + // If the channel has been closed, then this indicates + // the daemon is shutting down, so we exit ourselves. + if !ok { + return + } + + // Once a new block arrives, we update our running + // track of the height of the chain tip. + blockHeight := uint32(newBlock.Height) + d.bestHeight = blockHeight + + // Next we check if we have any premature announcements + // for this height. If so, then we process them once + // more as normal announcements. + prematureAnns := d.prematureAnnouncements[uint32(newBlock.Height)] + if len(prematureAnns) != 0 { + log.Infof("Re-processing %v premature "+ + "announcements for height %v", + len(prematureAnns), blockHeight) + } + + for _, ann := range prematureAnns { + accepted := d.processNetworkAnnouncement(ann) + + if accepted { + announcementBatch = append( + announcementBatch, + ann.msg, + ) + } + } + delete(d.prematureAnnouncements, blockHeight) + + // The trickle timer has ticked, which indicates we should + // flush to the network the pending batch of new announcements + // we've received since the last trickle tick. + case <-trickleTimer.C: + // If the current announcement batch is empty, then we + // have no further work here. + if len(announcementBatch) == 0 { + continue + } + + log.Infof("Broadcasting batch of %v new announcements", + len(announcementBatch)) + + // If we have new things to announce then broadcast + // them to all our immediately connected peers. + err := d.cfg.Broadcast(nil, announcementBatch...) + if err != nil { + log.Errorf("unable to send batch announcement: %v", err) + continue + } + + // If we're able to broadcast the current batch + // successfully, then we reset the batch for a new + // round of announcements. + announcementBatch = nil + + // The retransmission timer has ticked which indicates that we + // should broadcast our personal channels to the network. This + // addresses the case of channel advertisements either being + // dropped, or not properly propagated through the network. + case <-retransmitTimer.C: + var selfChans []lnwire.Message + + // Iterate over our channels and construct the + // announcements array.
+ err := d.cfg.Router.ForAllOutgoingChannels( + func(p *channeldb.ChannelEdgePolicy) error { + c := &lnwire.ChannelUpdateAnnouncement{ + Signature: d.fakeSig, + ChannelID: lnwire.NewChanIDFromInt(p.ChannelID), + Timestamp: uint32(p.LastUpdate.Unix()), + Flags: p.Flags, + TimeLockDelta: p.TimeLockDelta, + HtlcMinimumMsat: uint32(p.MinHTLC), + FeeBaseMsat: uint32(p.FeeBaseMSat), + FeeProportionalMillionths: uint32(p.FeeProportionalMillionths), + } + selfChans = append(selfChans, c) + return nil + }) + if err != nil { + log.Errorf("unable to iterate over chann"+ "els: %v", err) + continue + } else if len(selfChans) == 0 { + continue + } + + log.Debugf("Retransmitting %v outgoing channels", + len(selfChans)) + + // With all the wire announcements properly crafted, + // we'll broadcast our known outgoing channels to all our + // immediate peers. + if err := d.cfg.Broadcast(nil, selfChans...); err != nil { + log.Errorf("unable to re-broadcast "+ + "channels: %v", err) + } + + // We've just received a new request to synchronize a peer with + // our latest lightning network topology state. This indicates + // that a peer has just connected for the first time, so for now + // we dump our entire network graph and allow them to sift + // through the (subjectively) new information on their own. + case syncReq := <-d.syncRequests: + nodePub := syncReq.node.SerializeCompressed() + log.Infof("Synchronizing channel graph with %x", nodePub) + + if err := d.synchronize(syncReq); err != nil { + log.Errorf("unable to sync graph state with %x: %v", + nodePub, err) + } + + // The discovery service has been signalled to exit, so we exit + // our main loop so the wait group can be decremented. + case <-d.quit: + return + } + } +} + +// processNetworkAnnouncement processes a new network related authenticated +// channel or node announcement. If the update didn't affect the internal state +// of the graph due to either being out of date, invalid, or redundant, then +// false is returned. Otherwise, true is returned indicating that the caller +// may want to batch this request to be broadcast to immediate peers during the +// next announcement epoch. +func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool { + isPremature := func(chanID *lnwire.ChannelID) bool { + return chanID.BlockHeight > d.bestHeight + } + + switch msg := aMsg.msg.(type) { + + // A new node announcement has arrived which either presents a new + // node, or a node updating previously advertised information. + case *lnwire.NodeAnnouncement: + if aMsg.isRemote { + // TODO(andrew.shvv) Add node validation + } + + node := &channeldb.LightningNode{ + LastUpdate: time.Unix(int64(msg.Timestamp), 0), + Addresses: msg.Addresses, + PubKey: msg.NodeID, + Alias: msg.Alias.String(), + AuthSig: msg.Signature, + Features: msg.Features, + } + + if err := d.cfg.Router.AddNode(node); err != nil { + log.Errorf("unable to add node: %v", err) + return false + } + + // A new channel announcement has arrived; this indicates the + // *creation* of a new channel within the network. This only advertises + // the existence of a channel and not yet the routing policies in + // either direction of the channel. + case *lnwire.ChannelAnnouncement: + // If the advertised inclusionary block is beyond our knowledge + // of the chain tip, then we'll put the announcement in limbo + // to be fully verified once we advance forward in the chain.
+ if isPremature(&msg.ChannelID) { + blockHeight := msg.ChannelID.BlockHeight + log.Infof("Announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only height "+ + "%v is known", msg.ChannelID, msg.ChannelID.BlockHeight, + d.bestHeight) + + d.prematureAnnouncements[blockHeight] = append( + d.prematureAnnouncements[blockHeight], + aMsg, + ) + return false + } + + var proof *channeldb.ChannelAuthProof + if aMsg.isRemote { + // TODO(andrew.shvv) Add channel validation + } + + proof = &channeldb.ChannelAuthProof{ + NodeSig1: msg.FirstNodeSig, + NodeSig2: msg.SecondNodeSig, + BitcoinSig1: msg.FirstBitcoinSig, + BitcoinSig2: msg.SecondBitcoinSig, + } + + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: msg.ChannelID.ToUint64(), + NodeKey1: msg.FirstNodeID, + NodeKey2: msg.SecondNodeID, + BitcoinKey1: msg.FirstBitcoinKey, + BitcoinKey2: msg.SecondBitcoinKey, + AuthProof: proof, + } + + if err := d.cfg.Router.AddEdge(edge); err != nil { + if !routing.IsError(err, routing.ErrOutdated) { + log.Errorf("unable to add edge: %v", err) + } else { + log.Infof("Unable to add edge: %v", err) + } + + return false + } + + // A new authenticated channel update has arrived; this indicates + // that the directional information for an already known channel has + // been updated. + case *lnwire.ChannelUpdateAnnouncement: + chanID := msg.ChannelID.ToUint64() + + // If the advertised inclusionary block is beyond our knowledge + // of the chain tip, then we'll put the announcement in limbo + // to be fully verified once we advance forward in the chain. + if isPremature(&msg.ChannelID) { + blockHeight := msg.ChannelID.BlockHeight + log.Infof("Update announcement for chan_id=(%v), is "+ + "premature: advertises height %v, only height "+ + "%v is known", chanID, blockHeight, + d.bestHeight) + + d.prematureAnnouncements[blockHeight] = append( + d.prematureAnnouncements[blockHeight], + aMsg, + ) + return false + } + + if aMsg.isRemote { + // TODO(andrew.shvv) Add update channel validation + } + + // TODO(roasbeef): should be msat here + update := &channeldb.ChannelEdgePolicy{ + ChannelID: chanID, + LastUpdate: time.Unix(int64(msg.Timestamp), 0), + Flags: msg.Flags, + TimeLockDelta: msg.TimeLockDelta, + MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), + FeeBaseMSat: btcutil.Amount(msg.FeeBaseMsat), + FeeProportionalMillionths: btcutil.Amount(msg.FeeProportionalMillionths), + } + + if err := d.cfg.Router.UpdateEdge(update); err != nil { + log.Errorf("unable to update edge: %v", err) + return false + } + } + + return true +} + +// synchronize attempts to synchronize the target node in the syncReq to +// the latest channel graph state. In order to accomplish this, (currently) the +// entire network graph is read from disk, then serialized to the format +// defined within the current wire protocol. This cache of graph data is then +// sent directly to the target node. +func (d *Discovery) synchronize(syncReq *syncRequest) error { + targetNode := syncReq.node + + // TODO(roasbeef): need to also store sig data in db + // * will be nice when we switch to pairing sigs would only need one ^_^ + + // We'll collate all the gathered routing messages into a single slice + // containing all the messages to be sent to the target peer. + var announceMessages []lnwire.Message + + // First run through all the vertexes in the graph, retrieving the data + // for the announcement we originally retrieved.
+ var numNodes uint32 + if err := d.cfg.Router.ForEachNode(func(node *channeldb.LightningNode) error { + alias, err := lnwire.NewAlias(node.Alias) + if err != nil { + return err + } + + ann := &lnwire.NodeAnnouncement{ + Signature: d.fakeSig, + Timestamp: uint32(node.LastUpdate.Unix()), + Addresses: node.Addresses, + NodeID: node.PubKey, + Alias: alias, + Features: node.Features, + } + announceMessages = append(announceMessages, ann) + + numNodes++ + + return nil + }); err != nil { + return err + } + + // With the vertexes gathered, we'll now retrieve the initial + // announcement, as well as the latest channel update announcement for + // both of the directed edges that make up the channel. + var numEdges uint32 + if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) error { + + chanID := lnwire.NewChanIDFromInt(chanInfo.ChannelID) + + // First, using the parameters of the channel, along with the + // channel authentication proof, we'll re-create the + // original authenticated channel announcement. + // TODO(andrew.shvv) skip if proof is nil + authProof := chanInfo.AuthProof + chanAnn := &lnwire.ChannelAnnouncement{ + FirstNodeSig: authProof.NodeSig1, + SecondNodeSig: authProof.NodeSig2, + ChannelID: chanID, + FirstBitcoinSig: authProof.BitcoinSig1, + SecondBitcoinSig: authProof.BitcoinSig2, + FirstNodeID: chanInfo.NodeKey1, + SecondNodeID: chanInfo.NodeKey2, + FirstBitcoinKey: chanInfo.BitcoinKey1, + SecondBitcoinKey: chanInfo.BitcoinKey2, + } + announceMessages = append(announceMessages, chanAnn) + + // Since it's up to a node's policy as to whether they + // advertise the edge in a given direction, we don't create an + // advertisement if the edge is nil. + if e1 != nil { + announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ + Signature: d.fakeSig, + ChannelID: chanID, + Timestamp: uint32(e1.LastUpdate.Unix()), + Flags: 0, + TimeLockDelta: e1.TimeLockDelta, + HtlcMinimumMsat: uint32(e1.MinHTLC), + FeeBaseMsat: uint32(e1.FeeBaseMSat), + FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths), + }) + } + if e2 != nil { + announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ + Signature: d.fakeSig, + ChannelID: chanID, + Timestamp: uint32(e2.LastUpdate.Unix()), + Flags: 1, + TimeLockDelta: e2.TimeLockDelta, + HtlcMinimumMsat: uint32(e2.MinHTLC), + FeeBaseMsat: uint32(e2.FeeBaseMSat), + FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths), + }) + } + + numEdges++ + return nil + }); err != nil && err != channeldb.ErrGraphNoEdgesFound { + log.Errorf("unable to sync edges w/ peer: %v", err) + return err + } + + log.Infof("Syncing channel graph state with %x, sending %v "+ + "nodes and %v edges", targetNode.SerializeCompressed(), + numNodes, numEdges) + + // With all the announcement messages gathered, send them all in a + // single batch to the target peer. + return d.cfg.SendMessages(targetNode, announceMessages...)
+} diff --git a/discovery/service_test.go b/discovery/service_test.go new file mode 100644 index 00000000..856fbc2c --- /dev/null +++ b/discovery/service_test.go @@ -0,0 +1,373 @@ +package discovery + +import ( + "fmt" + "net" + "sync" + + prand "math/rand" + + "testing" + + "math/big" + + "time" + + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing" + "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/wire" +) + +var ( + testAddr = &net.TCPAddr{IP: (net.IP)([]byte{0xA, 0x0, 0x0, 0x1}), + Port: 9000} + testAddrs = []net.Addr{testAddr} + testFeatures = lnwire.NewFeatureVector([]lnwire.Feature{}) + testSig = &btcec.Signature{ + R: new(big.Int), + S: new(big.Int), + } + _, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10) + _, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10) +) + +type mockGraphSource struct { + nodes []*channeldb.LightningNode + edges []*channeldb.ChannelEdgeInfo + updates []*channeldb.ChannelEdgePolicy + bestHeight uint32 +} + +func newMockRouter(height uint32) *mockGraphSource { + return &mockGraphSource{ + bestHeight: height, + } +} + +var _ routing.ChannelGraphSource = (*mockGraphSource)(nil) + +func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error { + r.nodes = append(r.nodes, node) + return nil +} + +func (r *mockGraphSource) AddEdge(edge *channeldb.ChannelEdgeInfo) error { + r.edges = append(r.edges, edge) + return nil +} + +func (r *mockGraphSource) UpdateEdge(policy *channeldb.ChannelEdgePolicy) error { + r.updates = append(r.updates, policy) + return nil +} + +func (r *mockGraphSource) AddProof(chanID uint8, + proof *channeldb.ChannelAuthProof) error { + return nil +} + +func (r *mockGraphSource) SelfEdges() ([]*channeldb.ChannelEdgePolicy, error) { + return nil, nil +} + +func (r *mockGraphSource) CurrentBlockHeight() (uint32, error) { + return r.bestHeight, nil +} + +func (r *mockGraphSource) ForEachNode(func(node *channeldb.LightningNode) error) error { + return nil +} + +func (r *mockGraphSource) ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgePolicy) error) error { + return nil +} + +func (r *mockGraphSource) ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) error) error { + return nil +} + +type mockNotifier struct { + clientCounter uint32 + epochClients map[uint32]chan *chainntnfs.BlockEpoch + + sync.RWMutex +} + +func newMockNotifier() *mockNotifier { + return &mockNotifier{ + epochClients: make(map[uint32]chan *chainntnfs.BlockEpoch), + } +} + +func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, + numConfs uint32) (*chainntnfs.ConfirmationEvent, error) { + + return nil, nil +} + +func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) { + return nil, nil +} + +func (m *mockNotifier) notifyBlock(hash chainhash.Hash, height uint32) { + m.RLock() + defer m.RUnlock() + + for _, client := range m.epochClients { + client <- &chainntnfs.BlockEpoch{ + Height: int32(height), + Hash: &hash, + } + } +} + +func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { + m.RLock() + defer m.RUnlock() + + epochChan := make(chan *chainntnfs.BlockEpoch) + clientID := m.clientCounter + m.clientCounter++ + 
m.epochClients[clientID] = epochChan + + return &chainntnfs.BlockEpochEvent{ + Epochs: epochChan, + Cancel: func() {}, + }, nil +} + +func (m *mockNotifier) Start() error { + return nil +} + +func (m *mockNotifier) Stop() error { + return nil +} + +func createNodeAnnouncement() (*lnwire.NodeAnnouncement, + error) { + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + return nil, err + } + + pub := priv.PubKey().SerializeCompressed() + + alias, err := lnwire.NewAlias("kek" + string(pub[:])) + if err != nil { + return nil, err + } + + return &lnwire.NodeAnnouncement{ + Timestamp: uint32(prand.Int31()), + Addresses: testAddrs, + NodeID: priv.PubKey(), + Alias: alias, + Features: testFeatures, + }, nil +} + +func createUpdateAnnouncement(blockHeight uint32) *lnwire.ChannelUpdateAnnouncement { + return &lnwire.ChannelUpdateAnnouncement{ + Signature: testSig, + ChannelID: lnwire.ChannelID{ + BlockHeight: blockHeight, + }, + Timestamp: uint32(prand.Int31()), + TimeLockDelta: uint16(prand.Int63()), + HtlcMinimumMsat: uint32(prand.Int31()), + FeeBaseMsat: uint32(prand.Int31()), + FeeProportionalMillionths: uint32(prand.Int31()), + } +} + +func createChannelAnnouncement(blockHeight uint32) *lnwire.ChannelAnnouncement { + // Our fake channel will be "confirmed" at height 101. + chanID := lnwire.ChannelID{ + BlockHeight: blockHeight, + TxIndex: 0, + TxPosition: 0, + } + + return &lnwire.ChannelAnnouncement{ + ChannelID: chanID, + } +} + +type testCtx struct { + discovery *Discovery + router *mockGraphSource + notifier *mockNotifier + + broadcastedMessage chan lnwire.Message +} + +func createTestCtx(startHeight uint32) (*testCtx, func(), error) { + // Next we'll initialize an instance of the channel router with mock + // versions of the chain and channel notifier. As we don't need to test + // any p2p functionality, the peer send and switch send, + // broadcast functions won't be populated. + notifier := newMockNotifier() + router := newMockRouter(startHeight) + + broadcastedMessage := make(chan lnwire.Message) + discovery, err := New(Config{ + Notifier: notifier, + Broadcast: func(_ *btcec.PublicKey, msgs ...lnwire.Message) error { + for _, msg := range msgs { + broadcastedMessage <- msg + } + return nil + }, + SendMessages: nil, + Router: router, + }) + if err != nil { + return nil, nil, fmt.Errorf("unable to create router %v", err) + } + if err := discovery.Start(); err != nil { + return nil, nil, fmt.Errorf("unable to start router: %v", err) + } + + cleanUp := func() { + discovery.Stop() + } + + return &testCtx{ + router: router, + notifier: notifier, + discovery: discovery, + broadcastedMessage: broadcastedMessage, + }, cleanUp, nil +} + +// TestProcessAnnouncement checks that mature announcements are propagated to +// the router subsystem. 
+func TestProcessAnnouncement(t *testing.T) { + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("can't create node pub key: %v", err) + } + nodePub := priv.PubKey() + + na, err := createNodeAnnouncement() + if err != nil { + t.Fatalf("can't create node announcement: %v", err) + } + ctx.discovery.ProcessRemoteAnnouncement(na, nodePub) + + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't processed") + } + + if len(ctx.router.nodes) != 1 { + t.Fatalf("node wasn't added to router: %v", err) + } + + ca := createChannelAnnouncement(0) + ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub) + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't processed") + } + + if len(ctx.router.edges) != 1 { + t.Fatalf("edge wasn't added to router: %v", err) + } + + ua := createUpdateAnnouncement(0) + ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub) + select { + case <-ctx.broadcastedMessage: + case <-time.After(time.Second): + t.Fatal("announcement wasn't processed") + } + + if len(ctx.router.updates) != 1 { + t.Fatalf("edge update wasn't added to router: %v", err) + } +} + +// TestPrematureAnnouncement checks that premature networkMsgs are +// not propagated to the router subsystem until a block with the +// corresponding height is received. +func TestPrematureAnnouncement(t *testing.T) { + ctx, cleanup, err := createTestCtx(0) + if err != nil { + t.Fatalf("can't create context: %v", err) + } + defer cleanup() + + priv, err := btcec.NewPrivateKey(btcec.S256()) + if err != nil { + t.Fatalf("can't create node pub key: %v", err) + } + nodePub := priv.PubKey() + + ca := createChannelAnnouncement(1) + ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub) + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcement was processed") + case <-time.After(100 * time.Millisecond): + } + + if len(ctx.router.edges) != 0 { + t.Fatal("edge was added to router") + } + + ua := createUpdateAnnouncement(1) + ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub) + select { + case <-ctx.broadcastedMessage: + t.Fatal("announcement was processed") + case <-time.After(100 * time.Millisecond): + } + + if len(ctx.router.updates) != 0 { + t.Fatal("edge update was added to router") + } + + newBlock := &wire.MsgBlock{} + ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), 1) + + select { + case <-ctx.broadcastedMessage: + if err != nil { + t.Fatalf("announcement was processed with err: %v", err) + } + case <-time.After(time.Second): + t.Fatal("announcement wasn't processed") + } + + if len(ctx.router.edges) != 1 { + t.Fatalf("edge wasn't added to router: %v", err) + } + + select { + case <-ctx.broadcastedMessage: + if err != nil { + t.Fatalf("announcement was processed with err: %v", err) + } + case <-time.After(time.Second): + t.Fatal("announcement wasn't processed") + } + + if len(ctx.router.updates) != 1 { + t.Fatalf("edge update wasn't added to router: %v", err) + } +} diff --git a/log.go b/log.go index 3bb1b85e..af4274d0 100644 --- a/log.go +++ b/log.go @@ -8,6 +8,7 @@ import ( "github.com/btcsuite/seelog" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/discovery" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/routing" "github.com/roasbeef/btcd/connmgr" @@ -22,6
+23,7 @@ var ( ltndLog = btclog.Disabled lnwlLog = btclog.Disabled peerLog = btclog.Disabled + discLog = btclog.Disabled fndgLog = btclog.Disabled rpcsLog = btclog.Disabled srvrLog = btclog.Disabled @@ -39,6 +41,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "LTND": ltndLog, "LNWL": lnwlLog, "PEER": peerLog, + "DISC": discLog, "RPCS": rpcsLog, "SRVR": srvrLog, "NTFN": ntfnLog, @@ -70,6 +73,10 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "PEER": peerLog = logger + case "DISC": + discLog = logger + discovery.UseLogger(logger) + case "RPCS": rpcsLog = logger diff --git a/routing/errors.go b/routing/errors.go index c7c8fa37..f4a1177f 100644 --- a/routing/errors.go +++ b/routing/errors.go @@ -1,31 +1,91 @@ package routing -import "errors" +import "github.com/go-errors/errors" -var ( +// errorCode represents the error code and is used for compile time checks. +type errorCode uint8 + +const ( // ErrNoPathFound is returned when a path to the target destination // does not exist in the graph. - ErrNoPathFound = errors.New("unable to find a path to " + - "destination") + ErrNoPathFound errorCode = iota // ErrNoRouteFound is returned when the router is unable to find a // valid route to the target destination after fees and time-lock // limitations are factored in. - ErrNoRouteFound = errors.New("unable to find a eligible route to " + - "the destination") + ErrNoRouteFound // ErrInsufficientCapacity is returned when a path if found, yet the // capacity of one of the channels in the path is insufficient to carry // the payment. - ErrInsufficientCapacity = errors.New("channel graph has " + - "insufficient capacity for the payment") + ErrInsufficientCapacity // ErrMaxHopsExceeded is returned when a candidate path is found, but // the length of that path exceeds HopLimit. - ErrMaxHopsExceeded = errors.New("potential path has too many hops") + ErrMaxHopsExceeded // ErrTargetNotInNetwork is returned when the target of a path-finding // or payment attempt isn't known to be within the current version of // the channel graph. - ErrTargetNotInNetwork = errors.New("target not found") + ErrTargetNotInNetwork + + // ErrOutdated is returned when the routing update has already + // been applied. + ErrOutdated + + // ErrIgnored is returned when the update has been ignored because + // it doesn't provide us with anything new. + ErrIgnored ) + +// routerError is a structure that represents an error inside the routing package. +// It carries additional information about the error code in order to +// be able to distinguish errors outside of the current package. +type routerError struct { + err *errors.Error + code errorCode +} + +// Error returns the error as a string. +// NOTE: Part of the error interface. +func (e *routerError) Error() string { + return e.err.Error() +} + +// A compile time check to ensure routerError implements the error interface. +var _ error = (*routerError)(nil) + +// newErr creates a routerError from the given error description and its +// corresponding error code. +func newErr(code errorCode, a interface{}) *routerError { + return &routerError{ + code: code, + err: errors.New(a), + } +} + +// newErrf creates a routerError from the given formatted error description and +// its corresponding error code.
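Before the newErrf body that follows, here is a brief usage sketch (not part of this patch) of how callers outside the routing package are expected to consume these codes now that the sentinel error values are gone. The handleRouteFailure wrapper and its policy are hypothetical; only routing.IsError and the Err* codes come from this change.

package sketch

import (
	"fmt"

	"github.com/lightningnetwork/lnd/routing"
)

// handleRouteFailure classifies a routing failure by its error code. Plain
// equality checks such as err == routing.ErrNoPathFound no longer work once
// errors carry codes, so callers match with routing.IsError instead.
func handleRouteFailure(err error) error {
	switch {
	case err == nil:
		return nil

	// A missing path or route is a soft failure the caller may retry
	// with different parameters.
	case routing.IsError(err, routing.ErrNoPathFound, routing.ErrNoRouteFound):
		return fmt.Errorf("no usable route: %v", err)

	// Outdated or ignored topology updates carry no new information and
	// can be dropped silently.
	case routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored):
		return nil

	default:
		return err
	}
}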
+func newErrf(code errorCode, format string, a ...interface{}) *routerError { + return &routerError{ + code: code, + err: errors.Errorf(format, a...), + } +} + +// IsError is a helper function which is needed to have ability to check that +// returned error has specific error code. +func IsError(e interface{}, codes ...errorCode) bool { + err, ok := e.(*routerError) + if !ok { + return false + } + + for _, code := range codes { + if err.code == code { + return true + } + } + + return false +} diff --git a/routing/notifications.go b/routing/notifications.go index 114e77db..51e1a7d7 100644 --- a/routing/notifications.go +++ b/routing/notifications.go @@ -1,14 +1,13 @@ package routing import ( - "errors" "fmt" "net" "sync/atomic" "github.com/davecgh/go-spew/spew" + "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -277,17 +276,17 @@ type ChannelEdgeUpdate struct { // information required to create the topology change update from the graph // database. func addToTopologyChange(graph *channeldb.ChannelGraph, update *TopologyChange, - msg lnwire.Message) error { + msg interface{}) error { switch m := msg.(type) { // Any node announcement maps directly to a NetworkNodeUpdate struct. // No further data munging or db queries are required. - case *lnwire.NodeAnnouncement: + case *channeldb.LightningNode: nodeUpdate := &NetworkNodeUpdate{ Addresses: m.Addresses, - IdentityKey: m.NodeID, - Alias: m.Alias.String(), + IdentityKey: m.PubKey, + Alias: m.Alias, } nodeUpdate.IdentityKey.Curve = nil @@ -296,19 +295,19 @@ func addToTopologyChange(graph *channeldb.ChannelGraph, update *TopologyChange, // We ignore initial channel announcements as we'll only send out // updates once the individual edges themselves have been updated. - case *lnwire.ChannelAnnouncement: + case *channeldb.ChannelEdgeInfo: return nil // Any new ChannelUpdateAnnouncements will generate a corresponding // ChannelEdgeUpdate notification. - case *lnwire.ChannelUpdateAnnouncement: + case *channeldb.ChannelEdgePolicy: // We'll need to fetch the edge's information from the database // in order to get the information concerning which nodes are // being connected. 
- chanID := m.ChannelID.ToUint64() - edgeInfo, _, _, err := graph.FetchChannelEdgesByID(chanID) + edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID) if err != nil { - return err + return errors.Errorf("unable fetch channel edge: %v", + err) } // If the flag is one, then the advertising node is actually @@ -321,13 +320,13 @@ func addToTopologyChange(graph *channeldb.ChannelGraph, update *TopologyChange, } edgeUpdate := &ChannelEdgeUpdate{ - ChanID: chanID, + ChanID: m.ChannelID, ChanPoint: edgeInfo.ChannelPoint, TimeLockDelta: m.TimeLockDelta, Capacity: edgeInfo.Capacity, - MinHTLC: btcutil.Amount(m.HtlcMinimumMsat), - BaseFee: btcutil.Amount(m.FeeBaseMsat), - FeeRate: btcutil.Amount(m.FeeProportionalMillionths), + MinHTLC: m.MinHTLC, + BaseFee: m.FeeBaseMSat, + FeeRate: m.FeeProportionalMillionths, AdvertisingNode: sourceNode, ConnectingNode: connectingNode, } diff --git a/routing/notifications_test.go b/routing/notifications_test.go index e4b12876..e11f18e8 100644 --- a/routing/notifications_test.go +++ b/routing/notifications_test.go @@ -53,7 +53,7 @@ func createGraphNode() (*channeldb.LightningNode, error) { }, nil } -func createTestWireNode() (*lnwire.NodeAnnouncement, error) { +func createTestNode() (*channeldb.LightningNode, error) { priv, err := btcec.NewPrivateKey(btcec.S256()) if err != nil { return nil, err @@ -66,25 +66,26 @@ func createTestWireNode() (*lnwire.NodeAnnouncement, error) { return nil, err } - return &lnwire.NodeAnnouncement{ - Timestamp: uint32(prand.Int31()), - Addresses: testAddrs, - NodeID: priv.PubKey(), - Alias: alias, + return &channeldb.LightningNode{ + LastUpdate: time.Now(), + Address: testAddr, + PubKey: priv.PubKey(), + Alias: alias.String(), Features: testFeatures, }, nil } -func randEdgePolicyAnn(chanID lnwire.ChannelID) *lnwire.ChannelUpdateAnnouncement { +func randEdgePolicy(chanID lnwire.ChannelID, + node *channeldb.LightningNode) *channeldb.ChannelEdgePolicy { - return &lnwire.ChannelUpdateAnnouncement{ - Signature: testSig, - ChannelID: chanID, - Timestamp: uint32(prand.Int31()), + return &channeldb.ChannelEdgePolicy{ + ChannelID: chanID.ToUint64(), + LastUpdate: time.Unix(int64(prand.Int31()), 0), TimeLockDelta: uint16(prand.Int63()), - HtlcMinimumMsat: uint32(prand.Int31()), - FeeBaseMsat: uint32(prand.Int31()), - FeeProportionalMillionths: uint32(prand.Int31()), + MinHTLC: btcutil.Amount(prand.Int31()), + FeeBaseMSat: btcutil.Amount(prand.Int31()), + FeeProportionalMillionths: btcutil.Amount(prand.Int31()), + Node: node, } } @@ -268,7 +269,7 @@ func (m *mockNotifier) Stop() error { return nil } -// TestEdgeUpdateNotification tests that when edges are updated or discovered, +// TestEdgeUpdateNotification tests that when edges are updated or added, // a proper notification is sent of to all registered clients. func TestEdgeUpdateNotification(t *testing.T) { const startingBlockHeight = 101 @@ -292,34 +293,43 @@ func TestEdgeUpdateNotification(t *testing.T) { // Next we'll create two test nodes that the fake channel will be open // between and add then as members of the channel graph. - node1, err := createTestWireNode() + node1, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } - node2, err := createTestWireNode() + node2, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } - // Send the two node announcements to the channel router so they can be - // validated and stored within the graph database. 
- ctx.router.ProcessRoutingMessage(node1, node1.NodeID) - ctx.router.ProcessRoutingMessage(node2, node2.NodeID) + // Send the two node topology updates to the channel router so they + // can be validated and stored within the graph database. + if err := ctx.router.AddNode(node1); err != nil { + t.Fatal(err) + } + if err := ctx.router.AddNode(node2); err != nil { + t.Fatal(err) + } // Finally, to conclude our test set up, we'll create a channel - // announcement to announce the created channel between the two nodes. - channelAnn := &lnwire.ChannelAnnouncement{ - FirstNodeSig: testSig, - SecondNodeSig: testSig, - ChannelID: chanID, - FirstBitcoinSig: testSig, - SecondBitcoinSig: testSig, - FirstNodeID: node1.NodeID, - SecondNodeID: node2.NodeID, - FirstBitcoinKey: node1.NodeID, - SecondBitcoinKey: node2.NodeID, + // update to announce the created channel between the two nodes. + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: node1.PubKey, + BitcoinKey2: node2.PubKey, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatal(err) } - ctx.router.ProcessRoutingMessage(channelAnn, node1.NodeID) // With the channel edge now in place, we'll subscribe for topology // notifications. @@ -330,17 +340,21 @@ func TestEdgeUpdateNotification(t *testing.T) { // Create random policy edges that are stemmed to the channel id // created above. - edge1 := randEdgePolicyAnn(chanID) + edge1 := randEdgePolicy(chanID, node1) edge1.Flags = 0 - edge2 := randEdgePolicyAnn(chanID) + edge2 := randEdgePolicy(chanID, node2) edge2.Flags = 1 - ctx.router.ProcessRoutingMessage(edge1, node1.NodeID) - ctx.router.ProcessRoutingMessage(edge2, node2.NodeID) + if err := ctx.router.UpdateEdge(edge1); err != nil { + t.Fatalf("unable to add edge update: %v", err) + } + if err := ctx.router.UpdateEdge(edge2); err != nil { + t.Fatalf("unable to add edge update: %v", err) + } - assertEdgeAnnCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate, - edgeAnn *lnwire.ChannelUpdateAnnouncement) { - if edgeUpdate.ChanID != edgeAnn.ChannelID.ToUint64() { + assertEdgeCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate, + edgeAnn *channeldb.ChannelEdgePolicy) { + if edgeUpdate.ChanID != edgeAnn.ChannelID { t.Fatalf("channel ID of edge doesn't match: "+ "expected %v, got %v", chanID.ToUint64(), edgeUpdate.ChanID) } @@ -354,14 +368,14 @@ func TestEdgeUpdateNotification(t *testing.T) { t.Fatalf("capacity of edge doesn't match: "+ "expected %v, got %v", chanValue, edgeUpdate.Capacity) } - if edgeUpdate.MinHTLC != btcutil.Amount(edgeAnn.HtlcMinimumMsat) { + if edgeUpdate.MinHTLC != btcutil.Amount(edgeAnn.MinHTLC) { t.Fatalf("min HTLC of edge doesn't match: "+ - "expected %v, got %v", btcutil.Amount(edgeAnn.HtlcMinimumMsat), + "expected %v, got %v", btcutil.Amount(edgeAnn.MinHTLC), edgeUpdate.MinHTLC) } - if edgeUpdate.BaseFee != btcutil.Amount(edgeAnn.FeeBaseMsat) { + if edgeUpdate.BaseFee != btcutil.Amount(edgeAnn.FeeBaseMSat) { t.Fatalf("base fee of edge doesn't match: "+ - "expected %v, got %v", edgeAnn.FeeBaseMsat, + "expected %v, got %v", edgeAnn.FeeBaseMSat, edgeUpdate.BaseFee) } if edgeUpdate.FeeRate != btcutil.Amount(edgeAnn.FeeProportionalMillionths) { @@ -382,26 +396,26 @@ func TestEdgeUpdateNotification(t *testing.T) { case ntfn := <-ntfnClient.TopologyChanges: edgeUpdate := 
ntfn.ChannelEdgeUpdates[0] if i == 0 { - assertEdgeAnnCorrect(t, edgeUpdate, edge1) - if !edgeUpdate.AdvertisingNode.IsEqual(node1.NodeID) { - t.Fatalf("advertising node mismatch") + assertEdgeCorrect(t, edgeUpdate, edge1) + if !edgeUpdate.AdvertisingNode.IsEqual(node1.PubKey) { + t.Fatal("advertising node mismatch") } - if !edgeUpdate.ConnectingNode.IsEqual(node2.NodeID) { - t.Fatalf("connecting node mismatch") + if !edgeUpdate.ConnectingNode.IsEqual(node2.PubKey) { + t.Fatal("connecting node mismatch") } continue } - assertEdgeAnnCorrect(t, edgeUpdate, edge2) - if !edgeUpdate.ConnectingNode.IsEqual(node1.NodeID) { - t.Fatalf("connecting node mismatch") + assertEdgeCorrect(t, edgeUpdate, edge2) + if !edgeUpdate.ConnectingNode.IsEqual(node1.PubKey) { + t.Fatal("connecting node mismatch") } - if !edgeUpdate.AdvertisingNode.IsEqual(node2.NodeID) { - t.Fatalf("advertising node mismatch") + if !edgeUpdate.AdvertisingNode.IsEqual(node2.PubKey) { + t.Fatal("advertising node mismatch") } case <-time.After(time.Second * 5): - t.Fatalf("update not received") + t.Fatal("update not received") } } } @@ -424,20 +438,24 @@ func TestNodeUpdateNotification(t *testing.T) { // Create two random nodes to add to send as node announcement messages // to trigger notifications. - node1Ann, err := createTestWireNode() + node1, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } - node2Ann, err := createTestWireNode() + node2, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } - // Send both announcement message to the channel router. - ctx.router.ProcessRoutingMessage(node1Ann, node1Ann.NodeID) - ctx.router.ProcessRoutingMessage(node2Ann, node2Ann.NodeID) + // Change network topology by adding nodes to the channel router. + if err := ctx.router.AddNode(node1); err != nil { + t.Fatalf("unable to add node: %v", err) + } + if err := ctx.router.AddNode(node2); err != nil { + t.Fatalf("unable to add node: %v", err) + } - assertNodeNtfnCorrect := func(t *testing.T, ann *lnwire.NodeAnnouncement, + assertNodeNtfnCorrect := func(t *testing.T, ann *channeldb.LightningNode, ntfns []*NetworkNodeUpdate) { // For each processed announcement we should only receive a @@ -454,14 +472,14 @@ func TestNodeUpdateNotification(t *testing.T) { t.Fatalf("node address doesn't match: expected %v, got %v", nodeNtfn.Addresses[0], ann.Addresses[0]) } - if !nodeNtfn.IdentityKey.IsEqual(ann.NodeID) { + if !nodeNtfn.IdentityKey.IsEqual(ann.PubKey) { t.Fatalf("node identity keys don't match: expected %x, "+ - "got %x", ann.NodeID.SerializeCompressed(), + "got %x", ann.PubKey.SerializeCompressed(), nodeNtfn.IdentityKey.SerializeCompressed()) } - if nodeNtfn.Alias != ann.Alias.String() { + if nodeNtfn.Alias != ann.Alias { t.Fatalf("node alias doesn't match: expected %v, got %v", - ann.Alias.String(), nodeNtfn.Alias) + ann.Alias, nodeNtfn.Alias) } } @@ -472,11 +490,11 @@ func TestNodeUpdateNotification(t *testing.T) { select { case ntfn := <-ntfnClient.TopologyChanges: if i == 0 { - assertNodeNtfnCorrect(t, node1Ann, ntfn.NodeUpdates) + assertNodeNtfnCorrect(t, node1, ntfn.NodeUpdates) continue } - assertNodeNtfnCorrect(t, node2Ann, ntfn.NodeUpdates) + assertNodeNtfnCorrect(t, node2, ntfn.NodeUpdates) case <-time.After(time.Second * 5): } } @@ -484,11 +502,13 @@ func TestNodeUpdateNotification(t *testing.T) { // If we receive a new update from a node (with a higher timestamp), // then it should trigger a new notification. // TODO(roasbeef): assume monotonic time. 
- nodeUpdateAnn := *node1Ann - nodeUpdateAnn.Timestamp = node1Ann.Timestamp + 300 + nodeUpdateAnn := *node1 + nodeUpdateAnn.LastUpdate = node1.LastUpdate.Add(300 * time.Millisecond) - // Send off the new node announcement to the channel router. - ctx.router.ProcessRoutingMessage(&nodeUpdateAnn, node1Ann.NodeID) + // Add new node topology update to the channel router. + if err := ctx.router.AddNode(&nodeUpdateAnn); err != nil { + t.Fatalf("unable to add node: %v", err) + } // Once again a notification should be received reflecting the up to // date node announcement. @@ -515,9 +535,9 @@ func TestNotificationCancellation(t *testing.T) { t.Fatalf("unable to subscribe for channel notifications: %v", err) } - // We'll create a fresh new node announcement to feed to the channel + // We'll create a fresh new node topology update to feed to the channel // router. - node1Ann, err := createTestWireNode() + node, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } @@ -528,7 +548,9 @@ func TestNotificationCancellation(t *testing.T) { // client. ntfnClient.Cancel() - ctx.router.ProcessRoutingMessage(node1Ann, node1Ann.NodeID) + if err := ctx.router.AddNode(node); err != nil { + t.Fatalf("unable to add node: %v", err) + } select { // The notification shouldn't be sent, however, the channel should be @@ -538,10 +560,10 @@ func TestNotificationCancellation(t *testing.T) { return } - t.Fatalf("notification sent but shouldn't have been") + t.Fatal("notification sent but shouldn't have been") case <-time.After(time.Second * 5): - t.Fatalf("notification client never cancelled") + t.Fatal("notification client never cancelled") } } @@ -569,29 +591,33 @@ func TestChannelCloseNotification(t *testing.T) { // Next we'll create two test nodes that the fake channel will be open // between and add then as members of the channel graph. - node1, err := createTestWireNode() + node1, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } - node2, err := createTestWireNode() + node2, err := createTestNode() if err != nil { t.Fatalf("unable to create test node: %v", err) } // Finally, to conclude our test set up, we'll create a channel // announcement to announce the created channel between the two nodes. - channelAnn := lnwire.ChannelAnnouncement{ - FirstNodeSig: testSig, - SecondNodeSig: testSig, - ChannelID: chanID, - FirstBitcoinSig: testSig, - SecondBitcoinSig: testSig, - FirstNodeID: node1.NodeID, - SecondNodeID: node2.NodeID, - FirstBitcoinKey: node1.NodeID, - SecondBitcoinKey: node2.NodeID, + edge := &channeldb.ChannelEdgeInfo{ + ChannelID: chanID.ToUint64(), + NodeKey1: node1.PubKey, + NodeKey2: node2.PubKey, + BitcoinKey1: node1.PubKey, + BitcoinKey2: node2.PubKey, + AuthProof: &channeldb.ChannelAuthProof{ + NodeSig1: testSig, + NodeSig2: testSig, + BitcoinSig1: testSig, + BitcoinSig2: testSig, + }, + } + if err := ctx.router.AddEdge(edge); err != nil { + t.Fatalf("unable to add edge: %v", err) } - ctx.router.ProcessRoutingMessage(&channelAnn, node1.NodeID) // With the channel edge now in place, we'll subscribe for topology // notifications. @@ -626,7 +652,7 @@ func TestChannelCloseNotification(t *testing.T) { // "closed" above. 
closedChans := ntfn.ClosedChannels if len(closedChans) == 0 { - t.Fatalf("close channel ntfn not populated") + t.Fatal("close channel ntfn not populated") } else if len(closedChans) != 1 { t.Fatalf("only one should've been detected as closed, "+ "instead %v were", len(closedChans)) @@ -656,6 +682,6 @@ func TestChannelCloseNotification(t *testing.T) { } case <-time.After(time.Second * 5): - t.Fatalf("notification not sent") + t.Fatal("notification not sent") } } diff --git a/routing/pathfind.go b/routing/pathfind.go index 2178941d..b9c6cf48 100644 --- a/routing/pathfind.go +++ b/routing/pathfind.go @@ -187,7 +187,8 @@ func newRoute(amtToSend btcutil.Amount, pathEdges []*ChannelHop) (*Route, error) // enough capacity to forward the required amount which // includes the fee dictated at each hop. if nextHop.AmtToForward > nextHop.Channel.Capacity { - return nil, ErrInsufficientCapacity + return nil, newErrf(ErrInsufficientCapacity, "channel graph has "+ + "insufficient capacity for the payment") } // We don't pay any fees to ourselves on the first-hop channel, @@ -373,7 +374,8 @@ func findPath(graph *channeldb.ChannelGraph, sourceNode *channeldb.LightningNode // If the target node isn't found in the prev hop map, then a path // doesn't exist, so we terminate in an error. if _, ok := prev[newVertex(target)]; !ok { - return nil, ErrNoPathFound + return nil, newErrf(ErrNoPathFound, "unable to find a path to "+ + "destination") } // If the potential route if below the max hop limit, then we'll use @@ -398,7 +400,8 @@ func findPath(graph *channeldb.ChannelGraph, sourceNode *channeldb.LightningNode // hops, then it's invalid. numEdges := len(pathEdges) if numEdges > HopLimit { - return nil, ErrMaxHopsExceeded + return nil, newErr(ErrMaxHopsExceeded, "potential path has "+ + "too many hops") } // As our traversal of the prev map above walked backwards from the @@ -515,7 +518,7 @@ func findPaths(graph *channeldb.ChannelGraph, source *channeldb.LightningNode, // If we weren't able to find a path, we'll continue to // the next round. 
- if err == ErrNoPathFound { + if IsError(err, ErrNoPathFound) { continue } else if err != nil { return nil, err diff --git a/routing/pathfind_test.go b/routing/pathfind_test.go index de8050f7..596c826c 100644 --- a/routing/pathfind_test.go +++ b/routing/pathfind_test.go @@ -508,7 +508,7 @@ func TestPathNotAvailable(t *testing.T) { _, err = findPath(graph, sourceNode, unknownNode, ignoredVertexes, ignoredEdges, 100) - if err != ErrNoPathFound { + if !IsError(err, ErrNoPathFound) { t.Fatalf("path shouldn't have been found: %v", err) } } @@ -540,7 +540,7 @@ func TestPathInsufficientCapacity(t *testing.T) { const payAmt = btcutil.SatoshiPerBitcoin _, err = findPath(graph, sourceNode, target, ignoredVertexes, ignoredEdges, payAmt) - if err != ErrNoPathFound { + if !IsError(err, ErrNoPathFound) { t.Fatalf("graph shouldn't be able to support payment: %v", err) } } diff --git a/routing/router.go b/routing/router.go index 23333a11..d2febe33 100644 --- a/routing/router.go +++ b/routing/router.go @@ -2,12 +2,10 @@ package routing import ( "bytes" - "encoding/hex" "fmt" "sort" "sync" "sync/atomic" - "time" "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" @@ -18,9 +16,44 @@ import ( "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" + "github.com/go-errors/errors" "github.com/lightningnetwork/lightning-onion" ) + +// ChannelGraphSource represents the source of information about the +// topology of the lightning network. It is responsible for the addition of +// nodes and edges, applying edge updates, and returning the current block +// height with which our topology is synchronized. +type ChannelGraphSource interface { + // AddNode is used to add a node to the router's topology; afterwards the + // node might be used in the construction of payment paths. + AddNode(node *channeldb.LightningNode) error + + // AddEdge is used to add an edge/channel to the router's topology; once + // all information about the channel has been gathered, the + // edge/channel might be used in the construction of payment paths. + AddEdge(edge *channeldb.ChannelEdgeInfo) error + + // UpdateEdge is used to update edge information; without this + // update the edge is considered not fully constructed. + UpdateEdge(policy *channeldb.ChannelEdgePolicy) error + + // ForAllOutgoingChannels is used to iterate over all of our own outgoing channels. + ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgePolicy) error) error + + // CurrentBlockHeight returns the block height from the PoV of the router + // subsystem. + CurrentBlockHeight() (uint32, error) + + // ForEachNode is used to iterate over every node in the router's topology. + ForEachNode(func(node *channeldb.LightningNode) error) error + + // ForEachChannel is used to iterate over every channel in the router's + // topology. + ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) error) error +} + // FeeSchema is the set fee configuration for a Lighting Node on the network. // Using the coefficients described within he schema, the required fee to // forward outgoing payments can be derived. @@ -51,7 +84,6 @@ type Config struct { // Chain is the router's source to the most up-to-date blockchain data. // All incoming advertised channels will be checked against the chain // to ensure that the channels advertised are still open.
- // TODO(roasbeef): remove after discovery service is in Chain lnwallet.BlockChainIO // Notifier is an instance of the ChainNotifier that the router uses to @@ -67,17 +99,6 @@ type Config struct { // TODO(roasbeef): should either be in discovery or switch FeeSchema *FeeSchema - // Broadcast is a function that is used to broadcast a particular set - // of messages to all peers that the daemon is connected to. If - // supplied, the exclude parameter indicates that the target peer should - // be excluded from the broadcast. - Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error - - // SendMessages is a function which allows the ChannelRouter to send a - // set of messages to a particular peer identified by the target public - // key. - SendMessages func(target *btcec.PublicKey, msg ...lnwire.Message) error - // SendToSwitch is a function that directs a link-layer switch to // forward a fully encoded payment to the first hop in the route // denoted by its public key. A non-nil error is to be returned if the @@ -109,11 +130,9 @@ func newRouteTuple(amt btcutil.Amount, dest *btcec.PublicKey) routeTuple { // ChannelRouter is the HtlcSwitch, and below that is the Bitcoin blockchain // itself. The primary role of the ChannelRouter is to respond to queries for // potential routes that can support a payment amount, and also general graph -// reachability questions. The router will prune the channel graph -// automatically as new blocks are discovered which spend certain known funding -// outpoints, thereby closing their respective channels. Additionally, it's the -// duty of the router to sync up newly connected peers with the latest state of -// the channel graph. +// reachability questions. The router will prune the channel graph automatically +// as new blocks are discovered which spend certain known funding outpoints, +// thereby closing their respective channels. type ChannelRouter struct { ntfnClientCounter uint64 @@ -144,23 +163,10 @@ type ChannelRouter struct { // the main chain are sent over. newBlocks <-chan *chainntnfs.BlockEpoch - // networkMsgs is a channel that carries new network messages from - // outside the ChannelRouter to be processed by the networkHandler. - networkMsgs chan *routingMsg - - // syncRequests is a channel that carries requests to synchronize newly - // connected peers to the state of the channel graph from our PoV. - syncRequests chan *syncRequest - - // prematureAnnouncements maps a blockheight to a set of announcements - // which are "premature" from our PoV. An announcement is premature if - // it claims to be anchored in a block which is beyond the current main - // chain tip as we know it. Premature announcements will be processed - // once the chain tip as we know it extends to/past the premature - // height. - // - // TODO(roasbeef): limit premature announcements to N - prematureAnnouncements map[uint32][]lnwire.Message + // networkUpdates is a channel that carries new topology updates + // messages from outside the ChannelRouter to be processed by the + // networkHandler. + networkUpdates chan *routingMsg // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch @@ -173,54 +179,34 @@ type ChannelRouter struct { // existing client. ntfnClientUpdates chan *topologyClientUpdate - // bestHeight is the height of the block at the tip of the main chain - // as we know it. 
- bestHeight uint32 - - fakeSig *btcec.Signature - sync.RWMutex quit chan struct{} wg sync.WaitGroup } +// A compile time check to ensure ChannelRouter implements the ChannelGraphSource interface. +var _ ChannelGraphSource = (*ChannelRouter)(nil) + // New creates a new instance of the ChannelRouter with the specified // configuration parameters. As part of initialization, if the router detects // that the channel graph isn't fully in sync with the latest UTXO (since the // channel graph is a subset of the UTXO set) set, then the router will proceed // to fully sync to the latest state of the UTXO set. func New(cfg Config) (*ChannelRouter, error) { - // TODO(roasbeef): remove this place holder after sigs are properly - // stored in the graph. - s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" + - "1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" + - "ad154c03be696a292d24ae" - fakeSigHex, err := hex.DecodeString(s) - if err != nil { - return nil, err - } - fakeSig, err := btcec.ParseSignature(fakeSigHex, btcec.S256()) - if err != nil { - return nil, err - } - selfNode, err := cfg.Graph.SourceNode() if err != nil { return nil, err } return &ChannelRouter{ - cfg: &cfg, - selfNode: selfNode, - fakeSig: fakeSig, - networkMsgs: make(chan *routingMsg), - syncRequests: make(chan *syncRequest), - prematureAnnouncements: make(map[uint32][]lnwire.Message), - topologyClients: make(map[uint64]topologyClient), - ntfnClientUpdates: make(chan *topologyClientUpdate), - routeCache: make(map[routeTuple][]*Route), - quit: make(chan struct{}), + cfg: &cfg, + selfNode: selfNode, + networkUpdates: make(chan *routingMsg), + topologyClients: make(map[uint64]topologyClient), + ntfnClientUpdates: make(chan *topologyClientUpdate), + routeCache: make(map[routeTuple][]*Route), + quit: make(chan struct{}), }, nil } @@ -244,12 +230,6 @@ func (r *ChannelRouter) Start() error { } r.newBlocks = blockEpochs.Epochs - _, height, err := r.cfg.Chain.GetBestBlock() - if err != nil { - return err - } - r.bestHeight = uint32(height) - // Before we begin normal operation of the router, we first need to // synchronize the channel graph to the latest state of the UTXO set. if err := r.syncGraphWithChain(); err != nil { @@ -372,54 +352,42 @@ func (r *ChannelRouter) syncGraphWithChain() error { // networkHandler is the primary goroutine for the ChannelRouter. The roles of // this goroutine include answering queries related to the state of the -// network, syncing up newly connected peers, and also periodically -// broadcasting our latest state to all connected peers. +// network, pruning the graph on new block notification, applying network +// updates, and registering new topology clients. // // NOTE: This MUST be run as a goroutine. func (r *ChannelRouter) networkHandler() { defer r.wg.Done() - var announcementBatch []lnwire.Message - - // TODO(roasbeef): parametrize the above - trickleTimer := time.NewTicker(time.Millisecond * 300) - defer trickleTimer.Stop() - - retransmitTimer := time.NewTicker(time.Minute * 30) - defer retransmitTimer.Stop() - for { select { - // A new fully validated network message has just arrived. As a + // A new fully validated network update has just arrived. As a // result we'll modify the channel graph accordingly depending // on the exact type of the message. 
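The added line var _ ChannelGraphSource = (*ChannelRouter)(nil) is the standard Go idiom for a compile-time interface check: assigning a typed nil pointer to a blank variable of the interface type makes the build fail as soon as the concrete type stops satisfying the interface, at no runtime cost. A minimal, self-contained illustration of the same idiom follows; all names in it are invented for the example.

package main

import "fmt"

// graphReader stands in for any interface we want to guarantee is satisfied,
// playing the role ChannelGraphSource plays in the diff above.
type graphReader interface {
	NodeCount() int
}

// memGraph is a toy concrete implementation.
type memGraph struct {
	nodes []string
}

func (g *memGraph) NodeCount() int { return len(g.nodes) }

// Compile-time check: if *memGraph ever stops implementing graphReader, the
// build breaks right here instead of at a distant call site. The blank
// identifier means nothing is kept at runtime.
var _ graphReader = (*memGraph)(nil)

func main() {
	var src graphReader = &memGraph{nodes: []string{"alice", "bob"}}
	fmt.Println("nodes:", src.NodeCount())
}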
- case netMsg := <-r.networkMsgs: - // Process the network announcement to determine if - // this is either a new announcement from our PoV or an - // update to a prior vertex/edge we previously + case updateMsg := <-r.networkUpdates: + // Process the routing update to determine if this is + // either a new update from our PoV or an update to a + // prior vertex/edge we previously // accepted. - accepted := r.processNetworkAnnouncement(netMsg.msg) + err := r.processUpdate(updateMsg.msg) + updateMsg.err <- err + if err != nil { + continue + } - // If the update was accepted, then add it to our next - // announcement batch to be broadcast once the trickle - // timer ticks gain. - if accepted { - // TODO(roasbeef): exclude peer that sent - announcementBatch = append(announcementBatch, netMsg.msg) + // Send off a new notification for the newly + // accepted update. + topChange := &TopologyChange{} + err = addToTopologyChange(r.cfg.Graph, topChange, + updateMsg.msg) + if err != nil { + log.Errorf("unable to update topology "+ + "change notification: %v", err) + continue + } - // Send off a new notification for the newly - // accepted announcement. - topChange := &TopologyChange{} - err := addToTopologyChange(r.cfg.Graph, topChange, - netMsg.msg) - if err != nil { - log.Errorf("unable to update topology "+ - "change notification: %v", err) - } - - if !topChange.isEmpty() { - r.notifyTopologyChange(topChange) - } + if !topChange.isEmpty() { + r.notifyTopologyChange(topChange) } // TODO(roasbeef): remove all unconnected vertexes @@ -438,41 +406,6 @@ func (r *ChannelRouter) networkHandler() { // Once a new block arrives, we update our running // track of the height of the chain tip. blockHeight := uint32(newBlock.Height) - r.bestHeight = blockHeight - - // Next we check if we have any premature announcements - // for this height, if so, then we process them once - // more as normal announcements. - prematureAnns := r.prematureAnnouncements[uint32(newBlock.Height)] - if len(prematureAnns) != 0 { - log.Infof("Re-processing %v premature announcements for "+ - "height %v", len(prematureAnns), blockHeight) - } - - topChange := &TopologyChange{} - for _, ann := range prematureAnns { - if ok := r.processNetworkAnnouncement(ann); ok { - announcementBatch = append(announcementBatch, ann) - - // As the announcement was accepted, - // accumulate it to the running set of - // announcements for this block. - err := addToTopologyChange(r.cfg.Graph, - topChange, ann) - if err != nil { - log.Errorf("unable to update topology "+ - "change notification: %v", err) - } - } - } - delete(r.prematureAnnouncements, blockHeight) - - // If the pending notification generated above isn't - // empty, then send it out to all registered clients. - if !topChange.isEmpty() { - r.notifyTopologyChange(topChange) - } - log.Infof("Pruning channel graph using block %v (height=%v)", newBlock.Hash, blockHeight) @@ -526,101 +459,6 @@ func (r *ChannelRouter) networkHandler() { ClosedChannels: closeSummaries, }) - // The retransmission timer has ticked which indicates that we - // should broadcast our personal channel to the network. This - // addresses the case of channel advertisements whether being - // dropped, or not properly propagated through the network. 
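With this change networkHandler no longer batches accepted announcements for rebroadcast; each update arrives paired with an error channel and the result of processUpdate is sent straight back to the submitter, which is how the AddNode/AddEdge/UpdateEdge methods added later in this diff consume it. Below is a stripped-down, hypothetical sketch of that request/reply-over-channel pattern, independent of the lnd types.

package main

import (
	"errors"
	"fmt"
)

// update pairs a payload with a buffered reply channel, mirroring the role of
// the routingMsg struct introduced later in this diff (names are invented).
type update struct {
	payload string
	err     chan error
}

// handler plays the part of networkHandler: it owns the state, applies each
// update, and reports the outcome on the per-request error channel.
func handler(updates <-chan *update, quit <-chan struct{}) {
	for {
		select {
		case u := <-updates:
			if u.payload == "" {
				u.err <- errors.New("empty update")
				continue
			}
			u.err <- nil
		case <-quit:
			return
		}
	}
}

// submit is the caller-side half: send the request, then block on the reply.
// The error channel is buffered so the handler never blocks while answering.
func submit(updates chan<- *update, quit <-chan struct{}, payload string) error {
	u := &update{payload: payload, err: make(chan error, 1)}
	select {
	case updates <- u:
		return <-u.err
	case <-quit:
		return errors.New("handler has been shut down")
	}
}

func main() {
	updates := make(chan *update)
	quit := make(chan struct{})
	go handler(updates, quit)

	fmt.Println(submit(updates, quit, "node announcement")) // <nil>
	fmt.Println(submit(updates, quit, ""))                  // empty update
	close(quit)
}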
- case <-retransmitTimer.C: - var selfChans []lnwire.Message - - selfPub := r.selfNode.PubKey.SerializeCompressed() - err := r.selfNode.ForEachChannel(nil, func(_ *channeldb.ChannelEdgeInfo, - c *channeldb.ChannelEdgePolicy) error { - - chanNodePub := c.Node.PubKey.SerializeCompressed() - - // Compare our public key with that of the - // channel peer. If our key is "less" than - // theirs, then we're the "first" node in the - // advertisement, otherwise we're the second. - flags := uint16(1) - if bytes.Compare(selfPub, chanNodePub) == -1 { - flags = 0 - } - - selfChans = append(selfChans, &lnwire.ChannelUpdateAnnouncement{ - Signature: r.fakeSig, - ChannelID: lnwire.NewChanIDFromInt(c.ChannelID), - Timestamp: uint32(c.LastUpdate.Unix()), - Flags: flags, - TimeLockDelta: c.TimeLockDelta, - HtlcMinimumMsat: uint32(c.MinHTLC), - FeeBaseMsat: uint32(c.FeeBaseMSat), - FeeProportionalMillionths: uint32(c.FeeProportionalMillionths), - }) - return nil - }) - if err != nil { - log.Errorf("unable to retransmit "+ - "channels: %v", err) - continue - } - - if len(selfChans) == 0 { - continue - } - - log.Debugf("Retransmitting %v outgoing channels", - len(selfChans)) - - // With all the wire messages properly crafted, we'll - // broadcast our known outgoing channel to all our - // immediate peers. - if err := r.cfg.Broadcast(nil, selfChans...); err != nil { - log.Errorf("unable to re-broadcast "+ - "channels: %v", err) - } - - // The trickle timer has ticked, which indicates we should - // flush to the network the pending batch of new announcements - // we've received since the last trickle tick. - case <-trickleTimer.C: - // If the current announcement batch is nil, then we - // have no further work here. - if len(announcementBatch) == 0 { - continue - } - - log.Infof("Broadcasting batch of %v new announcements", - len(announcementBatch)) - - // If we have new things to announce then broadcast - // then to all our immediately connected peers. - err := r.cfg.Broadcast(nil, announcementBatch...) - if err != nil { - log.Errorf("unable to send batch announcement: %v", err) - continue - } - - // If we we're able to broadcast the current batch - // successfully, then we reset the batch for a new - // round of announcements. - announcementBatch = nil - - // We've just received a new request to synchronize a peer with - // our latest graph state. This indicates that a peer has just - // connected for the first time, so for now we dump our entire - // graph and allow them to sift through the (subjectively) new - // information on their own. - case syncReq := <-r.syncRequests: - nodePub := syncReq.node.SerializeCompressed() - log.Infof("Synchronizing channel graph with %x", nodePub) - - if err := r.syncChannelGraph(syncReq); err != nil { - log.Errorf("unable to sync graph state with %x: %v", - nodePub, err) - } - // A new notification client update has arrived. We're either // gaining a new client, or cancelling notifications for an // existing client. @@ -650,241 +488,160 @@ func (r *ChannelRouter) networkHandler() { } } -// processNetworkAnnouncement processes a new network relate authenticated -// channel or node announcement. If the update didn't affect the internal state -// of the draft due to either being out of date, invalid, or redundant, then -// false is returned. Otherwise, true is returned indicating that the caller -// may want to batch this request to be broadcast to immediate peers during the -// next announcement epoch. 
-func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { - isPremature := func(chanID *lnwire.ChannelID) bool { - return chanID.BlockHeight > r.bestHeight - } +// processUpdate processes a new authenticated network update: a channel/edge +// addition, a node announcement, or a channel/edge policy update. If the +// update doesn't affect the internal state of the graph because it is out of +// date, invalid, or redundant, an error is returned. +func (r *ChannelRouter) processUpdate(msg interface{}) error { var invalidateCache bool switch msg := msg.(type) { - - // A new node announcement has arrived which either presents a new - // node, or a node updating previously advertised information. - case *lnwire.NodeAnnouncement: + case *channeldb.LightningNode: // Before proceeding ensure that we aren't already away of this // node, and if we are then this is a newer update that we // known of. - lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(msg.NodeID) + lastUpdate, exists, err := r.cfg.Graph.HasLightningNode(msg.PubKey) if err != nil { - log.Errorf("Unable to query for the existence of node: %v", - err) - return false + return errors.Errorf("unable to query for the "+ + "existence of node: %v", err) + } // If we've reached this pint then we're aware of th vertex // being advertised. So we now check if the new message has a // new time stamp, if not then we won't accept the new data as // it would override newer data. - msgTimestamp := time.Unix(int64(msg.Timestamp), 0) - if exists && lastUpdate.After(msgTimestamp) || - lastUpdate.Equal(msgTimestamp) { + if exists && lastUpdate.After(msg.LastUpdate) || + lastUpdate.Equal(msg.LastUpdate) { - log.Debugf("Ignoring outdated announcement for %x", - msg.NodeID.SerializeCompressed()) - return false + return newErrf(ErrOutdated, "ignoring outdated "+ + "announcement for %x", msg.PubKey.SerializeCompressed()) } - node := &channeldb.LightningNode{ - LastUpdate: msgTimestamp, - Addresses: msg.Addresses, - PubKey: msg.NodeID, - Alias: msg.Alias.String(), - Features: msg.Features, - } - - if err = r.cfg.Graph.AddLightningNode(node); err != nil { - log.Errorf("unable to add node %v: %v", msg.NodeID, err) - return false + if err := r.cfg.Graph.AddLightningNode(msg); err != nil { + return errors.Errorf("unable to add node %v to the "+ + "graph: %v", msg.PubKey.SerializeCompressed(), err) } log.Infof("Updated vertex data for node=%x", - msg.NodeID.SerializeCompressed()) + msg.PubKey.SerializeCompressed()) - // A new channel announcement has arrived, this indicates the - // *creation* of a new channel within the graph. This only advertises - // the existence of a channel and not yet the routing policies in - // either direction of the channel. - case *lnwire.ChannelAnnouncement: + case *channeldb.ChannelEdgeInfo: // Prior to processing the announcement we first check if we // already know of this channel, if so, then we can exit early.
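Because processUpdate now signals "nothing to do" conditions as coded errors (ErrOutdated above, ErrIgnored below) rather than a bare false, a caller feeding the router, such as the discovery service this change prepares for, can separate benign skips from real failures. The helper below is a hypothetical illustration of that call-site pattern; it assumes only the ChannelGraphSource interface and the exported IsError/ErrOutdated/ErrIgnored identifiers used elsewhere in this diff, and is not part of the change itself.

package discovery

import (
	"github.com/lightningnetwork/lnd/channeldb"
	"github.com/lightningnetwork/lnd/routing"
)

// applyNodeUpdate feeds a node announcement into the router and treats only
// genuinely unexpected failures as errors; outdated or duplicate updates are
// simply dropped. Illustrative helper, not part of the diff.
func applyNodeUpdate(graph routing.ChannelGraphSource,
	node *channeldb.LightningNode) error {

	err := graph.AddNode(node)
	switch {
	case err == nil:
		return nil

	// Stale or duplicate data isn't a failure, there's just nothing new
	// to apply.
	case routing.IsError(err, routing.ErrOutdated),
		routing.IsError(err, routing.ErrIgnored):
		return nil

	default:
		return err
	}
}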
- channelID := msg.ChannelID.ToUint64() - _, _, exists, err := r.cfg.Graph.HasChannelEdge(channelID) + _, _, exists, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) if err != nil && err != channeldb.ErrGraphNoEdgesFound { - log.Errorf("unable to check for edge existence: %v", err) - return false + return errors.Errorf("unable to check for edge "+ + "existence: %v", err) + } else if exists { - log.Debugf("Ignoring announcement for known chan_id=%v", - channelID) - return false + + return newErrf(ErrIgnored, "ignoring msg for known "+ + "chan_id=%v", msg.ChannelID) } - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll put the announcement in limbo - // to be fully verified once we advance forward in the chain. - if isPremature(&msg.ChannelID) { - blockHeight := msg.ChannelID.BlockHeight - log.Infof("Announcement for chan_id=(%v), is "+ - "premature: advertises height %v, only height "+ - "%v is known", channelID, - msg.ChannelID.BlockHeight, r.bestHeight) - - r.prematureAnnouncements[blockHeight] = append( - r.prematureAnnouncements[blockHeight], - msg, - ) - return false - } + // TODO(andrew.shvv) Add validation that bitcoin keys are + // binded to the funding transaction. // Before we can add the channel to the channel graph, we need // to obtain the full funding outpoint that's encoded within // the channel ID. - fundingPoint, err := r.fetchChanPoint(&msg.ChannelID) + channelID := lnwire.NewChanIDFromInt(msg.ChannelID) + fundingPoint, err := r.fetchChanPoint(&channelID) if err != nil { - log.Errorf("unable to fetch chan point for chan_id=%v: %v", - channelID, err) - return false + return errors.Errorf("unable to fetch chan point for "+ + "chan_id=%v: %v", msg.ChannelID, err) } // Now that we have the funding outpoint of the channel, ensure // that it hasn't yet been spent. If so, then this channel has // been closed so we'll ignore it. - chanUtxo, err := r.cfg.Chain.GetUtxo(&fundingPoint.Hash, - fundingPoint.Index) + chanUtxo, err := r.cfg.Chain.GetUtxo(&fundingPoint.Hash, fundingPoint.Index) if err != nil { - log.Errorf("unable to fetch utxo for chan_id=%v: %v", - channelID, err) - return false + return errors.Errorf("unable to fetch utxo for "+ + "chan_id=%v: %v", channelID, err) } - edge := &channeldb.ChannelEdgeInfo{ - ChannelID: channelID, - NodeKey1: msg.FirstNodeID, - NodeKey2: msg.SecondNodeID, - BitcoinKey1: msg.FirstBitcoinKey, - BitcoinKey2: msg.SecondBitcoinKey, - AuthProof: &channeldb.ChannelAuthProof{ - NodeSig1: msg.FirstNodeSig, - NodeSig2: msg.SecondNodeSig, - BitcoinSig1: msg.FirstBitcoinSig, - BitcoinSig2: msg.SecondBitcoinSig, - }, - ChannelPoint: *fundingPoint, - // TODO(roasbeef): this is a hack, needs to be removed - // after commitment fees are dynamic. - Capacity: btcutil.Amount(chanUtxo.Value) - btcutil.Amount(5000), - } - if err := r.cfg.Graph.AddChannelEdge(edge); err != nil { - log.Errorf("unable to add channel: %v", err) - return false + // TODO(roasbeef): this is a hack, needs to be removed + // after commitment fees are dynamic. + msg.Capacity = btcutil.Amount(chanUtxo.Value) - btcutil.Amount(5000) + msg.ChannelPoint = *fundingPoint + + if err := r.cfg.Graph.AddChannelEdge(msg); err != nil { + return errors.Errorf("unable to add edge: %v", err) } invalidateCache = true log.Infof("New channel discovered! 
Link "+ "connects %x and %x with ChannelPoint(%v), chan_id=%v", - msg.FirstNodeID.SerializeCompressed(), - msg.SecondNodeID.SerializeCompressed(), - fundingPoint, channelID) + msg.NodeKey1.SerializeCompressed(), + msg.NodeKey2.SerializeCompressed(), + fundingPoint, msg.ChannelID) - // A new authenticated channel update has has arrived, this indicates - // that the directional information for an already known channel has - // been updated. All updates are signed and validated before reaching - // us, so we trust the data to be legitimate. - case *lnwire.ChannelUpdateAnnouncement: - chanID := msg.ChannelID.ToUint64() - edge1Timestamp, edge2Timestamp, _, err := r.cfg.Graph.HasChannelEdge(chanID) + case *channeldb.ChannelEdgePolicy: + channelID := lnwire.NewChanIDFromInt(msg.ChannelID) + edge1Timestamp, edge2Timestamp, _, err := r.cfg.Graph.HasChannelEdge(msg.ChannelID) if err != nil && err != channeldb.ErrGraphNoEdgesFound { - log.Errorf("unable to check for edge existence: %v", err) - return false - } + return errors.Errorf("unable to check for edge "+ + "existence: %v", err) - // If the advertised inclusionary block is beyond our knowledge - // of the chain tip, then we'll put the announcement in limbo - // to be fully verified once we advance forward in the chain. - if isPremature(&msg.ChannelID) { - blockHeight := msg.ChannelID.BlockHeight - log.Infof("Update announcement for chan_id=(%v), is "+ - "premature: advertises height %v, only height "+ - "%v is known", chanID, blockHeight, - r.bestHeight) - - r.prematureAnnouncements[blockHeight] = append( - r.prematureAnnouncements[blockHeight], - msg, - ) - return false } // As edges are directional edge node has a unique policy for // the direction of the edge they control. Therefore we first // check if we already have the most up to date information for // that edge. If so, then we can exit early. - updateTimestamp := time.Unix(int64(msg.Timestamp), 0) switch msg.Flags { // A flag set of 0 indicates this is an announcement for the // "first" node in the channel. case 0: - if edge1Timestamp.After(updateTimestamp) || - edge1Timestamp.Equal(updateTimestamp) { + if edge1Timestamp.After(msg.LastUpdate) || + edge1Timestamp.Equal(msg.LastUpdate) { + return newErrf(ErrIgnored, "ignoring announcement "+ + "(flags=%v) for known chan_id=%v", msg.Flags, + msg.ChannelID) - log.Debugf("Ignoring announcement (flags=%v) "+ - "for known chan_id=%v", msg.Flags, - chanID) - return false } // Similarly, a flag set of 1 indicates this is an announcement // for the "second" node in the channel. case 1: - if edge2Timestamp.After(updateTimestamp) || - edge2Timestamp.Equal(updateTimestamp) { + if edge2Timestamp.After(msg.LastUpdate) || + edge2Timestamp.Equal(msg.LastUpdate) { - log.Debugf("Ignoring announcement (flags=%v) "+ - "for known chan_id=%v", msg.Flags, - chanID) - return false + return newErrf(ErrIgnored, "ignoring announcement "+ + "(flags=%v) for known chan_id=%v", msg.Flags, + msg.ChannelID) } } // Before we can update the channel information, we need to get // the UTXO itself so we can store the proper capacity. 
- chanPoint, err := r.fetchChanPoint(&msg.ChannelID) + chanPoint, err := r.fetchChanPoint(&channelID) if err != nil { - log.Errorf("unable to fetch chan point for chan_id=%v: %v", chanID, err) - return false + return errors.Errorf("unable to fetch chan point for "+ + "chan_id=%v: %v", msg.ChannelID, err) } if _, err := r.cfg.Chain.GetUtxo(&chanPoint.Hash, chanPoint.Index); err != nil { - log.Errorf("unable to fetch utxo for chan_id=%v: %v", - chanID, err) - return false + return errors.Errorf("unable to fetch utxo for "+ + "chan_id=%v: %v", msg.ChannelID, err) } - // TODO(roasbeef): should be msat here - chanUpdate := &channeldb.ChannelEdgePolicy{ - ChannelID: chanID, - LastUpdate: updateTimestamp, - Flags: msg.Flags, - TimeLockDelta: msg.TimeLockDelta, - MinHTLC: btcutil.Amount(msg.HtlcMinimumMsat), - FeeBaseMSat: btcutil.Amount(msg.FeeBaseMsat), - FeeProportionalMillionths: btcutil.Amount(msg.FeeProportionalMillionths), - } - if err = r.cfg.Graph.UpdateEdgePolicy(chanUpdate); err != nil { - log.Errorf("unable to add channel: %v", err) - return false + if err = r.cfg.Graph.UpdateEdgePolicy(msg); err != nil { + err := errors.Errorf("unable to add channel: %v", err) + log.Error(err) + return err } invalidateCache = true log.Infof("New channel update applied: %v", - spew.Sdump(chanUpdate)) + spew.Sdump(msg)) + + default: + return errors.Errorf("wrong routing update message type") } // If we've received a channel update, then invalidate the route cache @@ -896,142 +653,7 @@ func (r *ChannelRouter) processNetworkAnnouncement(msg lnwire.Message) bool { r.routeCacheMtx.Unlock() } - return true -} - -// syncRequest represents a request from an outside subsystem to the wallet to -// sync a new node to the latest graph state. -type syncRequest struct { - node *btcec.PublicKey -} - -// SynchronizeNode sends a message to the ChannelRouter indicating it should -// synchronize routing state with the target node. This method is to be -// utilized when a node connections for the first time to provide it with the -// latest channel graph state. -func (r *ChannelRouter) SynchronizeNode(pub *btcec.PublicKey) { - select { - case r.syncRequests <- &syncRequest{ - node: pub, - }: - case <-r.quit: - return - } -} - -// syncChannelGraph attempts to synchronize the target node in the syncReq to -// the latest channel graph state. In order to accomplish this, (currently) the -// entire graph is read from disk, then serialized to the format defined within -// the current wire protocol. This cache of graph data is then sent directly to -// the target node. -func (r *ChannelRouter) syncChannelGraph(syncReq *syncRequest) error { - targetNode := syncReq.node - - // TODO(roasbeef): need to also store sig data in db - // * will be nice when we switch to pairing sigs would only need one ^_^ - - // We'll collate all the gathered routing messages into a single slice - // containing all the messages to be sent to the target peer. - var announceMessages []lnwire.Message - - // First run through all the vertexes in the graph, retrieving the data - // for the announcement we originally retrieved. 
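The tail of processUpdate keeps the router's existing behaviour of invalidating its route cache under routeCacheMtx whenever a channel update is accepted, so stale precomputed routes are never served after the graph changes. The generic sketch below shows that read-mostly cache pattern in isolation (an RWMutex for concurrent lookups, the full lock for stores and invalidation); the names are invented for the example.

package main

import (
	"fmt"
	"sync"
)

// routeCache is a toy stand-in for the ChannelRouter's route cache: cheap
// repeated lookups guarded by an RWMutex, with wholesale invalidation once
// the underlying graph changes.
type routeCache struct {
	mtx   sync.RWMutex
	cache map[string][]string
}

func newRouteCache() *routeCache {
	return &routeCache{cache: make(map[string][]string)}
}

// lookup takes the read lock so concurrent queries don't block each other.
func (c *routeCache) lookup(key string) ([]string, bool) {
	c.mtx.RLock()
	defer c.mtx.RUnlock()
	routes, ok := c.cache[key]
	return routes, ok
}

// store and invalidate take the write lock because they mutate the map.
func (c *routeCache) store(key string, routes []string) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.cache[key] = routes
}

func (c *routeCache) invalidate() {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	c.cache = make(map[string][]string)
}

func main() {
	c := newRouteCache()
	c.store("alice->dave", []string{"alice", "bob", "dave"})

	if routes, ok := c.lookup("alice->dave"); ok {
		fmt.Println("cached:", routes)
	}

	// A freshly accepted channel update would trigger this, forcing the
	// next query to recompute routes against the new graph.
	c.invalidate()

	if _, ok := c.lookup("alice->dave"); !ok {
		fmt.Println("cache invalidated")
	}
}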
- var numNodes uint32 - if err := r.cfg.Graph.ForEachNode(func(node *channeldb.LightningNode) error { - alias, err := lnwire.NewAlias(node.Alias) - if err != nil { - return err - } - - ann := &lnwire.NodeAnnouncement{ - Signature: r.fakeSig, - Timestamp: uint32(node.LastUpdate.Unix()), - NodeID: node.PubKey, - Alias: alias, - Features: node.Features, - Addresses: node.Addresses, - } - announceMessages = append(announceMessages, ann) - - numNodes++ - - return nil - }); err != nil { - return err - } - - // With the vertexes gathered, we'll no retrieve the initial - // announcement, as well as the latest channel update announcement for - // both of the directed edges that make up the channel. - var numEdges uint32 - if err := r.cfg.Graph.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo, - e1, e2 *channeldb.ChannelEdgePolicy) error { - - chanID := lnwire.NewChanIDFromInt(chanInfo.ChannelID) - - // First, using the parameters of the channel, along with the - // channel authentication proof, we'll create re-create the - // original authenticated channel announcement. - authProof := chanInfo.AuthProof - chanAnn := &lnwire.ChannelAnnouncement{ - FirstNodeSig: authProof.NodeSig1, - SecondNodeSig: authProof.NodeSig2, - ChannelID: chanID, - FirstBitcoinSig: authProof.BitcoinSig1, - SecondBitcoinSig: authProof.BitcoinSig2, - FirstNodeID: chanInfo.NodeKey1, - SecondNodeID: chanInfo.NodeKey2, - FirstBitcoinKey: chanInfo.BitcoinKey1, - SecondBitcoinKey: chanInfo.BitcoinKey2, - } - - // We'll unconditionally queue the channel's existence proof as - // it will need to be processed before either of the channel - // update announcements. - announceMessages = append(announceMessages, chanAnn) - - // Since it's up to a node's policy as to whether they - // advertise the edge in dire direction, we don't create an - // advertisement if the edge is nil. - if e1 != nil { - announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ - Signature: r.fakeSig, - ChannelID: chanID, - Timestamp: uint32(e1.LastUpdate.Unix()), - Flags: 0, - TimeLockDelta: e1.TimeLockDelta, - HtlcMinimumMsat: uint32(e1.MinHTLC), - FeeBaseMsat: uint32(e1.FeeBaseMSat), - FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths), - }) - } - if e2 != nil { - announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{ - Signature: r.fakeSig, - ChannelID: chanID, - Timestamp: uint32(e2.LastUpdate.Unix()), - Flags: 1, - TimeLockDelta: e2.TimeLockDelta, - HtlcMinimumMsat: uint32(e2.MinHTLC), - FeeBaseMsat: uint32(e2.FeeBaseMSat), - FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths), - }) - } - - numEdges++ - return nil - }); err != nil && err != channeldb.ErrGraphNoEdgesFound { - log.Errorf("unable to sync edges w/ peer: %v", err) - return err - } - - log.Infof("Syncing channel graph state with %x, sending %v "+ - "nodes and %v edges", targetNode.SerializeCompressed(), - numNodes, numEdges) - - // With all the announcement messages gathered, send them all in a - // single batch to the target peer. - return r.cfg.SendMessages(targetNode, announceMessages...) + return nil } // fetchChanPoint retrieves the original outpoint which is encoded within the @@ -1071,32 +693,11 @@ func (r *ChannelRouter) fetchChanPoint(chanID *lnwire.ChannelID) (*wire.OutPoint }, nil } -// routingMsg couples a routing related wire message with the peer that -// originally sent it. +// routingMsg couples a routing related routing topology update to the +// error channel. 
type routingMsg struct { - msg lnwire.Message - peer *btcec.PublicKey -} - -// ProcessRoutingMessage sends a new routing message along with the peer that -// sent the routing message to the ChannelRouter. The announcement will be -// processed then added to a queue for batched tickled announcement to all -// connected peers. -// -// TODO(roasbeef): need to move to discovery package -func (r *ChannelRouter) ProcessRoutingMessage(msg lnwire.Message, src *btcec.PublicKey) { - // TODO(roasbeef): msg wrappers to add a doneChan - - rMsg := &routingMsg{ - msg: msg, - peer: src, - } - - select { - case r.networkMsgs <- rMsg: - case <-r.quit: - return - } + msg interface{} + err chan error } // FindRoutes attempts to query the ChannelRouter for the all available paths @@ -1119,7 +720,7 @@ func (r *ChannelRouter) FindRoutes(target *btcec.PublicKey, amt btcutil.Amount) return nil, err } else if !exists { log.Debugf("Target %x is not in known graph", dest) - return nil, ErrTargetNotInNetwork + return nil, newErrf(ErrTargetNotInNetwork, "target not found") } // Now that we know the destination is reachable within the graph, @@ -1153,7 +754,8 @@ func (r *ChannelRouter) FindRoutes(target *btcec.PublicKey, amt btcutil.Amount) // If all our perspective routes were eliminating during the transition // from path to route, then we'll return an error to the caller if len(validRoutes) == 0 { - return nil, ErrNoPathFound + return nil, newErr(ErrNoPathFound, "unable to find a path to "+ + "destination") } // Finally, we'll sort the set of validate routes to optimize for @@ -1350,3 +952,84 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route // routes we've found, then return an error. return [32]byte{}, nil, sendError } + +// AddNode adds a node to the topology of the router. Afterwards the node +// may be used in the construction of payment paths. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error { + rMsg := &routingMsg{ + msg: node, + err: make(chan error, 1), + } + + select { + case r.networkUpdates <- rMsg: + return <-rMsg.err + case <-r.quit: + return errors.New("router has been shut down") + } +} + +// AddEdge adds a new edge/channel to the topology of the router. Once all +// information about the channel has been gathered, the edge/channel may be +// used in the construction of payment paths. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error { + rMsg := &routingMsg{ + msg: edge, + err: make(chan error, 1), + } + + select { + case r.networkUpdates <- rMsg: + return <-rMsg.err + case <-r.quit: + return errors.New("router has been shut down") + } +} + +// UpdateEdge updates the routing policy for an edge. Until this update is +// received, the edge is considered not fully constructed. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error { + rMsg := &routingMsg{ + msg: update, + err: make(chan error, 1), + } + + select { + case r.networkUpdates <- rMsg: + return <-rMsg.err + case <-r.quit: + return errors.New("router has been shut down") + } +} + +// CurrentBlockHeight returns the block height from the PoV of the router subsystem. +// NOTE: Part of the ChannelGraphSource interface.
+func (r *ChannelRouter) CurrentBlockHeight() (uint32, error) { + _, height, err := r.cfg.Chain.GetBestBlock() + return uint32(height), err +} + +// ForEachNode iterates over every node in the router's topology. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) error { + return r.cfg.Graph.ForEachNode(cb) +} + +// ForAllOutgoingChannels iterates over the policies of all outgoing channels owned by this node. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgePolicy) error) error { + return r.selfNode.ForEachChannel(nil, func(_ *channeldb.ChannelEdgeInfo, + c *channeldb.ChannelEdgePolicy) error { + return cb(c) + }) +} + +// ForEachChannel iterates over every channel in the router's topology. +// NOTE: Part of the ChannelGraphSource interface. +func (r *ChannelRouter) ForEachChannel(cb func(chanInfo *channeldb.ChannelEdgeInfo, + e1, e2 *channeldb.ChannelEdgePolicy) error) error { + return r.cfg.Graph.ForEachChannel(cb) +} diff --git a/routing/router_test.go b/routing/router_test.go index abd70a4f..96cbddeb 100644 --- a/routing/router_test.go +++ b/routing/router_test.go @@ -77,12 +77,6 @@ func createTestCtx(startingHeight uint32, testGraph ...string) (*testCtx, func() Graph: graph, Chain: chain, Notifier: notifier, - Broadcast: func(_ *btcec.PublicKey, msg ...lnwire.Message) error { - return nil - }, - SendMessages: func(_ *btcec.PublicKey, msg ...lnwire.Message) error { - return nil - }, SendToSwitch: func(_ *btcec.PublicKey, _ *lnwire.UpdateAddHTLC) ([32]byte, error) { return [32]byte{}, nil