discovery+funding: add 'AnnounceSignature' proof exchange

Add the interaction between nodes of announce signature messages, which
will allow us to exhcnage the half channel announcemen proofs later.
This commit is contained in:
Andrey Samokhvalov 2017-03-28 22:08:14 +03:00 committed by Olaoluwa Osuntokun
parent 07076cce21
commit fbf766e3c6
6 changed files with 891 additions and 289 deletions

View File

@ -5,6 +5,8 @@ import (
"sync/atomic"
"time"
"bytes"
"encoding/hex"
"github.com/go-errors/errors"
@ -16,12 +18,23 @@ import (
"github.com/roasbeef/btcutil"
)
// waitingProofKey is the proof key which uniquely identifies the
// announcement signature message. The goal of this key is distinguish the
// local and remote proof for the same channel id.
// TODO(andrew.shvv) move to the channeldb package after waiting proof map
// becomes persistent.
type waitingProofKey struct {
chanID uint64
isRemote bool
}
// networkMsg couples a routing related wire message with the peer that
// originally sent it.
type networkMsg struct {
msg lnwire.Message
isRemote bool
peer *btcec.PublicKey
err chan error
}
// syncRequest represents a request from an outside subsystem to the wallet to
@ -51,10 +64,19 @@ type Config struct {
// indicates that the target peer should be excluded from the broadcast.
Broadcast func(exclude *btcec.PublicKey, msg ...lnwire.Message) error
// SendMessages is a function which allows the service to send a set of
// SendToPeer is a function which allows the service to send a set of
// messages to a particular peer identified by the target public
// key.
SendMessages func(target *btcec.PublicKey, msg ...lnwire.Message) error
SendToPeer func(target *btcec.PublicKey, msg ...lnwire.Message) error
// ProofMatureDelta the number of confirmations which is needed
// before exchange the channel announcement proofs.
ProofMatureDelta uint32
// TrickleDelay the period of trickle timer which flushing to the
// network the pending batch of new announcements we've received since
// the last trickle tick.
TrickleDelay time.Duration
}
// New create new discovery service structure.
@ -79,6 +101,7 @@ func New(cfg Config) (*Discovery, error) {
quit: make(chan bool),
syncRequests: make(chan *syncRequest),
prematureAnnouncements: make(map[uint32][]*networkMsg),
waitingProofs: make(map[waitingProofKey]*lnwire.AnnounceSignatures),
fakeSig: fakeSig,
}, nil
}
@ -104,16 +127,23 @@ type Discovery struct {
// the main chain are sent over.
newBlocks <-chan *chainntnfs.BlockEpoch
// prematureAnnouncements maps a blockheight to a set of announcements
// which are "premature" from our PoV. An message is premature if
// it claims to be anchored in a block which is beyond the current main
// chain tip as we know it. Premature network messages will be processed
// once the chain tip as we know it extends to/past the premature
// height.
// prematureAnnouncements maps a block height to a set of network
// messages which are "premature" from our PoV. An message is premature
// if it claims to be anchored in a block which is beyond the current
// main chain tip as we know it. Premature network messages will be
// processed once the chain tip as we know it extends to/past the
// premature height.
//
// TODO(roasbeef): limit premature networkMsgs to N
prematureAnnouncements map[uint32][]*networkMsg
// waitingProofs is the map of proof announcement messages which were
// processed and waiting for opposite local or remote proof to be
// received in order to construct full proof, validate it and
// announce the channel.
// TODO(andrew.shvv) make this map persistent.
waitingProofs map[waitingProofKey]*lnwire.AnnounceSignatures
// networkMsgs is a channel that carries new network broadcasted
// message from outside the discovery service to be processed by the
// networkHandler.
@ -137,20 +167,22 @@ type Discovery struct {
// Remote channel announcements should contain the announcement proof and be
// fully validated.
func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) error {
src *btcec.PublicKey) chan error {
aMsg := &networkMsg{
nMsg := &networkMsg{
msg: msg,
isRemote: true,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- aMsg:
return nil
case d.networkMsgs <- nMsg:
case <-d.quit:
return errors.New("discovery has been shutted down")
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// ProcessLocalAnnouncement sends a new remote announcement message along with
@ -160,20 +192,22 @@ func (d *Discovery) ProcessRemoteAnnouncement(msg lnwire.Message,
// fully validated. The channels proofs will be included farther if nodes agreed
// to announce this channel to the rest of the network.
func (d *Discovery) ProcessLocalAnnouncement(msg lnwire.Message,
src *btcec.PublicKey) error {
src *btcec.PublicKey) chan error {
aMsg := &networkMsg{
nMsg := &networkMsg{
msg: msg,
isRemote: false,
peer: src,
err: make(chan error, 1),
}
select {
case d.networkMsgs <- aMsg:
return nil
case d.networkMsgs <- nMsg:
case <-d.quit:
return errors.New("discovery has been shutted down")
nMsg.err <- errors.New("discovery has shut down")
}
return nMsg.err
}
// SynchronizeNode sends a message to the service indicating it should
@ -246,7 +280,7 @@ func (d *Discovery) networkHandler() {
defer retransmitTimer.Stop()
// TODO(roasbeef): parametrize the above
trickleTimer := time.NewTicker(time.Millisecond * 300)
trickleTimer := time.NewTicker(d.cfg.TrickleDelay)
defer trickleTimer.Stop()
for {
@ -254,18 +288,18 @@ func (d *Discovery) networkHandler() {
case announcement := <-d.networkMsgs:
// Process the network announcement to determine if
// this is either a new announcement from our PoV or an
// updates to a prior vertex/edge we previously
// accepted.
accepted := d.processNetworkAnnouncement(announcement)
// edges to a prior vertex/edge we previously
// proceeded.
emittedAnnouncements := d.processNetworkAnnouncement(announcement)
// If the updates was accepted, then add it to our next
// announcement batch to be broadcast once the trickle
// timer ticks gain.
if accepted {
// If the announcement was accepted, then add the
// emitted announcements to our announce batch to be
// broadcast once the trickle timer ticks gain.
if emittedAnnouncements != nil {
// TODO(roasbeef): exclude peer that sent
announcementBatch = append(
announcementBatch,
announcement.msg,
emittedAnnouncements...,
)
}
@ -294,12 +328,11 @@ func (d *Discovery) networkHandler() {
}
for _, ann := range prematureAnns {
accepted := d.processNetworkAnnouncement(ann)
if accepted {
emittedAnnouncements := d.processNetworkAnnouncement(ann)
if emittedAnnouncements != nil {
announcementBatch = append(
announcementBatch,
ann.msg,
emittedAnnouncements...,
)
}
}
@ -309,7 +342,7 @@ func (d *Discovery) networkHandler() {
// flush to the network the pending batch of new announcements
// we've received since the last trickle tick.
case <-trickleTimer.C:
// If the current announcement batch is nil, then we
// If the current announcements batch is nil, then we
// have no further work here.
if len(announcementBatch) == 0 {
continue
@ -322,7 +355,8 @@ func (d *Discovery) networkHandler() {
// them to all our immediately connected peers.
err := d.cfg.Broadcast(nil, announcementBatch...)
if err != nil {
log.Errorf("unable to send batch announcement: %v", err)
log.Errorf("unable to send batch "+
"announcements: %v", err)
continue
}
@ -343,7 +377,7 @@ func (d *Discovery) networkHandler() {
err := d.cfg.Router.ForAllOutgoingChannels(
func(p *channeldb.ChannelEdgePolicy) error {
c := &lnwire.ChannelUpdateAnnouncement{
Signature: d.fakeSig,
Signature: p.Signature,
ShortChannelID: lnwire.NewShortChanIDFromInt(p.ChannelID),
Timestamp: uint32(p.LastUpdate.Unix()),
Flags: p.Flags,
@ -381,8 +415,6 @@ func (d *Discovery) networkHandler() {
// through the (subjectively) new information on their own.
case syncReq := <-d.syncRequests:
nodePub := syncReq.node.SerializeCompressed()
log.Infof("Synchronizing channel graph with %x", nodePub)
if err := d.synchronize(syncReq); err != nil {
log.Errorf("unable to sync graph state with %x: %v",
nodePub, err)
@ -397,23 +429,22 @@ func (d *Discovery) networkHandler() {
}
// processNetworkAnnouncement processes a new network relate authenticated
// channel or node announcement. If the updates didn't affect the internal state
// of the draft due to either being out of date, invalid, or redundant, then
// false is returned. Otherwise, true is returned indicating that the caller
// may want to batch this request to be broadcast to immediate peers during the
// next announcement epoch.
func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
isPremature := func(chanID *lnwire.ShortChannelID) bool {
return chanID.BlockHeight > d.bestHeight
// channel or node announcement or announcements proofs. If the announcement
// didn't affect the internal state due to either being out of date, invalid,
// or redundant, then nil is returned. Otherwise, the set of announcements
// will be returned which should be broadcasted to the rest of the network.
func (d *Discovery) processNetworkAnnouncement(nMsg *networkMsg) []lnwire.Message {
var announcements []lnwire.Message
isPremature := func(chanID lnwire.ShortChannelID, delta uint32) bool {
return chanID.BlockHeight+delta > d.bestHeight
}
switch msg := aMsg.msg.(type) {
switch msg := nMsg.msg.(type) {
// A new node announcement has arrived which either presents a new
// node, or a node updating previously advertised information.
case *lnwire.NodeAnnouncement:
if aMsg.isRemote {
// TODO(andrew.shvv) Add node validation
if nMsg.isRemote {
// TODO(andrew.shvv) add validation
}
node := &channeldb.LightningNode{
@ -426,10 +457,25 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
}
if err := d.cfg.Router.AddNode(node); err != nil {
log.Errorf("unable to add node: %v", err)
return false
e := errors.Errorf("unable to add node: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
} else {
log.Error(e)
}
nMsg.err <- e
return nil
}
// Node announcement was successfully proceeded and know it
// might be broadcasted to other connected nodes.
announcements = append(announcements, msg)
nMsg.err <- nil
return announcements
// A new channel announcement has arrived, this indicates the
// *creation* of a new channel within the network. This only advertises
// the existence of a channel and not yet the routing policies in
@ -438,7 +484,7 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
if isPremature(&msg.ShortChannelID) {
if isPremature(msg.ShortChannelID, 0) {
blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Announcement for chan_id=(%v), is "+
"premature: advertises height %v, only height "+
@ -447,21 +493,21 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
aMsg,
nMsg,
)
return false
return nil
}
var proof *channeldb.ChannelAuthProof
if aMsg.isRemote {
// TODO(andrew.shvv) Add channel validation
}
if nMsg.isRemote {
// TODO(andrew.shvv) Add validation
proof = &channeldb.ChannelAuthProof{
NodeSig1: msg.NodeSig1,
NodeSig2: msg.NodeSig2,
BitcoinSig1: msg.BitcoinSig1,
BitcoinSig2: msg.BitcoinSig2,
proof = &channeldb.ChannelAuthProof{
NodeSig1: msg.NodeSig1,
NodeSig2: msg.NodeSig2,
BitcoinSig1: msg.BitcoinSig1,
BitcoinSig2: msg.BitcoinSig2,
}
}
edge := &channeldb.ChannelEdgeInfo{
@ -474,45 +520,69 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
}
if err := d.cfg.Router.AddEdge(edge); err != nil {
if !routing.IsError(err, routing.ErrOutdated) {
log.Errorf("unable to add edge: %v", err)
e := errors.Errorf("unable to add edge: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
} else {
log.Info("Unable to add edge: %v", err)
log.Error(e)
}
return false
nMsg.err <- e
return nil
}
// A new authenticated channel updates has arrived, this indicates
// Channel announcement was successfully proceeded and know it
// might be broadcasted to other connected nodes if it was
// announcement with proof (remote).
if proof != nil {
announcements = append(announcements, msg)
}
nMsg.err <- nil
return announcements
// A new authenticated channel edges has arrived, this indicates
// that the directional information for an already known channel has
// been updated.
case *lnwire.ChannelUpdateAnnouncement:
chanID := msg.ShortChannelID.ToUint64()
blockHeight := msg.ShortChannelID.BlockHeight
shortChanID := msg.ShortChannelID.ToUint64()
// If the advertised inclusionary block is beyond our knowledge
// of the chain tip, then we'll put the announcement in limbo
// to be fully verified once we advance forward in the chain.
if isPremature(&msg.ShortChannelID) {
blockHeight := msg.ShortChannelID.BlockHeight
log.Infof("Update announcement for chan_id=(%v), is "+
"premature: advertises height %v, only height "+
"%v is known", chanID, blockHeight,
d.bestHeight)
if isPremature(msg.ShortChannelID, 0) {
log.Infof("Update announcement for "+
"shortChanID=(%v), is premature: advertises "+
"height %v, only height %v is known",
shortChanID, blockHeight, d.bestHeight)
d.prematureAnnouncements[blockHeight] = append(
d.prematureAnnouncements[blockHeight],
aMsg,
nMsg,
)
return false
return nil
}
if aMsg.isRemote {
// TODO(andrew.shvv) Add update channel validation
// Get the node pub key as far as we don't have it in
// channel update announcement message and verify
// message signature.
chanInfo, _, _, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
err := errors.Errorf("unable to validate "+
"channel update shortChanID=%v: %v",
shortChanID, err)
nMsg.err <- err
return nil
}
// TODO(andrew.shvv) Add validation
// TODO(roasbeef): should be msat here
update := &channeldb.ChannelEdgePolicy{
ChannelID: chanID,
Signature: msg.Signature,
ChannelID: shortChanID,
LastUpdate: time.Unix(int64(msg.Timestamp), 0),
Flags: msg.Flags,
TimeLockDelta: msg.TimeLockDelta,
@ -522,12 +592,200 @@ func (d *Discovery) processNetworkAnnouncement(aMsg *networkMsg) bool {
}
if err := d.cfg.Router.UpdateEdge(update); err != nil {
log.Errorf("unable to update edge: %v", err)
return false
}
}
e := errors.Errorf("unable to update edge: %v", err)
if routing.IsError(err,
routing.ErrOutdated,
routing.ErrIgnored) {
log.Info(e)
} else {
log.Error(e)
}
return true
nMsg.err <- e
return nil
}
// Channel update announcement was successfully proceeded and
// know it might be broadcasted to other connected nodes.
// We should announce the edge to rest of the network only
// if channel has the authentication proof.
if chanInfo.AuthProof != nil {
announcements = append(announcements, msg)
}
nMsg.err <- nil
return announcements
// New signature announcement received which indicates willingness
// of the parties (to exchange the channel signatures / announce newly
// created channel).
case *lnwire.AnnounceSignatures:
needBlockHeight := msg.ShortChannelID.BlockHeight + d.cfg.ProofMatureDelta
shortChanID := msg.ShortChannelID.ToUint64()
prefix := "local"
if nMsg.isRemote {
prefix = "remote"
}
// By the specification proof should be sent after some number of
// confirmations after channel was registered in bitcoin
// blockchain. So we should check that proof is premature and
// if not send it to the be proceeded again. This allows us to
// be tolerant to other clients if this constraint was changed.
if isPremature(msg.ShortChannelID, d.cfg.ProofMatureDelta) {
d.prematureAnnouncements[needBlockHeight] = append(
d.prematureAnnouncements[needBlockHeight],
nMsg,
)
log.Infof("Premature proof annoucement, "+
"current block height lower than needed: %v <"+
" %v, add announcement to reprocessing batch",
d.bestHeight, needBlockHeight)
return nil
}
// Check that we have channel with such channel id in out
// lightning network topology.
chanInfo, e1, e2, err := d.cfg.Router.GetChannelByID(msg.ShortChannelID)
if err != nil {
err := errors.Errorf("unable to process channel "+
"%v proof with shortChanID=%v: %v", prefix,
shortChanID, err)
nMsg.err <- err
return nil
}
isFirstNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey1.SerializeCompressed())
isSecondNode := bytes.Equal(nMsg.peer.SerializeCompressed(),
chanInfo.NodeKey2.SerializeCompressed())
// Check that channel that was retrieved belongs to the peer
// which sent the proof announcement, otherwise the proof for
// might be rewritten by the any lightning network node.
if !(isFirstNode || isSecondNode) {
err := errors.Errorf("channel that was received not "+
"belongs to the peer which sent the proof, "+
"shortChanID=%v", shortChanID)
log.Error(err)
nMsg.err <- err
return nil
}
// Check that we received the opposite proof, if so, than we
// should construct the full proof, and create the channel
// announcement. If we didn't receive the opposite half of the
// proof than we should store it this one, and wait for opposite
// to be received.
oppositeKey := newProofKey(chanInfo.ChannelID, !nMsg.isRemote)
oppositeProof, ok := d.waitingProofs[oppositeKey]
if !ok {
key := newProofKey(chanInfo.ChannelID, nMsg.isRemote)
d.waitingProofs[key] = msg
// If proof was send from funding manager than we
// should send the announce signature message to
// remote side.
if !nMsg.isRemote {
// Check that first node of the channel info
// corresponds to us.
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
} else {
remotePeer = chanInfo.NodeKey1
}
err := d.cfg.SendToPeer(remotePeer, msg)
if err != nil {
log.Errorf("unable to send "+
"announcement message to "+
"peer: %x",
remotePeer.SerializeCompressed())
}
log.Infof("Send channel announcement proof "+
"for shortChanID=%v to remote peer: "+
"%x", shortChanID, remotePeer.SerializeCompressed())
}
log.Infof("Incoming %v proof announcement for "+
"shortChanID=%v have been proceeded and waiting for opposite proof",
prefix, shortChanID)
nMsg.err <- nil
return nil
}
var dbProof channeldb.ChannelAuthProof
if isFirstNode {
dbProof.NodeSig1 = msg.NodeSignature
dbProof.NodeSig2 = oppositeProof.NodeSignature
dbProof.BitcoinSig1 = msg.BitcoinSignature
dbProof.BitcoinSig2 = oppositeProof.BitcoinSignature
} else {
dbProof.NodeSig1 = oppositeProof.NodeSignature
dbProof.NodeSig2 = msg.NodeSignature
dbProof.BitcoinSig1 = oppositeProof.BitcoinSignature
dbProof.BitcoinSig2 = msg.BitcoinSignature
}
chanAnn, e1Ann, e2Ann := createChanAnnouncement(&dbProof, chanInfo, e1, e2)
// TODO(andrew.shvv) Add validation
// If the channel was returned by the router it means that
// existence of funding point and inclusion of nodes bitcoin
// keys in it already checked by the router. On this stage we
// should check that node keys are corresponds to the bitcoin
// keys by validating the signatures of announcement.
// If proof is valid than we should populate the channel
// edge with it, so we can announce it on peer connect.
err = d.cfg.Router.AddProof(msg.ShortChannelID, &dbProof)
if err != nil {
err := errors.Errorf("unable add proof to the "+
"channel chanID=%v: %v", msg.ChannelID, err)
log.Error(err)
nMsg.err <- err
return nil
}
// Proof was successfully created and now can announce the
// channel to the remain network.
log.Infof("Incoming %v proof announcement for shortChanID=%v"+
" have been proceeded, adding channel announcement in"+
" the broadcasting batch", prefix, shortChanID)
announcements = append(announcements, chanAnn)
if e1Ann != nil {
announcements = append(announcements, e1Ann)
}
if e2Ann != nil {
announcements = append(announcements, e2Ann)
}
if !nMsg.isRemote {
var remotePeer *btcec.PublicKey
if isFirstNode {
remotePeer = chanInfo.NodeKey2
} else {
remotePeer = chanInfo.NodeKey1
}
err = d.cfg.SendToPeer(remotePeer, msg)
if err != nil {
log.Errorf("unable to send announcement "+
"message to peer: %x",
remotePeer.SerializeCompressed())
}
}
nMsg.err <- nil
return announcements
default:
nMsg.err <- errors.New("wrong type of the announcement")
return nil
}
}
// synchronize attempts to synchronize the target node in the syncReq to
@ -555,7 +813,7 @@ func (d *Discovery) synchronize(syncReq *syncRequest) error {
}
ann := &lnwire.NodeAnnouncement{
Signature: d.fakeSig,
Signature: node.AuthSig,
Timestamp: uint32(node.LastUpdate.Unix()),
Addresses: node.Addresses,
NodeID: node.PubKey,
@ -573,71 +831,39 @@ func (d *Discovery) synchronize(syncReq *syncRequest) error {
// With the vertexes gathered, we'll no retrieve the initial
// announcement, as well as the latest channel update announcement for
// both of the directed edges that make up the channel.
// both of the directed infos that make up the channel.
var numEdges uint32
if err := d.cfg.Router.ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) error {
chanID := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID)
// First, using the parameters of the channel, along with the
// channel authentication proof, we'll create re-create the
// original authenticated channel announcement.
// TODO(andrew.shvv) skip if proof is nil
authProof := chanInfo.AuthProof
chanAnn := &lnwire.ChannelAnnouncement{
NodeSig1: authProof.NodeSig1,
NodeSig2: authProof.NodeSig2,
ShortChannelID: chanID,
BitcoinSig1: authProof.BitcoinSig1,
BitcoinSig2: authProof.BitcoinSig2,
NodeID1: chanInfo.NodeKey1,
NodeID2: chanInfo.NodeKey2,
BitcoinKey1: chanInfo.BitcoinKey1,
BitcoinKey2: chanInfo.BitcoinKey2,
}
announceMessages = append(announceMessages, chanAnn)
if chanInfo.AuthProof != nil {
chanAnn, e1Ann, e2Ann := createChanAnnouncement(
chanInfo.AuthProof, chanInfo, e1, e2)
// Since it's up to a node's policy as to whether they
// advertise the edge in dire direction, we don't create an
// advertisement if the edge is nil.
if e1 != nil {
announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{
Signature: d.fakeSig,
ShortChannelID: chanID,
Timestamp: uint32(e1.LastUpdate.Unix()),
Flags: 0,
TimeLockDelta: e1.TimeLockDelta,
HtlcMinimumMsat: uint32(e1.MinHTLC),
FeeBaseMsat: uint32(e1.FeeBaseMSat),
FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths),
})
}
if e2 != nil {
announceMessages = append(announceMessages, &lnwire.ChannelUpdateAnnouncement{
Signature: d.fakeSig,
ShortChannelID: chanID,
Timestamp: uint32(e2.LastUpdate.Unix()),
Flags: 1,
TimeLockDelta: e2.TimeLockDelta,
HtlcMinimumMsat: uint32(e2.MinHTLC),
FeeBaseMsat: uint32(e2.FeeBaseMSat),
FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths),
})
announceMessages = append(announceMessages, chanAnn)
if e1Ann != nil {
announceMessages = append(announceMessages, e1Ann)
}
if e2Ann != nil {
announceMessages = append(announceMessages, e2Ann)
}
numEdges++
}
numEdges++
return nil
}); err != nil && err != channeldb.ErrGraphNoEdgesFound {
log.Errorf("unable to sync edges w/ peer: %v", err)
log.Errorf("unable to sync infos with peer: %v", err)
return err
}
log.Infof("Syncing channel graph state with %x, sending %v "+
"nodes and %v edges", targetNode.SerializeCompressed(),
"nodes and %v infos", targetNode.SerializeCompressed(),
numNodes, numEdges)
// With all the announcement messages gathered, send them all in a
// single batch to the target peer.
return d.cfg.SendMessages(targetNode, announceMessages...)
return d.cfg.SendToPeer(targetNode, announceMessages...)
}

View File

@ -13,6 +13,7 @@ import (
"time"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
@ -33,18 +34,39 @@ var (
}
_, _ = testSig.R.SetString("63724406601629180062774974542967536251589935445068131219452686511677818569431", 10)
_, _ = testSig.S.SetString("18801056069249825825291287104931333862866033135609736119018462340006816851118", 10)
inputStr = "147caa76786596590baa4e98f5d9f48b86c7765e489f7a6ff3360fe5c674360b"
sha, _ = chainhash.NewHashFromStr(inputStr)
outpoint = wire.NewOutPoint(sha, 0)
bitcoinKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256())
bitcoinKeyPub1 = bitcoinKeyPriv1.PubKey()
nodeKeyPriv1, _ = btcec.NewPrivateKey(btcec.S256())
nodeKeyPub1 = nodeKeyPriv1.PubKey()
bitcoinKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256())
bitcoinKeyPub2 = bitcoinKeyPriv2.PubKey()
nodeKeyPriv2, _ = btcec.NewPrivateKey(btcec.S256())
nodeKeyPub2 = nodeKeyPriv2.PubKey()
trickleDelay = time.Millisecond * 300
proofMatureDelta uint32
)
type mockGraphSource struct {
nodes []*channeldb.LightningNode
edges []*channeldb.ChannelEdgeInfo
updates []*channeldb.ChannelEdgePolicy
infos map[uint64]*channeldb.ChannelEdgeInfo
edges map[uint64][]*channeldb.ChannelEdgePolicy
bestHeight uint32
}
func newMockRouter(height uint32) *mockGraphSource {
return &mockGraphSource{
bestHeight: height,
infos: make(map[uint64]*channeldb.ChannelEdgeInfo),
edges: make(map[uint64][]*channeldb.ChannelEdgePolicy),
}
}
@ -55,13 +77,19 @@ func (r *mockGraphSource) AddNode(node *channeldb.LightningNode) error {
return nil
}
func (r *mockGraphSource) AddEdge(edge *channeldb.ChannelEdgeInfo) error {
r.edges = append(r.edges, edge)
func (r *mockGraphSource) AddEdge(info *channeldb.ChannelEdgeInfo) error {
if _, ok := r.infos[info.ChannelID]; ok {
return errors.New("info already exist")
}
r.infos[info.ChannelID] = info
return nil
}
func (r *mockGraphSource) UpdateEdge(policy *channeldb.ChannelEdgePolicy) error {
r.updates = append(r.updates, policy)
func (r *mockGraphSource) UpdateEdge(edge *channeldb.ChannelEdgePolicy) error {
r.edges[edge.ChannelID] = append(
r.edges[edge.ChannelID],
edge,
)
return nil
}
@ -91,6 +119,28 @@ func (r *mockGraphSource) ForEachChannel(func(chanInfo *channeldb.ChannelEdgeInf
return nil
}
func (r *mockGraphSource) GetChannelByID(chanID lnwire.ShortChannelID) (
*channeldb.ChannelEdgeInfo,
*channeldb.ChannelEdgePolicy,
*channeldb.ChannelEdgePolicy, error) {
chanInfo, ok := r.infos[chanID.ToUint64()]
if !ok {
return nil, nil, nil, errors.New("can't find channel info")
}
edges := r.edges[chanID.ToUint64()]
if len(edges) == 0 {
return chanInfo, nil, nil, nil
}
if len(edges) == 1 {
return chanInfo, edges[0], nil, nil
}
return chanInfo, edges[0], edges[1], nil
}
type mockNotifier struct {
clientCounter uint32
epochClients map[uint32]chan *chainntnfs.BlockEpoch
@ -149,31 +199,87 @@ func (m *mockNotifier) Stop() error {
return nil
}
func createNodeAnnouncement() (*lnwire.NodeAnnouncement,
type annBatch struct {
nodeAnn1 *lnwire.NodeAnnouncement
nodeAnn2 *lnwire.NodeAnnouncement
localChanAnn *lnwire.ChannelAnnouncement
remoteChanAnn *lnwire.ChannelAnnouncement
chanUpdAnn *lnwire.ChannelUpdateAnnouncement
localProofAnn *lnwire.AnnounceSignatures
remoteProofAnn *lnwire.AnnounceSignatures
}
func createAnnouncements(blockHeight uint32) (*annBatch, error) {
var err error
var batch annBatch
batch.nodeAnn1, err = createNodeAnnouncement(nodeKeyPriv1)
if err != nil {
return nil, err
}
batch.nodeAnn2, err = createNodeAnnouncement(nodeKeyPriv2)
if err != nil {
return nil, err
}
batch.remoteChanAnn, err = createRemoteChannelAnnouncement(blockHeight)
if err != nil {
return nil, err
}
batch.localProofAnn = &lnwire.AnnounceSignatures{
NodeSignature: batch.remoteChanAnn.NodeSig1,
BitcoinSignature: batch.remoteChanAnn.BitcoinSig1,
}
batch.remoteProofAnn = &lnwire.AnnounceSignatures{
NodeSignature: batch.remoteChanAnn.NodeSig2,
BitcoinSignature: batch.remoteChanAnn.BitcoinSig2,
}
batch.localChanAnn, err = createRemoteChannelAnnouncement(blockHeight)
if err != nil {
return nil, err
}
batch.localChanAnn.BitcoinSig1 = nil
batch.localChanAnn.BitcoinSig2 = nil
batch.localChanAnn.NodeSig1 = nil
batch.localChanAnn.NodeSig2 = nil
batch.chanUpdAnn, err = createUpdateAnnouncement(blockHeight)
if err != nil {
return nil, err
}
return &batch, nil
}
func createNodeAnnouncement(priv *btcec.PrivateKey) (*lnwire.NodeAnnouncement,
error) {
priv, err := btcec.NewPrivateKey(btcec.S256())
alias, err := lnwire.NewAlias("kek" + string(priv.Serialize()))
if err != nil {
return nil, err
}
pub := priv.PubKey().SerializeCompressed()
alias, err := lnwire.NewAlias("kek" + string(pub[:]))
if err != nil {
return nil, err
}
return &lnwire.NodeAnnouncement{
a := &lnwire.NodeAnnouncement{
Signature: testSig,
Timestamp: uint32(prand.Int31()),
Addresses: testAddrs,
NodeID: priv.PubKey(),
Alias: alias,
Features: testFeatures,
}, nil
}
return a, nil
}
func createUpdateAnnouncement(blockHeight uint32) *lnwire.ChannelUpdateAnnouncement {
return &lnwire.ChannelUpdateAnnouncement{
func createUpdateAnnouncement(blockHeight uint32) (*lnwire.ChannelUpdateAnnouncement,
error) {
a := &lnwire.ChannelUpdateAnnouncement{
Signature: testSig,
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
@ -184,26 +290,37 @@ func createUpdateAnnouncement(blockHeight uint32) *lnwire.ChannelUpdateAnnouncem
FeeBaseMsat: uint32(prand.Int31()),
FeeProportionalMillionths: uint32(prand.Int31()),
}
return a, nil
}
func createChannelAnnouncement(blockHeight uint32) *lnwire.ChannelAnnouncement {
// Our fake channel will be "confirmed" at height 101.
chanID := lnwire.ShortChannelID{
BlockHeight: blockHeight,
TxIndex: 0,
TxPosition: 0,
func createRemoteChannelAnnouncement(blockHeight uint32) (*lnwire.ChannelAnnouncement,
error) {
a := &lnwire.ChannelAnnouncement{
ShortChannelID: lnwire.ShortChannelID{
BlockHeight: blockHeight,
TxIndex: 0,
TxPosition: 0,
},
NodeID1: nodeKeyPub1,
NodeID2: nodeKeyPub2,
BitcoinKey1: bitcoinKeyPub1,
BitcoinKey2: bitcoinKeyPub2,
NodeSig1: testSig,
NodeSig2: testSig,
BitcoinSig1: testSig,
BitcoinSig2: testSig,
}
return &lnwire.ChannelAnnouncement{
ShortChannelID: chanID,
}
return a, nil
}
type testCtx struct {
discovery *Discovery
router *mockGraphSource
notifier *mockNotifier
discovery *Discovery
router *mockGraphSource
notifier *mockNotifier
broadcastedMessage chan lnwire.Message
}
@ -215,7 +332,7 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
notifier := newMockNotifier()
router := newMockRouter(startHeight)
broadcastedMessage := make(chan lnwire.Message)
broadcastedMessage := make(chan lnwire.Message, 10)
discovery, err := New(Config{
Notifier: notifier,
Broadcast: func(_ *btcec.PublicKey, msgs ...lnwire.Message) error {
@ -224,8 +341,12 @@ func createTestCtx(startHeight uint32) (*testCtx, func(), error) {
}
return nil
},
SendMessages: nil,
Router: router,
SendToPeer: func(target *btcec.PublicKey, msg ...lnwire.Message) error {
return nil
},
Router: router,
TrickleDelay: trickleDelay,
ProofMatureDelta: proofMatureDelta,
})
if err != nil {
return nil, nil, fmt.Errorf("unable to create router %v", err)
@ -255,21 +376,23 @@ func TestProcessAnnouncement(t *testing.T) {
}
defer cleanup()
priv, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
t.Fatalf("can't create node pub key: %v", err)
}
nodePub := priv.PubKey()
na, err := createNodeAnnouncement()
// Create node valid, signed announcement, process it with with
// discovery service, check that valid announcement have been
// propagated farther into the lightning network, and check that we
// added new node into router.
na, err := createNodeAnnouncement(nodeKeyPriv1)
if err != nil {
t.Fatalf("can't create node announcement: %v", err)
}
ctx.discovery.ProcessRemoteAnnouncement(na, nodePub)
err = <-ctx.discovery.ProcessRemoteAnnouncement(na, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
case <-time.After(2 * trickleDelay):
t.Fatal("announcememt wasn't proceeded")
}
@ -277,32 +400,54 @@ func TestProcessAnnouncement(t *testing.T) {
t.Fatalf("node wasn't added to router: %v", err)
}
ca := createChannelAnnouncement(0)
ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub)
// Pretending that we receive the valid channel announcement from
// remote side, and check that we broadcasted it to the our network,
// and added channel info in the router.
ca, err := createRemoteChannelAnnouncement(0)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
case <-time.After(2 * trickleDelay):
t.Fatal("announcememt wasn't proceeded")
}
if len(ctx.router.infos) != 1 {
t.Fatalf("edge wasn't added to router: %v", err)
}
// Pretending that we received valid channel policy update from remote
// side, and check that we broadcasted it to the other network, and
// added updates to the router.
ua, err := createUpdateAnnouncement(0)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID)
if err != nil {
t.Fatalf("can't process remote announcement: %v", err)
}
select {
case <-ctx.broadcastedMessage:
case <-time.After(2 * trickleDelay):
t.Fatal("announcememt wasn't proceeded")
}
if len(ctx.router.edges) != 1 {
t.Fatalf("edge wasn't added to router: %v", err)
}
ua := createUpdateAnnouncement(0)
ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub)
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcememt wasn't proceeded")
}
if len(ctx.router.updates) != 1 {
t.Fatalf("edge update wasn't added to router: %v", err)
}
}
// TestPrematureAnnouncement checks that premature networkMsgs are
// TestPrematureAnnouncement checks that premature announcements are
// not propagated to the router subsystem until block with according
// block height received.
func TestPrematureAnnouncement(t *testing.T) {
@ -312,62 +457,150 @@ func TestPrematureAnnouncement(t *testing.T) {
}
defer cleanup()
priv, err := btcec.NewPrivateKey(btcec.S256())
na, err := createNodeAnnouncement(nodeKeyPriv1)
if err != nil {
t.Fatalf("can't create node pub key: %v", err)
t.Fatalf("can't create node announcement: %v", err)
}
// Pretending that we receive the valid channel announcement from
// remote side, but block height of this announcement is greater than
// highest know to us, for that reason it should be added to the
// repeat/premature batch.
ca, err := createRemoteChannelAnnouncement(1)
if err != nil {
t.Fatalf("can't create channel announcement: %v", err)
}
nodePub := priv.PubKey()
ca := createChannelAnnouncement(1)
ctx.discovery.ProcessRemoteAnnouncement(ca, nodePub)
select {
case <-ctx.broadcastedMessage:
case <-ctx.discovery.ProcessRemoteAnnouncement(ca, na.NodeID):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
if len(ctx.router.infos) != 0 {
t.Fatal("edge was added to router")
}
// Pretending that we receive the valid channel update announcement from
// remote side, but block height of this announcement is greater than
// highest know to us, for that reason it should be added to the
// repeat/premature batch.
ua, err := createUpdateAnnouncement(1)
if err != nil {
t.Fatalf("can't create update announcement: %v", err)
}
select {
case <-ctx.discovery.ProcessRemoteAnnouncement(ua, na.NodeID):
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
if len(ctx.router.edges) != 0 {
t.Fatal("edge was added to router")
}
ua := createUpdateAnnouncement(1)
ctx.discovery.ProcessRemoteAnnouncement(ua, nodePub)
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcement was proceeded")
case <-time.After(100 * time.Millisecond):
}
if len(ctx.router.updates) != 0 {
t.Fatal("edge update was added to router")
}
// Generate new block and waiting the previously added announcements
// to be proceeded.
newBlock := &wire.MsgBlock{}
ctx.notifier.notifyBlock(newBlock.Header.BlockHash(), 1)
select {
case <-ctx.broadcastedMessage:
if err != nil {
t.Fatalf("announcememt was proceeded with err: %v", err)
}
case <-time.After(time.Second):
t.Fatal("announcememt wasn't proceeded")
case <-time.After(2 * trickleDelay):
t.Fatal("announcememt wasn't broadcasted")
}
if len(ctx.router.edges) != 1 {
if len(ctx.router.infos) != 1 {
t.Fatalf("edge was't added to router: %v", err)
}
select {
case <-ctx.broadcastedMessage:
if err != nil {
t.Fatalf("announcememt was proceeded with err: %v", err)
}
case <-time.After(time.Second):
t.Fatal("announcememt wasn't proceeded")
case <-time.After(2 * trickleDelay):
t.Fatal("announcememt wasn't broadcasted")
}
if len(ctx.router.updates) != 1 {
if len(ctx.router.edges) != 1 {
t.Fatalf("edge update wasn't added to router: %v", err)
}
}
// TestSignatureAnnouncement....
func TestSignatureAnnouncement(t *testing.T) {
ctx, cleanup, err := createTestCtx(proofMatureDelta)
if err != nil {
t.Fatalf("can't create context: %v", err)
}
defer cleanup()
batch, err := createAnnouncements(0)
if err != nil {
t.Fatalf("can't generate announcements: %v", err)
}
localKey := batch.nodeAnn1.NodeID
remoteKey := batch.nodeAnn2.NodeID
// Recreate lightning network topology. Initialize router with
// channel between two nodes.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localChanAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel announcement was broadcasted")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.chanUpdAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcasted")
case <-time.After(2 * trickleDelay):
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.chanUpdAnn, remoteKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("channel update announcement was broadcasted")
case <-time.After(2 * trickleDelay):
}
// Pretending that we receive local channel announcement from funding
// manager, thereby kick off the announcement exchange process.
err = <-ctx.discovery.ProcessLocalAnnouncement(batch.localProofAnn, localKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
select {
case <-ctx.broadcastedMessage:
t.Fatal("announcements were broadcasted")
case <-time.After(2 * trickleDelay):
}
if len(ctx.discovery.waitingProofs) != 1 {
t.Fatal("local proof annoucement should be stored")
}
err = <-ctx.discovery.ProcessRemoteAnnouncement(batch.remoteProofAnn, remoteKey)
if err != nil {
t.Fatalf("unable to process :%v", err)
}
for i := 0; i < 3; i++ {
select {
case <-ctx.broadcastedMessage:
case <-time.After(time.Second):
t.Fatal("announcement wasn't broadcasted")
}
}
}

93
discovery/utils.go Normal file
View File

@ -0,0 +1,93 @@
package discovery
import (
"encoding/binary"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwire"
)
// newProofKey constructs new announcement signature message key.
func newProofKey(chanID uint64, isRemote bool) waitingProofKey {
return waitingProofKey{
chanID: chanID,
isRemote: isRemote,
}
}
// ToBytes represents the key in the byte format.
func (k waitingProofKey) ToBytes() []byte {
var key [10]byte
var b uint8
if k.isRemote {
b = 0
} else {
b = 1
}
binary.BigEndian.PutUint64(key[:], k.chanID)
key[9] = b
return key[:]
}
// createChanAnnouncement helper function which creates the channel announcement
// by the given channeldb objects.
func createChanAnnouncement(chanProof *channeldb.ChannelAuthProof,
chanInfo *channeldb.ChannelEdgeInfo,
e1, e2 *channeldb.ChannelEdgePolicy) (
*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdateAnnouncement,
*lnwire.ChannelUpdateAnnouncement) {
// First, using the parameters of the channel, along with the
// channel authentication chanProof, we'll create re-create the
// original authenticated channel announcement.
chanID := lnwire.NewShortChanIDFromInt(chanInfo.ChannelID)
chanAnn := &lnwire.ChannelAnnouncement{
NodeSig1: chanProof.NodeSig1,
NodeSig2: chanProof.NodeSig2,
ShortChannelID: chanID,
BitcoinSig1: chanProof.BitcoinSig1,
BitcoinSig2: chanProof.BitcoinSig2,
NodeID1: chanInfo.NodeKey1,
NodeID2: chanInfo.NodeKey2,
BitcoinKey1: chanInfo.BitcoinKey1,
BitcoinKey2: chanInfo.BitcoinKey2,
}
// We'll unconditionally queue the channel's existence chanProof as
// it will need to be processed before either of the channel
// update networkMsgs.
// Since it's up to a node's policy as to whether they
// advertise the edge in dire direction, we don't create an
// advertisement if the edge is nil.
var edge1Ann, edge2Ann *lnwire.ChannelUpdateAnnouncement
if e1 != nil {
edge1Ann = &lnwire.ChannelUpdateAnnouncement{
Signature: e1.Signature,
ShortChannelID: chanID,
Timestamp: uint32(e1.LastUpdate.Unix()),
Flags: 0,
TimeLockDelta: e1.TimeLockDelta,
HtlcMinimumMsat: uint32(e1.MinHTLC),
FeeBaseMsat: uint32(e1.FeeBaseMSat),
FeeProportionalMillionths: uint32(e1.FeeProportionalMillionths),
}
}
if e2 != nil {
edge2Ann = &lnwire.ChannelUpdateAnnouncement{
Signature: e2.Signature,
ShortChannelID: chanID,
Timestamp: uint32(e2.LastUpdate.Unix()),
Flags: 1,
TimeLockDelta: e2.TimeLockDelta,
HtlcMinimumMsat: uint32(e2.MinHTLC),
FeeBaseMsat: uint32(e2.FeeBaseMSat),
FeeProportionalMillionths: uint32(e2.FeeProportionalMillionths),
}
}
return chanAnn, edge1Ann, edge2Ann
}

View File

@ -2,7 +2,6 @@ package main
import (
"bytes"
"encoding/hex"
"sync"
"sync/atomic"
"time"
@ -141,9 +140,18 @@ type fundingConfig struct {
// so that the channel creation process can be completed.
Notifier chainntnfs.ChainNotifier
// SignNodeKey is used to generate signature for node public key with
// funding keys.
SignNodeKey func(nodeKey, fundingKey *btcec.PublicKey) (*btcec.Signature, error)
// SignAnnouncement is used to generate the signatures for channel
// update, and node announcements, and also to generate the proof for
// the channel announcement.
SignAnnouncement func(msg lnwire.Message) (*btcec.Signature, error)
// SendToDiscovery is used by the FundingManager to announce newly created
// channels to the rest of the Lightning Network.
SendToDiscovery func(msg lnwire.Message)
SendToDiscovery func(msg lnwire.Message) error
// SendToPeer allows the FundingManager to send messages to the peer
// node during the multiple steps involved in the creation of the
@ -202,8 +210,6 @@ type fundingManager struct {
barrierMtx sync.RWMutex
newChanBarriers map[wire.OutPoint]chan struct{}
fakeProof *channelProof
quit chan struct{}
wg sync.WaitGroup
}
@ -211,22 +217,8 @@ type fundingManager struct {
// newFundingManager creates and initializes a new instance of the
// fundingManager.
func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
// TODO(roasbeef): remove once we actually sign the funding_locked
// stuffs
s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" +
"1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" +
"ad154c03be696a292d24ae"
fakeSigHex, _ := hex.DecodeString(s)
fakeSig, _ := btcec.ParseSignature(fakeSigHex, btcec.S256())
return &fundingManager{
cfg: &cfg,
fakeProof: &channelProof{
nodeSig: fakeSig,
bitcoinSig: fakeSig,
},
cfg: &cfg,
activeReservations: make(map[serializedPubKey]pendingChannels),
newChanBarriers: make(map[wire.OutPoint]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize),
@ -998,8 +990,9 @@ func (f *fundingManager) handleFundingLocked(fmsg *fundingLockedMsg) {
// Register the new link with the L3 routing manager so this
// new channel can be utilized during path
// finding.
go f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey, channel,
fmsg.msg.ChannelID, f.fakeProof, f.fakeProof)
go f.announceChannel(f.cfg.IDKey, fmsg.peerAddress.IdentityKey,
channel.LocalFundingKey, channel.RemoteFundingKey,
fmsg.msg.ChannelID, fundingPoint)
}
// channelProof is one half of the proof necessary to create an authenticated
@ -1013,8 +1006,9 @@ type channelProof struct {
// chanAnnouncement encapsulates the two authenticated announcements that we
// send out to the network after a new channel has been created locally.
type chanAnnouncement struct {
chanAnn *lnwire.ChannelAnnouncement
edgeUpdate *lnwire.ChannelUpdateAnnouncement
chanAnn *lnwire.ChannelAnnouncement
chanUpdateAnn *lnwire.ChannelUpdateAnnouncement
chanProof *lnwire.AnnounceSignatures
}
// newChanAnnouncement creates the authenticated channel announcement messages
@ -1024,11 +1018,12 @@ type chanAnnouncement struct {
// identity pub keys of both parties to the channel, and the second segment is
// authenticated only by us and contains our directional routing policy for the
// channel.
func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
channel *lnwallet.LightningChannel, chanID lnwire.ShortChannelID,
localProof, remoteProof *channelProof) *chanAnnouncement {
func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.PublicKey,
localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID,
chanPoint wire.OutPoint) (*chanAnnouncement, error) {
var err error
// The unconditional section of the announcement is the ChannelID
// The unconditional section of the announcement is the ShortChannelID
// itself which compactly encodes the location of the funding output
// within the blockchain.
chanAnn := &lnwire.ChannelAnnouncement{
@ -1045,30 +1040,22 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
// nodes indicates which of the nodes is "first". If our serialized
// identity key is lower than theirs then we're the "first" node and
// second otherwise.
selfBytes := localIdentity.SerializeCompressed()
remoteBytes := remotePub.SerializeCompressed()
selfBytes := localPubKey.SerializeCompressed()
remoteBytes := remotePubKey.SerializeCompressed()
if bytes.Compare(selfBytes, remoteBytes) == -1 {
chanAnn.NodeID1 = localIdentity
chanAnn.NodeID2 = remotePub
chanAnn.NodeSig1 = localProof.nodeSig
chanAnn.NodeSig2 = remoteProof.nodeSig
chanAnn.BitcoinSig1 = localProof.nodeSig
chanAnn.BitcoinSig2 = remoteProof.nodeSig
chanAnn.BitcoinKey1 = channel.LocalFundingKey
chanAnn.BitcoinKey2 = channel.RemoteFundingKey
chanAnn.NodeID1 = localPubKey
chanAnn.NodeID2 = remotePubKey
chanAnn.BitcoinKey1 = localFundingKey
chanAnn.BitcoinKey2 = remoteFundingKey
// If we're the first node then update the chanFlags to
// indicate the "direction" of the update.
chanFlags = 0
} else {
chanAnn.NodeID1 = remotePub
chanAnn.NodeID2 = localIdentity
chanAnn.NodeSig1 = remoteProof.nodeSig
chanAnn.NodeSig2 = localProof.nodeSig
chanAnn.BitcoinSig1 = remoteProof.nodeSig
chanAnn.BitcoinSig2 = localProof.nodeSig
chanAnn.BitcoinKey1 = channel.RemoteFundingKey
chanAnn.BitcoinKey2 = channel.LocalFundingKey
chanAnn.NodeID1 = remotePubKey
chanAnn.NodeID2 = localPubKey
chanAnn.BitcoinKey1 = remoteFundingKey
chanAnn.BitcoinKey2 = localFundingKey
// If we're the second node then update the chanFlags to
// indicate the "direction" of the update.
@ -1077,7 +1064,6 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
// TODO(roasbeef): add real sig, populate proper FeeSchema
chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{
Signature: localProof.nodeSig,
ShortChannelID: chanID,
Timestamp: uint32(time.Now().Unix()),
Flags: chanFlags,
@ -1087,10 +1073,40 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
FeeProportionalMillionths: 0,
}
return &chanAnnouncement{
chanAnn: chanAnn,
edgeUpdate: chanUpdateAnn,
chanUpdateAnn.Signature, err = f.cfg.SignAnnouncement(chanUpdateAnn)
if err != nil {
return nil, errors.Errorf("unable to generate channel "+
"update announcement signature: %v", err)
}
// Channel proof should be announced in the separate message, so we
// already have the bitcoin signature but in order to construct
// the proof we also need node signature. Use message signer in order
// to sign the message with node private key.
nodeSig, err := f.cfg.SignAnnouncement(chanAnn)
if err != nil {
return nil, errors.Errorf("unable to generate node "+
"signature for channel announcement: %v", err)
}
bitcoinSig, err := f.cfg.SignNodeKey(localPubKey, localFundingKey)
if err != nil {
return nil, errors.Errorf("unable to generate bitcoin "+
"signature for node public key: %v", err)
}
proof := &lnwire.AnnounceSignatures{
ChannelID: chanPoint,
ShortChannelID: chanID,
NodeSignature: nodeSig,
BitcoinSignature: bitcoinSig,
}
return &chanAnnouncement{
chanAnn: chanAnn,
chanUpdateAnn: chanUpdateAnn,
chanProof: proof,
}, nil
}
// announceChannel announces a newly created channel to the rest of the network
@ -1098,17 +1114,23 @@ func newChanAnnouncement(localIdentity, remotePub *btcec.PublicKey,
// network to recognize the legitimacy of the channel. The crafted
// announcements are then send to the channel router to handle broadcasting to
// the network during its next trickle.
func (f *fundingManager) announceChannel(idKey, remoteIDKey *btcec.PublicKey,
channel *lnwallet.LightningChannel, chanID lnwire.ShortChannelID, localProof,
remoteProof *channelProof) {
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey *btcec.PublicKey,
localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID,
chanPoint wire.OutPoint) {
// TODO(roasbeef): need a Signer.SignMessage method to finalize
// advertisements
chanAnnouncement := newChanAnnouncement(idKey, remoteIDKey, channel, chanID,
localProof, remoteProof)
ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey, chanID, chanPoint)
if err != nil {
fndgLog.Errorf("can't generate channel announcement: %v", err)
return
}
f.cfg.SendToDiscovery(chanAnnouncement.chanAnn)
f.cfg.SendToDiscovery(chanAnnouncement.edgeUpdate)
fndgLog.Infof("Send channel, channel update, and proof announcements"+
" for chanID=%v, shortChannelID=%v to discovery service",
chanPoint, chanID.ToUint64())
f.cfg.SendToDiscovery(ann.chanAnn)
f.cfg.SendToDiscovery(ann.chanUpdateAnn)
f.cfg.SendToDiscovery(ann.chanProof)
}
// initFundingWorkflow sends a message to the funding manager instructing it

View File

@ -494,12 +494,15 @@ out:
isChanUpdate = true
targetChan = msg.ChannelPoint
case *lnwire.NodeAnnouncement,
case *lnwire.ChannelUpdateAnnouncement,
*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdateAnnouncement:
*lnwire.NodeAnnouncement,
*lnwire.AnnounceSignatures:
p.server.discoverSrv.ProcessRemoteAnnouncement(msg,
p.addr.IdentityKey)
default:
peerLog.Errorf("unknown message received from peer "+"%v", p)
}
if isChanUpdate {

View File

@ -168,8 +168,23 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
selfAddrs = append(selfAddrs, addr)
}
// TODO(roasbeef): remove once we actually sign the funding_locked
// stuffs
fakeSigHex, err := hex.DecodeString("30450221008ce2bc69281ce27da07e66" +
"83571319d18e949ddfa2965fb6caa1bf0314f882d70220299105481d63e0f" +
"4bc2a88121167221b6700d72a0ead154c03be696a292d24ae")
if err != nil {
return nil, err
}
fakeSig, err := btcec.ParseSignature(fakeSigHex, btcec.S256())
if err != nil {
return nil, err
}
chanGraph := chanDB.ChannelGraph()
self := &channeldb.LightningNode{
AuthSig: fakeSig,
LastUpdate: time.Now(),
Addresses: selfAddrs,
PubKey: privKey.PubKey(),
@ -202,10 +217,12 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
}
s.discoverSrv, err = discovery.New(discovery.Config{
Broadcast: s.broadcastMessage,
Notifier: s.chainNotifier,
Router: s.chanRouter,
SendMessages: s.sendToPeer,
Broadcast: s.broadcastMessage,
Notifier: s.chainNotifier,
Router: s.chanRouter,
SendToPeer: s.sendToPeer,
TrickleDelay: time.Millisecond * 300,
ProofMatureDelta: 0,
})
if err != nil {
return nil, err
@ -218,8 +235,16 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
IDKey: s.identityPriv.PubKey(),
Wallet: wallet,
Notifier: s.chainNotifier,
SendToDiscovery: func(msg lnwire.Message) {
s.discoverSrv.ProcessLocalAnnouncement(msg,
SignNodeKey: func(nodeKey, fundingKey *btcec.PublicKey) (*btcec.Signature,
error) {
return fakeSig, nil
},
SignAnnouncement: func(msg lnwire.Message) (*btcec.Signature,
error) {
return fakeSig, nil
},
SendToDiscovery: func(msg lnwire.Message) chan error {
return s.discoverSrv.ProcessLocalAnnouncement(msg,
s.identityPriv.PubKey())
},
ArbiterChan: s.breachArbiter.newContracts,