multi: minor coding style and comment clean ups post-discovery merge

This commit implements some minor coding style, commenting and naming
clean up after the recent major discovery service was merged into the
codebase.

Highlights of the naming changes:
* fundingManager.SendToDiscovery -> SendAnnouncement
* discovery.Discovery -> discovery.AuthenticatedGossiper

The rest of the changes consist primary of grammar fixes and proper
column wrapping.
This commit is contained in:
Olaoluwa Osuntokun 2017-04-01 14:33:17 +02:00
parent 1894a208bd
commit ca053e5273
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
8 changed files with 425 additions and 339 deletions

View File

@ -6,6 +6,7 @@ import (
"sync/atomic"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
@ -15,9 +16,10 @@ import (
"github.com/roasbeef/btcutil"
)
// waitingProofKey is the proof key which uniquely identifies the
// announcement signature message. The goal of this key is distinguish the
// local and remote proof for the same channel id.
// waitingProofKey is the proof key which uniquely identifies the announcement
// signature message. The goal of this key is distinguish the local and remote
// proof for the same channel id.
//
// TODO(andrew.shvv) move to the channeldb package after waiting proof map
// becomes persistent.
type waitingProofKey struct {
@ -28,10 +30,12 @@ type waitingProofKey struct {
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
msg lnwire.Message
peer *btcec.PublicKey
msg lnwire.Message
isRemote bool
peer *btcec.PublicKey
err chan error
err chan error
}
// syncRequest represents a request from an outside subsystem to the wallet to
@ -44,30 +48,31 @@ type syncRequest struct {
// configuration MUST be non-nil for the service to carry out its duties.
type Config struct {
// Router is the subsystem which is responsible for managing the
// topology of lightning network. After incoming channel, node,
// channel updates announcements are validated they are sent to the
// router in order to be included in the LN graph.
// topology of lightning network. After incoming channel, node, channel
// updates announcements are validated they are sent to the router in
// order to be included in the LN graph.
Router routing.ChannelGraphSource
// Notifier is used for receiving notifications of incoming blocks.
// With each new incoming block found we process previously premature
// announcements.
//
// TODO(roasbeef): could possibly just replace this with an epoch
// channel.
Notifier chainntnfs.ChainNotifier
// Broadcast broadcasts a particular set of announcements to all peers
// that the daemon is connected to. If supplied, the exclude parameter
// indicates that the target peer should be excluded from the broadcast.
// indicates that the target peer should be excluded from the
// broadcast.
Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error
// SendToPeer is a function which allows the service to send a set of
// messages to a particular peer identified by the target public
// key.
// messages to a particular peer identified by the target public key.
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// ProofMatureDelta the number of confirmations which is needed
// before exchange the channel announcement proofs.
// ProofMatureDelta the number of confirmations which is needed before
// exchange the channel announcement proofs.
ProofMatureDelta uint32
// TrickleDelay the period of trickle timer which flushing to the
@ -76,29 +81,33 @@ type Config struct {
TrickleDelay time.Duration
}
// New create new discovery service structure.
func New(cfg Config) (*Discovery, error) {
return &Discovery{
// New creates a new AuthenticatedGossiper instance, initialized with the
// passed configuration paramters.
func New(cfg Config) (*AuthenticatedGossiper, error) {
return &AuthenticatedGossiper{
cfg: &cfg,
networkMsgs: make(chan *networkMsg),
quit: make(chan bool),
quit: make(chan struct{}),
syncRequests: make(chan *syncRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: make(map[waitingProofKey]*lnwire.AnnounceSignatures),
}, nil
}
// Discovery is a subsystem which is responsible for receiving announcements
// validate them and apply the changes to router, syncing lightning network
// with newly connected nodes, broadcasting announcements after validation,
// negotiating the channel announcement proofs exchange and handling the
// premature announcements.
type Discovery struct {
// Parameters which are needed to properly handle the start and stop
// of the service.
// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements validate them and apply the changes to router, syncing
// lightning network with newly connected nodes, broadcasting announcements
// after validation, negotiating the channel announcement proofs exchange and
// handling the premature announcements. All outgoing announcements are
// expected to be properly signed as dictated in BOLT#7, additionally, all
// incoming message are expected to be well formed and signed. Invalid messages
// will be rejected by this struct.
type AuthenticatedGossiper struct {
// Parameters which are needed to properly handle the start and stop of
// the service.
started uint32
stopped uint32
quit chan bool
quit chan struct{}
wg sync.WaitGroup
// cfg is a copy of the configuration struct that the discovery service
@ -119,10 +128,12 @@ type Discovery struct {
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// waitingProofs is the map of proof announcement messages which were
// processed and waiting for opposite local or remote proof to be
// received in order to construct full proof, validate it and
// announce the channel.
// waitingProofs is a map of partial channel proof announcement
// messages. We use this map to buffer half of the material needed to
// reconstruct a full authenticated channel announcement. Once we
// receive the other half the channel proof, we'll be able to properly
// validate it an re-broadcast it out to the network.
//
// TODO(andrew.shvv) make this map persistent.
waitingProofs map[waitingProofKey]*lnwire.AnnounceSignatures
@ -141,60 +152,11 @@ type Discovery struct {
bestHeight uint32
}
// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed then
// added to a queue for batched trickled announcement to all connected peers.
// Remote channel announcements should contain the announcement proof and be
// fully validated.
func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: true,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// ProcessLocalAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed then
// added to a queue for batched trickled announcement to all connected peers.
// Local channel announcements not contain the announcement proof and should be
// fully validated. The channels proofs will be included farther if nodes agreed
// to announce this channel to the rest of the network.
func (d *Discovery) ProcessLocalAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: false,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// SynchronizeNode sends a message to the service indicating it should
// synchronize lightning topology state with the target node. This method
// is to be utilized when a node connections for the first time to provide it
// with the latest topology update state.
func (d *Discovery) SynchronizeNode(pub *btcec.PublicKey) {
// synchronize lightning topology state with the target node. This method is to
// be utilized when a node connections for the first time to provide it with
// the latest topology update state.
func (d *AuthenticatedGossiper) SynchronizeNode(pub *btcec.PublicKey) {
select {
case d.syncRequests <- &syncRequest{
node: pub,
@ -206,11 +168,13 @@ func (d *Discovery) SynchronizeNode(pub *btcec.PublicKey) {
// Start spawns network messages handler goroutine and registers on new block
// notifications in order to properly handle the premature announcements.
func (d *Discovery) Start() error {
func (d *AuthenticatedGossiper) Start() error {
if !atomic.CompareAndSwapUint32(&d.started, 0, 1) {
return nil
}
log.Info("Authenticated Gossiper is starting")
// First we register for new notifications of newly discovered blocks.
// We do this immediately so we'll later be able to consume any/all
// blocks which were discovered.
@ -229,28 +193,78 @@ func (d *Discovery) Start() error {
d.wg.Add(1)
go d.networkHandler()
log.Info("Discovery service is started")
return nil
}
// Stop signals any active goroutines for a graceful closure.
func (d *Discovery) Stop() {
func (d *AuthenticatedGossiper) Stop() {
if !atomic.CompareAndSwapUint32(&d.stopped, 0, 1) {
return
}
log.Info("Authenticated Gossiper is stopping")
close(d.quit)
d.wg.Wait()
log.Info("Discovery service is stoped.")
}
// networkHandler is the primary goroutine. The roles of this goroutine include
// answering queries related to the state of the network, syncing up newly
// connected peers, and also periodically broadcasting our latest topology state
// to all connected peers.
// ProcessRemoteAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Remote channel announcements should contain the announcement proof
// and be fully validated.
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: true,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// ProcessLocalAnnouncement sends a new remote announcement message along with
// the peer that sent the routing message. The announcement will be processed
// then added to a queue for batched trickled announcement to all connected
// peers. Local channel announcements don't contain the announcement proof and
// will not be fully validated. Once the channel proofs are received, the
// entire channel announcement and update messages will be re-constructed and
// broadcast to the rest of the network.
func (d *AuthenticatedGossiper) ProcessLocalAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) chan error {
nMsg := &networkMsg{
msg: msg,
isRemote: false,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- nMsg:
case <-d.quit:
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// networkHandler is the primary goroutine that drives this service. The roles
// of this goroutine includes answering queries related to the state of the
// network, syncing up newly connected peers, and also periodically
// broadcasting our latest topology state to all connected peers.
//
// NOTE: This MUST be run as a goroutine.
func (d *Discovery) networkHandler() {
func (d *AuthenticatedGossiper) networkHandler() {
defer d.wg.Done()
var announcementBatch []lnwire.Message
@ -283,8 +297,8 @@ func (d *Discovery) networkHandler() {
)
}
// A new block has arrived, so we can re-process the
// previously premature announcements.
// A new block has arrived, so we can re-process the previously
// premature announcements.
case newBlock, ok := <-d.newBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
@ -354,26 +368,27 @@ func (d *Discovery) networkHandler() {
// Iterate over our channels and construct the
// announcements array.
err := d.cfg.Router.ForAllOutgoingChannels(
func(p *channeldb.ChannelEdgePolicy) error {
c := &lnwire.ChannelUpdateAnnouncement{
Signature: p.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags,
TimeLockDelta: p.TimeLockDelta,
HtlcMinimumMsat: uint32(p.MinHTLC),
FeeBaseMsat: uint32(p.FeeBaseMSat),
FeeProportionalMillionths: uint32(p.FeeProportionalMillionths),
}
selfChans = append(selfChans, c)
return nil
})
err := d.cfg.Router.ForAllOutgoingChannels(func(p *channeldb.ChannelEdgePolicy) error {
c := &lnwire.ChannelUpdateAnnouncement{
Signature: p.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags,
TimeLockDelta: p.TimeLockDelta,
HtlcMinimumMsat: uint32(p.MinHTLC),
FeeBaseMsat: uint32(p.FeeBaseMSat),
FeeProportionalMillionths: uint32(p.FeeProportionalMillionths),
}
selfChans = append(selfChans, c)
return nil
},
)
if err != nil {
log.Errorf("unable to iterate over chann"+
"els: %v", err)
log.Errorf("unable to retrieve outgoing channels: %v", err)
continue
} else if len(selfChans) == 0 {
}
if len(selfChans) == 0 {
continue
}
@ -381,8 +396,8 @@ func (d *Discovery) networkHandler() {
len(selfChans))
// With all the wire announcements properly crafted,
// we'll broadcast our known outgoing channel to all our
// immediate peers.
// we'll broadcast our known outgoing channel to all
// our immediate peers.
if err := d.cfg.Broadcast(nil, selfChans...); err != nil {
log.Errorf("unable to re-broadcast "+
"channels: %v", err)
@ -390,18 +405,18 @@ func (d *Discovery) networkHandler() {
// We've just received a new request to synchronize a peer with
// our latest lightning network topology state. This indicates
// that a peer has just connected for the first time, so for now
// we dump our entire network graph and allow them to sift
// that a peer has just connected for the first time, so for
// now we dump our entire network graph and allow them to sift
// through the (subjectively) new information on their own.
case syncReq := <-d.syncRequests:
nodePub := syncReq.node.SerializeCompressed()
if err := d.synchronize(syncReq); err != nil {
if err := d.synchronizeWithNode(syncReq); err != nil {
log.Errorf("unable to sync graph state with %x: %v",
nodePub, err)
}
// The discovery has been signalled to exit, to we exit our main
// loop so the wait group can be decremented.
// The discovery has been signalled to exit, to we exit our
// main loop so the wait group can be decremented.
case <-d.quit:
return
}
@ -411,15 +426,17 @@ func (d *Discovery) networkHandler() {
// processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements
// will be returned which should be broadcasted to the rest of the network.
func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message {
var announcements []lnwire.Message
// or redundant, then nil is returned. Otherwise, the set of announcements will
// be returned which should be broadcasted to the rest of the network.
func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message {
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
return chanID.BlockHeight+delta > d.bestHeight
}
var announcements []lnwire.Message
switch msg := nMsg.msg.(type) {
// A new node announcement has arrived which either presents a new
// node, or a node updating previously advertised information.
case *lnwire.NodeAnnouncement:
@ -443,23 +460,22 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
}
if err := d.cfg.Router.AddNode(node); err != nil {
e := errors.Errorf("unable to add node: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) {
log.Info(err)
} else {
log.Error(e)
log.Error(err)
}
nMsg.err <- e
nMsg.err <- err
return nil
}
// Node announcement was successfully proceeded and know it
// might be broadcasted to other connected nodes.
// might be broadcast to other connected nodes.
announcements = append(announcements, msg)
nMsg.err <- nil
// TODO(roasbeef): get rid of the above
return announcements
// A new channel announcement has arrived, this indicates the
@ -472,9 +488,9 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
// to be fully verified once we advance forward in the chain.
if isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only height "+
"%v is known", msg.ShortChannelID, msg.ShortChannelID.BlockHeight,
log.Infof("Announcement for chan_id=(%v), is premature: "+
"advertises height %v, only height %v is known",
msg.ShortChannelID, msg.ShortChannelID.BlockHeight,
d.bestHeight)
d.prematureAnnouncements[blockHeight] = append(
@ -484,16 +500,23 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
return nil
}
// If this is a remote channel annoucement, then we'll validate
// all the signatues within the proof as it should be well
// formed.
var proof *channeldb.ChannelAuthProof
if nMsg.isRemote {
if err := d.validateChannelAnn(msg); err != nil {
err := errors.Errorf("unable to validate "+
"announcement: %v", err)
log.Error(err)
nMsg.err <- err
return nil
}
// If the proof checks out, then we'll save the proof
// itself to the database so we can fetch it later when
// gossiping with other nodes.
proof = &channeldb.ChannelAuthProof{
NodeSig1: msg.NodeSig1,
NodeSig2: msg.NodeSig2,
@ -502,6 +525,8 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
}
}
// With the proof validate (if necessary), we can now store it
// within the database for our path finding and syncing needs.
edge := &channeldb.ChannelEdgeInfo{
ChannelID: msg.ShortChannelID.ToUint64(),
NodeKey1: msg.NodeID1,
@ -510,22 +535,21 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
BitcoinKey2: msg.BitcoinKey2,
AuthProof: proof,
}
if err := d.cfg.Router.AddEdge(edge); err != nil {
e := errors.Errorf("unable to add edge: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
if routing.IsError(err, routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
log.Info(err)
} else {
log.Error(e)
log.Error(err)
}
nMsg.err <- e
nMsg.err <- err
return nil
}
// Channel announcement was successfully proceeded and know it
// might be broadcasted to other connected nodes if it was
// might be broadcast to other connected nodes if it was
// announcement with proof (remote).
if proof != nil {
announcements = append(announcements, msg)
@ -534,7 +558,7 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
nMsg.err <- nil
return announcements
// A new authenticated channel edges has arrived, this indicates
// A new authenticated channel edge update has arrived. This indicates
// that the directional information for an already known channel has
// been updated.
case *lnwire.ChannelUpdateAnnouncement:
@ -546,7 +570,7 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
// to be fully verified once we advance forward in the chain.
if isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+
"shortChanID=(%v), is premature: advertises "+
"short_chan_id(%v), is premature: advertises "+
"height %v, only height %v is known",
shortChanID, blockHeight, d.bestHeight)
@ -557,18 +581,20 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
return nil
}
// Get the node pub key as far as we don't have it in
// channel update announcement message and verify
// message signature.
// Get the node pub key as far as we don't have it in channel
// update announcement message. We'll need this to properly
// verify message signature.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
err := errors.Errorf("unable to validate "+
"channel update shortChanID=%v: %v",
"channel update short_chan_id=%v: %v",
shortChanID, err)
nMsg.err <- err
return nil
}
// The flag on the channel update announcement tells us "which"
// side of the channels directed edge is being updated.
var pubKey *btcec.PublicKey
switch msg.Flags {
case 0:
@ -577,9 +603,14 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
pubKey = chanInfo.NodeKey2
}
// Validate the channel announcement with the expected public
// key, In the case of an invalid channel , we'll return an
// error to the caller and exit early.
if err := d.validateChannelUpdateAnn(pubKey, msg); err != nil {
err := errors.Errorf("unable to validate channel"+
"update announcement for shortChanID=%v: %v", msg.ShortChannelID, err)
err := errors.Errorf("unable to validate channel "+
"update announcement for short_chan_id=%v: %v",
spew.Sdump(msg.ShortChannelID), err)
log.Error(err)
nMsg.err <- err
return nil
@ -598,23 +629,20 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
}
if err := d.cfg.Router.UpdateEdge(update); err != nil {
e := errors.Errorf("unable to update edge: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
if routing.IsError(err, routing.ErrOutdated, routing.ErrIgnored) {
log.Info(err)
} else {
log.Error(e)
log.Error(err)
}
nMsg.err <- e
nMsg.err <- err
return nil
}
// Channel update announcement was successfully proceeded and
// know it might be broadcasted to other connected nodes.
// We should announce the edge to rest of the network only
// if channel has the authentication proof.
// Channel update announcement was successfully processed and
// now it can be broadcast to the rest of the network. However,
// we'll only broadcast the channel update announcement if it
// has an attached authentication proof.
if chanInfo.AuthProof != nil {
announcements = append(announcements, msg)
}
@ -622,40 +650,45 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
nMsg.err <- nil
return announcements
// New signature announcement received which indicates willingness
// of the parties (to exchange the channel signatures / announce newly
// created channel).
// A new signature announcement has been received. This indicates
// willingness of nodes involved in the funding of a channel to
// announce this new channel to the rest of the world.
case *lnwire.AnnounceSignatures:
needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta
shortChanID := msg.ShortChannelID.ToUint64()
prefix := "local"
if nMsg.isRemote {
prefix = "remote"
}
// By the specification proof should be sent after some number of
// confirmations after channel was registered in bitcoin
// blockchain. So we should check that proof is premature and
// if not send it to the be proceeded again. This allows us to
// be tolerant to other clients if this constraint was changed.
log.Infof("Received new channel announcement: %v",
spew.Sdump(msg))
// By the specification, channel announcement proofs should be
// sent after some number of confirmations after channel was
// registered in bitcoin blockchain. Therefore, we check if the
// proof is premature. If so we'll halt processing until the
// expected announcement height. This allows us to be tolerant
// to other clients if this constraint was changed.
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight],
nMsg,
)
log.Infof("Premature proof annoucement, "+
log.Infof("Premature proof announcement, "+
"current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch",
d.bestHeight, needBlockHeight)
return nil
}
// Check that we have channel with such channel id in out
// lightning network topology.
// Ensure that we know of a channel with the target channel ID
// before proceeding further.
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
err := errors.Errorf("unable to process channel "+
"%v proof with shortChanID=%v: %v", prefix,
"%v proof with short_chan_id=%v: %v", prefix,
shortChanID, err)
nMsg.err <- err
return nil
@ -666,32 +699,32 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey2.SerializeCompressed())
// Check that channel that was retrieved belongs to the peer
// which sent the proof announcement, otherwise the proof for
// might be rewritten by the any lightning network node.
// Ensure that channel that was retrieved belongs to the peer
// which sent the proof announcement.
if !(isFirstNode || isSecondNode) {
err := errors.Errorf("channel that was received not "+
"belongs to the peer which sent the proof, "+
"shortChanID=%v", shortChanID)
"short_chan_id=%v", shortChanID)
log.Error(err)
nMsg.err <- err
return nil
}
// Check that we received the opposite proof, if so, than we
// should construct the full proof, and create the channel
// Check that we received the opposite proof. If so, then we're
// now able to construct the full proof, and create the channel
// announcement. If we didn't receive the opposite half of the
// proof than we should store it this one, and wait for opposite
// to be received.
// proof than we should store it this one, and wait for
// opposite to be received.
oppositeKey := newProofKey(chanInfo.ChannelID, !nMsg.isRemote)
oppositeProof, ok := d.waitingProofs[oppositeKey]
if !ok {
key := newProofKey(chanInfo.ChannelID, nMsg.isRemote)
d.waitingProofs[key] = msg
// If proof was send from funding manager than we
// should send the announce signature message to
// remote side.
// If proof was sent b a local sub-system, then we'll
// send the announcement signature to the remote node
// so they can also reconstruct the full channel
// announcement.
if !nMsg.isRemote {
// Check that first node of the channel info
// corresponds to us.
@ -705,24 +738,27 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
err := d.cfg.SendToPeer(remotePeer, msg)
if err != nil {
log.Errorf("unable to send "+
"announcement message to "+
"peer: %x",
"announcement message to peer: %x",
remotePeer.SerializeCompressed())
}
log.Infof("Send channel announcement proof "+
"for shortChanID=%v to remote peer: "+
"%x", shortChanID, remotePeer.SerializeCompressed())
log.Infof("Sent channel announcement proof "+
"for short_chan_id=%v to remote peer: "+
"%x", shortChanID,
remotePeer.SerializeCompressed())
}
log.Infof("Incoming %v proof announcement for "+
"shortChanID=%v have been proceeded and waiting for opposite proof",
prefix, shortChanID)
log.Infof("1/2 of channel ann proof received for "+
"short_chan_id=%v, waiting for other half",
shortChanID)
nMsg.err <- nil
return nil
}
// If we now have both halves of the channel announcement
// proof, then we'll reconstruct the initial announcement so we
// can validate it shortly below.
var dbProof channeldb.ChannelAuthProof
if isFirstNode {
dbProof.NodeSig1 = msg.NodeSignature
@ -735,13 +771,15 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
dbProof.BitcoinSig1 = oppositeProof.BitcoinSignature
dbProof.BitcoinSig2 = msg.BitcoinSignature
}
chanAnn, e1Ann, e2Ann := createChanAnnouncement(&dbProof, chanInfo, e1, e2)
// With all the necessary components assembled validate the
// full channel announcement proof.
if err := d.validateChannelAnn(chanAnn); err != nil {
err := errors.Errorf("channel announcement proof "+
"for shortChanID=%v isn't valid: %v",
"for short_chan_id=%v isn't valid: %v",
shortChanID, err)
log.Error(err)
nMsg.err <- err
return nil
@ -749,11 +787,11 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
// If the channel was returned by the router it means that
// existence of funding point and inclusion of nodes bitcoin
// keys in it already checked by the router. On this stage we
// should check that node keys are corresponds to the bitcoin
// keys by validating the signatures of announcement.
// If proof is valid than we should populate the channel
// edge with it, so we can announce it on peer connect.
// keys in it already checked by the router. In this stage we
// should check that node keys are attest to the bitcoin keys
// by validating the signatures of announcement. If proof is
// valid then we'll populate the channel edge with it, so we
// can announce it on peer connect.
err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof)
if err != nil {
err := errors.Errorf("unable add proof to the "+
@ -765,10 +803,12 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
// Proof was successfully created and now can announce the
// channel to the remain network.
log.Infof("Incoming %v proof announcement for shortChanID=%v"+
" have been proceeded, adding channel announcement in"+
" the broadcasting batch", prefix, shortChanID)
log.Infof("Fully valid channel proof for short_chan_id=%v "+
"constructed, adding to next ann batch",
shortChanID)
// Assemble the necessary announcements to add to the next
// broadcasting batch.
announcements = append(announcements, chanAnn)
if e1Ann != nil {
announcements = append(announcements, e1Ann)
@ -777,6 +817,9 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
announcements = append(announcements, e2Ann)
}
// If this a local announcement, then we'll send it to the
// remote side so they can reconstruct the full channel
// announcement proof.
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
if isFirstNode {
@ -784,8 +827,8 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
} else {
remotePeer = chanInfo.NodeKey1
}
err = d.cfg.SendToPeer(remotePeer, msg)
if err != nil {
if err = d.cfg.SendToPeer(remotePeer, msg); err != nil {
log.Errorf("unable to send announcement "+
"message to peer: %x",
remotePeer.SerializeCompressed())
@ -801,12 +844,12 @@ func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Messag
}
}
// synchronize attempts to synchronize the target node in the syncReq to
// the latest channel graph state. In order to accomplish this, (currently) the
// entire network graph is read from disk, then serialized to the format
// synchronizeWithNode attempts to synchronize the target node in the syncReq
// to the latest channel graph state. In order to accomplish this, (currently)
// the entire network graph is read from disk, then serialized to the format
// defined within the current wire protocol. This cache of graph data is then
// sent directly to the target node.
func (d *Discovery) synchronize(syncReq *syncRequest) error {
func (d *AuthenticatedGossiper) synchronizeWithNode(syncReq *syncRequest) error {
targetNode := syncReq.node
// TODO(roasbeef): need to also store sig data in db
@ -873,7 +916,7 @@ func (d *Discovery) synchronize(syncReq *syncRequest) error {
}
log.Infof("Syncing channel graph state with %x, sending %v "+
"nodes and %v infos", targetNode.SerializeCompressed(),
"vertexes and %v edges", targetNode.SerializeCompressed(),
numNodes, numEdges)
// With all the announcement messages gathered, send them all in a

View File

@ -351,7 +351,7 @@ func createRemoteChannelAnnouncement(blockHeight uint32) (*lnwire.ChannelAnnounc
}
type testCtx struct {
discovery *Discovery
discovery *AuthenticatedGossiper
router *mockGraphSource
notifier *mockNotifier
broadcastedMessage chan lnwire.Message
@ -559,7 +559,8 @@ func TestPrematureAnnouncement(t *testing.T) {
}
}
// TestSignatureAnnouncement....
// TestSignatureAnnouncement ensures that the AuthenticatedGossiper properly
// processes partial and fully announcement signatures message.
func TestSignatureAnnouncement(t *testing.T) {
ctx, cleanup, err := createTestCtx(proofMatureDelta)
if err != nil {
@ -575,8 +576,8 @@ func TestSignatureAnnouncement(t *testing.T) {
localKey := batch.nodeAnn1.NodeID
remoteKey := batch.nodeAnn2.NodeID
// Recreate lightning network topology. Initialize router with
// channel between two nodes.
// Recreate lightning network topology. Initialize router with channel
// between two nodes.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)

View File

@ -35,17 +35,19 @@ func (k waitingProofKey) ToBytes() []byte {
return key[:]
}
// createChanAnnouncement helper function which creates the channel announcement
// by the given channeldb objects.
// createChanAnnouncement is a helper function which creates all channel
// announcements given the necessary channel related database items. This
// function is used to transform out databse structs into the coresponding wire
// sturcts for announcing new channels to other peers, or simply syncing up a
// peer's initial routing table upon connect.
func createChanAnnouncement(chanProof *channeldb.ChannelAuthProof,
chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) (
*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdateAnnouncement,
*lnwire.ChannelUpdateAnnouncement) {
// First, using the parameters of the channel, along with the
// channel authentication chanProof, we'll create re-create the
// original authenticated channel announcement.
e1, e2 *channeldb.ChannelEdgePolicy) (*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdateAnnouncement, *lnwire.ChannelUpdateAnnouncement) {
// First, using the parameters of the channel, along with the channel
// authentication chanProof, we'll create re-create the original
// authenticated channel announcement.
chanID := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID)
chanAnn := &lnwire.ChannelAnnouncement{
NodeSig1: chanProof.NodeSig1,
@ -59,13 +61,13 @@ func createChanAnnouncement(chanProof *channeldb.ChannelAuthProof,
BitcoinKey2: chanInfo.BitcoinKey2,
}
// We'll unconditionally queue the channel's existence chanProof as
// it will need to be processed before either of the channel
// update networkMsgs.
// We'll unconditionally queue the channel's existence chanProof as it
// will need to be processed before either of the channel update
// networkMsgs.
// Since it's up to a node's policy as to whether they
// advertise the edge in dire direction, we don't create an
// advertisement if the edge is nil.
// Since it's up to a node's policy as to whether they advertise the
// edge in dire direction, we don't create an advertisement if the edge
// is nil.
var edge1Ann, edge2Ann *lnwire.ChannelUpdateAnnouncement
if e1 != nil {
edge1Ann = &lnwire.ChannelUpdateAnnouncement{
@ -95,10 +97,8 @@ func createChanAnnouncement(chanProof *channeldb.ChannelAuthProof,
return chanAnn, edge1Ann, edge2Ann
}
// copyPubKey is copying the public key and setting curve.
// NOTE: At the moment of creation the function was need only because we are
// setting the curve to nil in the read message function and in order to
// properly validate the signatures we need to set the curve again.
// copyPubKey performs a copy of the target public key, setting a fresh curve
// parameter during the process.
func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
return &btcec.PublicKey{
Curve: btcec.S256(),
@ -107,12 +107,16 @@ func copyPubKey(pub *btcec.PublicKey) *btcec.PublicKey {
}
}
// SignAnnouncement helper function which is used for signing the announce
// messages.
// SignAnnouncement is a helper function which is used to sign any outgoing
// channel node node announcement messages.
func SignAnnouncement(signer *lnwallet.MessageSigner,
msg lnwire.Message) (*btcec.Signature, error) {
var data []byte
var err error
var (
data []byte
err error
)
switch m := msg.(type) {
case *lnwire.ChannelAnnouncement:
data, err = m.DataToSign()
@ -125,7 +129,7 @@ func SignAnnouncement(signer *lnwallet.MessageSigner,
"of this format")
}
if err != nil {
return nil, errors.Errorf("can't get data to sign: %v", err)
return nil, errors.Errorf("unable to get data to sign: %v", err)
}
return signer.SignData(data)

View File

@ -8,75 +8,81 @@ import (
)
// validateChannelAnn validates the channel announcement message and checks
// that node signatures contains the announcement message, and that the
// bitcoin signatures contains the node keys.
func (d *Discovery) validateChannelAnn(a *lnwire.ChannelAnnouncement) error {
// that node signatures covers the announcement message, and that the bitcoin
// signatures covers the node keys.
func (d *AuthenticatedGossiper) validateChannelAnn(a *lnwire.ChannelAnnouncement) error {
// First we'll verify that the passed bitcoin key signature is indeed a
// signature over the digest of the node signature.
sigHash := chainhash.DoubleHashB(a.NodeID1.SerializeCompressed())
if !a.BitcoinSig1.Verify(sigHash, copyPubKey(a.BitcoinKey1)) {
return errors.New("can't verify first bitcoin signature")
}
// If that checks out, then we'll verify that the second bitcoin
// signature is a valid signature of the bitcoin public key over the
// second node signature.
sigHash = chainhash.DoubleHashB(a.NodeID2.SerializeCompressed())
if !a.BitcoinSig2.Verify(sigHash, copyPubKey(a.BitcoinKey2)) {
return errors.New("can't verify second bitcoin signature")
}
// Get the data of announcement which should be encapsulated in
// signature and then check it.
// With the first two bitcoin signatures verified, we'll reconstruct
// the original digest of the channel announcement message.
data, err := a.DataToSign()
if err != nil {
return err
}
dataHash := chainhash.DoubleHashB(data)
// Both node signatures attached should indeed be a valid signature
// over the selected digest of the channel announcement signature.
if !a.NodeSig1.Verify(dataHash, copyPubKey(a.NodeID1)) {
return errors.New("can't verify data in first node signature")
}
if !a.NodeSig2.Verify(dataHash, copyPubKey(a.NodeID2)) {
return errors.New("can't verify data in second node signature")
}
return nil
}
// validateNodeAnn validates the node announcement by checking that the
// the signature corresponds to the node key and have been created with
// announcement data.
func (d *Discovery) validateNodeAnn(a *lnwire.NodeAnnouncement) error {
// Get the data of announcement which should be encapsulated in
// signature and then check it.
// validateNodeAnn validates the node announcement by ensuring that the
// attached signature is needed a signature of the node announcement under the
// specified node public key.
func (d *AuthenticatedGossiper) validateNodeAnn(a *lnwire.NodeAnnouncement) error {
// Reconstruct the data of announcement which should be covered by the
// signature so we can verify the signature shortly below
data, err := a.DataToSign()
if err != nil {
return err
}
// Finally ensure that the passed signature is valid, if not we'll
// return an error so this node announcement can be rejected.
dataHash := chainhash.DoubleHashB(data)
if !a.Signature.Verify(dataHash, copyPubKey(a.NodeID)) {
return errors.New("can't check the node annoucement signature")
return errors.New("signature on node announcement is invalid!")
}
return nil
}
// validateChannelUpdateAnn validates the channel update announcement by
// checking that the the signature corresponds to the node key and have been
// created with announcement data.
func (d *Discovery) validateChannelUpdateAnn(pubKey *btcec.PublicKey,
// checking that the included signature covers he announcement and has been
// signed by the node's private key.
func (d *AuthenticatedGossiper) validateChannelUpdateAnn(pubKey *btcec.PublicKey,
a *lnwire.ChannelUpdateAnnouncement) error {
// Get the data of announcement which should be encapsulated in
// signature and then check it.
data, err := a.DataToSign()
if err != nil {
return errors.Errorf("can't retrieve data to sign: %v", err)
return errors.Errorf("unable to reconstruct message: %v", err)
}
dataHash := chainhash.DoubleHashB(data)
if !a.Signature.Verify(dataHash, copyPubKey(pubKey)) {
return errors.Errorf("verification of channel updates "+
"failed chan_id=%v", a.ShortChannelID)
"failed chan_id=%v", a.ShortChannelID.ToUint64())
}
return nil

View File

@ -140,18 +140,22 @@ type fundingConfig struct {
// so that the channel creation process can be completed.
Notifier chainntnfs.ChainNotifier
// SignNodeKey is used to generate signature for node public key with
// funding keys.
// SignNodeKey is used to generate a signature of a given node identity
// public key signed under the passed node funding key. This function
// closure is used to generate one half the channel proof which attests
// the target nodeKey is indeed in control of the channel funding key
// in question.
SignNodeKey func(nodeKey, fundingKey *btcec.PublicKey) (*btcec.Signature, error)
// SignAnnouncement is used to generate the signatures for channel
// update, and node announcements, and also to generate the proof for
// the channel announcement.
// the channel announcements. The key used to generate this signature
// is the identity public key of the running daemon.
SignAnnouncement func(msg lnwire.Message) (*btcec.Signature, error)
// SendToDiscovery is used by the FundingManager to announce newly created
// channels to the rest of the Lightning Network.
SendToDiscovery func(msg lnwire.Message) error
// SendAnnouncement is used by the FundingManager to announce newly
// created channels to the rest of the Lightning Network.
SendAnnouncement func(msg lnwire.Message) error
// SendToPeer allows the FundingManager to send messages to the peer
// node during the multiple steps involved in the creation of the
@ -884,8 +888,7 @@ func (f *fundingManager) waitForFundingConfirmation(
}
fundingPoint := *completeChan.FundingOutpoint
fndgLog.Infof("ChannelPoint(%v) is now active",
fundingPoint)
fndgLog.Infof("ChannelPoint(%v) is now active", fundingPoint)
completeChan.IsPending = false
err := f.cfg.Wallet.ChannelDB.MarkChannelAsOpen(&fundingPoint)
@ -913,7 +916,6 @@ func (f *fundingManager) waitForFundingConfirmation(
fndgLog.Errorf("Unable to find peer: %v", err)
return
}
newChanDone := make(chan struct{})
newChanMsg := &newChannelMsg{
channel: channel,
@ -974,6 +976,7 @@ func (f *fundingManager) waitForFundingConfirmation(
// the funding workflow.
func (f *fundingManager) processFundingLocked(msg *lnwire.FundingLocked,
peerAddress *lnwire.NetAddress) {
f.fundingMsgs <- &fundingLockedMsg{msg, peerAddress}
}
@ -987,9 +990,8 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
return
}
// Register the new link with the L3 routing manager so this
// new channel can be utilized during path
// finding.
// Register the new link with the L3 routing manager so this new
// channel can be utilized during path finding.
go f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey,
channel.LocalFundingKey, channel.RemoteFundingKey,
fmsg.msg.ChannelID, fundingPoint)
@ -1073,28 +1075,37 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
FeeProportionalMillionths: 0,
}
// With the channel update announcement constructed, we'll generate a
// signature that signs a double-sha digest of the announcement.
// This'll serve to authenticate this announcement and other Other
// future updates we may send.
chanUpdateAnn.Signature, err = f.cfg.SignAnnouncement(chanUpdateAnn)
if err != nil {
return nil, errors.Errorf("unable to generate channel "+
"update announcement signature: %v", err)
}
// Channel proof should be announced in the separate message, so we
// already have the bitcoin signature but in order to construct
// the proof we also need node signature. Use message signer in order
// to sign the message with node private key.
// The channel existence proofs itself is currently announced in
// distinct message. In order to properly authenticate this message, we
// need two signatures: one under the identity public key used which
// signs the message itself and another signature of the identity
// public key under the funding key itself.
// TODO(roasbeef): need to revisit, ensure signatures are signed
// properly
nodeSig, err := f.cfg.SignAnnouncement(chanAnn)
if err != nil {
return nil, errors.Errorf("unable to generate node "+
"signature for channel announcement: %v", err)
}
bitcoinSig, err := f.cfg.SignNodeKey(localPubKey, localFundingKey)
if err != nil {
return nil, errors.Errorf("unable to generate bitcoin "+
"signature for node public key: %v", err)
}
// Finally, we'll generate the announcement proof which we'll use to
// provide the other side with the necessary signatures required to
// allow them to reconstruct the full channel announcement.
proof := &lnwire.AnnounceSignatures{
ChannelID: chanPoint,
ShortChannelID: chanID,
@ -1110,12 +1121,12 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
}
// announceChannel announces a newly created channel to the rest of the network
// by crafting the two authenticated announcements required for the peers on the
// network to recognize the legitimacy of the channel. The crafted
// announcements are then send to the channel router to handle broadcasting to
// by crafting the two authenticated announcements required for the peers on
// the network to recognize the legitimacy of the channel. The crafted
// announcements are then sent to the channel router to handle broadcasting to
// the network during its next trickle.
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID,
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID,
chanPoint wire.OutPoint) {
ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey, localFundingKey,
@ -1125,12 +1136,12 @@ func (f *fundingManager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKe
return
}
fndgLog.Infof("Send channel, channel update, and proof announcements"+
" for chanID=%v, shortChannelID=%v to discovery service",
chanPoint, chanID.ToUint64())
f.cfg.SendToDiscovery(ann.chanAnn)
f.cfg.SendToDiscovery(ann.chanUpdateAnn)
f.cfg.SendToDiscovery(ann.chanProof)
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", chanPoint,
spew.Sdump(chanID))
f.cfg.SendAnnouncement(ann.chanAnn)
f.cfg.SendAnnouncement(ann.chanUpdateAnn)
f.cfg.SendAnnouncement(ann.chanProof)
}
// initFundingWorkflow sends a message to the funding manager instructing it
@ -1211,7 +1222,8 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
return
}
fndgLog.Infof("Starting funding workflow with for pendingID(%v)", chanID)
fndgLog.Infof("Starting funding workflow with %v for pendingID(%v)",
msg.peerAddress.Address, chanID)
// TODO(roasbeef): add FundingRequestFromContribution func
// TODO(roasbeef): need to set fee/kb
@ -1284,7 +1296,7 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
return
}
fndgLog.Errorf("Received funding error from %v: %v",
fndgLog.Errorf("Received funding error from %x: %v",
peerKey.SerializeCompressed(), newLogClosure(func() string {
return spew.Sdump(e)
}),

View File

@ -220,9 +220,10 @@ func NewFundingSigner(wallet *BtcWallet) *FundingSigner {
}
}
// SignData sign given data with the private key which corresponds to
// the given public key.
// This is a part of the DataSigner interface.
// SignData sign given data with the private key which corresponds to the given
// public key.
//
// NOTE: This is a part of the DataSigner interface.
func (s *FundingSigner) SignData(data []byte,
pubKey *btcec.PublicKey) (*btcec.Signature, error) {

View File

@ -20,8 +20,8 @@ import (
"github.com/lightningnetwork/lightning-onion"
)
// ChannelGraphSource represent the source of information about the
// topology of lightning network, it responsible for addition of nodes, edges
// ChannelGraphSource represent the source of information about the topology of
// lightning network, it responsible for addition of nodes, edges
// and applying edges updates, return the current block with with out
// topology is synchronized.
type ChannelGraphSource interface {
@ -34,15 +34,17 @@ type ChannelGraphSource interface {
// edge/channel might be used in construction of payment path.
AddEdge(edge *channeldb.ChannelEdgeInfo) error
// AddProof updates the channel edge info with proof which is needed
// to properly announce the edge to the rest of the network.
// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
AddProof(chanID lnwire.ShortChannelID, proof *channeldb.ChannelAuthProof) error
// UpdateEdge is used to update edge information, without this
// message edge considered as not fully constructed.
// UpdateEdge is used to update edge information, without this message
// edge considered as not fully constructed.
UpdateEdge(policy *channeldb.ChannelEdgePolicy) error
// ForAllOutgoingChannels is used to iterate over all self channels info.
// ForAllOutgoingChannels is used to iterate over all channels
// eminating from the "source" node which is the center of the
// star-graph.
ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgePolicy) error) error
// CurrentBlockHeight returns the block height from POV of the router
@ -53,11 +55,11 @@ type ChannelGraphSource interface {
GetChannelByID(chanID lnwire.ShortChannelID) (*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy, *channeldb.ChannelEdgePolicy, error)
// ForEachNode is used to iterate over every node in router topology.
// ForEachNode is used to iterate over every node in the known graph.
ForEachNode(func(node *channeldb.LightningNode) error) error
// ForEachChannel is used to iterate over every channel in router
// topology.
// ForEachChannel is used to iterate over every channel in the known
// graph.
ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error) error
}
@ -497,9 +499,9 @@ func (r *ChannelRouter) networkHandler() {
}
// processUpdate processes a new relate authenticated channel/edge, node or
// channel/edge update network update. If the update didn't affect the
// internal state of the draft due to either being out of date, invalid, or
// redundant, then error is returned.
// channel/edge update network update. If the update didn't affect the internal
// state of the draft due to either being out of date, invalid, or redundant,
// then error is returned.
func (r *ChannelRouter) processUpdate(msg interface{}) error {
var invalidateCache bool
@ -528,7 +530,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}) error {
}
if err := r.cfg.Graph.AddLightningNode(msg); err != nil {
return errors.Errorf("unable to add msg %v to the "+
return errors.Errorf("unable to add node %v to the "+
"graph: %v", msg.PubKey.SerializeCompressed(), err)
}
@ -978,9 +980,10 @@ func (r *ChannelRouter) SendPayment(payment *LightningPayment) ([32]byte, *Route
return [32]byte{}, nil, sendError
}
// AddNode is used to add node to the topology of the router, after
// this node might be used in construction of payment path.
// NOTE: Part of the ChannelGraphSource interface.
// AddNode is used to add node to the topology of the router, after this node
// might be used in construction of payment path.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
rMsg := &routingMsg{
msg: node,
@ -995,10 +998,11 @@ func (r *ChannelRouter) AddNode(node *channeldb.LightningNode) error {
}
}
// AddEdge is used to add edge/channel to the topology of the router,
// after all information about channel will be gathered this
// AddEdge is used to add edge/channel to the topology of the router, after all
// information about channel will be gathered this
// edge/channel might be used in construction of payment path.
// NOTE: Part of the ChannelGraphSource interface.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
rMsg := &routingMsg{
msg: edge,
@ -1013,9 +1017,10 @@ func (r *ChannelRouter) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
}
}
// UpdateEdge is used to update edge information, without this
// message edge considered as not fully constructed.
// NOTE: Part of the ChannelGraphSource interface.
// UpdateEdge is used to update edge information, without this message edge
// considered as not fully constructed.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error {
rMsg := &routingMsg{
msg: update,
@ -1031,46 +1036,57 @@ func (r *ChannelRouter) UpdateEdge(update *channeldb.ChannelEdgePolicy) error {
}
// CurrentBlockHeight returns the block height from POV of the router subsystem.
// NOTE: Part of the ChannelGraphSource interface.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) CurrentBlockHeight() (uint32, error) {
_, height, err := r.cfg.Chain.GetBestBlock()
return uint32(height), err
}
// GetChannelByID return the channel by the channel id.
// NOTE: Part of the Router interface.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) GetChannelByID(chanID lnwire.ShortChannelID) (
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy,
*channeldb.ChannelEdgePolicy, error) {
return r.cfg.Graph.FetchChannelEdgesByID(chanID.ToUint64())
}
// ForEachNode is used to iterate over every node in router topology.
// NOTE: Part of the ChannelGraphSource interface.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForEachNode(cb func(*channeldb.LightningNode) error) error {
return r.cfg.Graph.ForEachNode(cb)
}
// ForAllOutgoingChannels is used to iterate over all self channels info.
// NOTE: Part of the ChannelGraphSource interface.
// ForAllOutgoingChannels is used to iterate over all outgiong channel owned by
// the router.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForAllOutgoingChannels(cb func(c *channeldb.ChannelEdgePolicy) error) error {
return r.selfNode.ForEachChannel(nil, func(_ *channeldb.ChannelEdgeInfo,
c *channeldb.ChannelEdgePolicy) error {
return cb(c)
})
}
// ForEachChannel is used to iterate over every channel in router topology.
// NOTE: Part of the ChannelGraphSource interface.
// ForEachChannel is used to iterate over every known edge (channel) within our
// view of the channel graph.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) ForEachChannel(cb func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error) error {
return r.cfg.Graph.ForEachChannel(cb)
}
// AddProof updates the channel edge info with proof which is needed
// to properly announce the edge to the rest of the network.
// NOTE: Part of the Router interface.
// AddProof updates the channel edge info with proof which is needed to
// properly announce the edge to the rest of the network.
//
// NOTE: This method is part of the ChannelGraphSource interface.
func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID,
proof *channeldb.ChannelAuthProof) error {

View File

@ -62,7 +62,7 @@ type server struct {
chanRouter *routing.ChannelRouter
discoverSrv *discovery.Discovery
discoverSrv *discovery.AuthenticatedGossiper
utxoNursery *utxoNursery
@ -187,17 +187,20 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
Features: globalFeatures,
}
// Initialize graph with authenticated lightning node, signature is
// needed in order to be able to announce node to other network.
// Initialize graph with authenticated lightning node. We need to
// generate a valid signature in order for other nodes on the network
// to accept our announcement.
messageSigner := lnwallet.NewMessageSigner(s.identityPriv)
if self.AuthSig, err = discovery.SignAnnouncement(messageSigner,
self.AuthSig, err = discovery.SignAnnouncement(messageSigner,
&lnwire.NodeAnnouncement{
Timestamp: uint32(self.LastUpdate.Unix()),
Addresses: self.Addresses,
NodeID: self.PubKey,
Alias: alias,
Features: self.Features,
}); err != nil {
},
)
if err != nil {
return nil, fmt.Errorf("unable to generate signature for "+
"self node announcement: %v", err)
}
@ -247,14 +250,14 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
Notifier: s.chainNotifier,
SignNodeKey: func(nodeKey,
fundingKey *btcec.PublicKey) (*btcec.Signature, error) {
data := nodeKey.SerializeCompressed()
return fundingSigner.SignData(data, fundingKey)
},
SignAnnouncement: func(msg lnwire.Message) (*btcec.Signature,
error) {
SignAnnouncement: func(msg lnwire.Message) (*btcec.Signature, error) {
return discovery.SignAnnouncement(messageSigner, msg)
},
SendToDiscovery: func(msg lnwire.Message) error {
SendAnnouncement: func(msg lnwire.Message) error {
s.discoverSrv.ProcessLocalAnnouncement(msg,
s.identityPriv.PubKey())
return nil
@ -383,17 +386,17 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
}
srvrLog.Debugf("Attempting persistent connection to "+
"channel peer %v", lnAddr)
// Send the persistent connection request to
// the connection manager, saving the request
// itself so we can cancel/restart the process
// as needed.
// Send the persistent connection request to the
// connection manager, saving the request itself so we
// can cancel/restart the process as needed.
connReq := &connmgr.ConnReq{
Addr: lnAddr,
Permanent: true,
}
s.persistentConnReqs[pubStr] =
append(s.persistentConnReqs[pubStr], connReq)
s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr],
connReq)
go s.connMgr.Connect(connReq)
}
}