lnd: fully integrate the ChannelRouter of the new routing package

This commit fully integrates the ChannelRouter of the new routing
package into the main lnd daemon.

A number of changes have been made to properly support the new
authenticated gossiping scheme.

Two new messages have been added to the server which allow outside
services to: send a message to all peers possible excluding one, and
send a series of messages to a single peer. These two new capabilities
are used by the ChannelRouter to gossip new accepted announcements and
also to synchronize graph state with a new peer on initial connect.

The switch no longer needs a pointer to the routing state machine as it
no longer needs to report when channels closed since the channel
closures will be detected by the ChannelRouter during graph pruning
when a new block comes in.

Finally, the funding manager now crafts the proper authenticated
announcement to send to the ChannelRouter once a new channel has bene
fully confirmed. As a place holder we have fake signatures everywhere
since we don’t properly store the funding keys and haven’t yet adapted
the Signer interface (or create a new one) that abstracts out the
process of signing a generic interface.
This commit is contained in:
Olaoluwa Osuntokun 2016-12-26 23:42:23 -06:00
parent 473f298524
commit c965eda29e
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
5 changed files with 391 additions and 171 deletions

@ -1,14 +1,16 @@
package main
import (
"bytes"
"encoding/hex"
"sync"
"sync/atomic"
"time"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/rt/graph"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
@ -135,6 +137,8 @@ type fundingManager struct {
// requests from a local sub-system within the daemon.
fundingRequests chan *initFundingMsg
fakeProof *channelProof
quit chan struct{}
wg sync.WaitGroup
}
@ -142,10 +146,23 @@ type fundingManager struct {
// newFundingManager creates and initializes a new instance of the
// fundingManager.
func newFundingManager(w *lnwallet.LightningWallet, b *breachArbiter) *fundingManager {
// TODO(roasbeef): remove once we actually sign the funding_locked
// stuffs
s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" +
"1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" +
"ad154c03be696a292d24ae"
fakeSigHex, _ := hex.DecodeString(s)
fakeSig, _ := btcec.ParseSignature(fakeSigHex, btcec.S256())
return &fundingManager{
wallet: w,
breachAribter: b,
fakeProof: &channelProof{
nodeSig: fakeSig,
bitcoinSig: fakeSig,
},
activeReservations: make(map[int32]pendingChannels),
fundingMsgs: make(chan interface{}, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize),
@ -179,7 +196,6 @@ func (f *fundingManager) Stop() error {
fndgLog.Infof("Funding manager shutting down")
close(f.quit)
f.wg.Wait()
return nil
@ -229,7 +245,8 @@ func (f *fundingManager) PendingChannels() []*pendingChannel {
//
// NOTE: This MUST be run as a goroutine.
func (f *fundingManager) reservationCoordinator() {
out:
defer f.wg.Done()
for {
select {
case msg := <-f.fundingMsgs:
@ -257,11 +274,9 @@ out:
f.handlePendingChannels(msg)
}
case <-f.quit:
break out
return
}
}
f.wg.Done()
}
// handleNumPending handles a request for the total number of pending channels.
@ -561,11 +576,109 @@ func (f *fundingManager) processFundingSignComplete(msg *lnwire.SingleFundingSig
f.fundingMsgs <- &fundingSignCompleteMsg{msg, peer}
}
// channelProof is one half of the proof necessary to create an authenticated
// announcement on the network. The two signatures individually sign a
// statement of the existence of a channel.
type channelProof struct {
nodeSig *btcec.Signature
bitcoinSig *btcec.Signature
}
// chanAnnouncement encapsulates the two authenticated announcements that we
// send out to the network after a new channel has been created locally.
type chanAnnouncement struct {
chanAnn *lnwire.ChannelAnnouncement
edgeUpdate *lnwire.ChannelUpdateAnnouncement
}
// newChanAnnouncement creates the authenticated channel announcement messages
// required to broadcast a newly created channel to the network. The
// announcement is two part: the first part authenticates the existence of the
// channel and contains four signatures binding the funding pub keys and
// identity pub keys of both parties to the channel, and the second segment is
// authenticated only by us an contains our directional routing policy for the
// channel.
func newChanAnnouncement(localIdentity *btcec.PublicKey,
channel *lnwallet.LightningChannel, chanID lnwire.ChannelID,
localProof, remoteProof *channelProof) *chanAnnouncement {
// First obtain the remote party's identity public key, this will be
// used to determine the order of the keys and signatures in the
// channel announcement.
chanInfo := channel.StateSnapshot()
remotePub := chanInfo.RemoteIdentity
localPub := localIdentity
// The unconditional section of the announcement is the ChannelID
// itself which compactly encodes the location of the funding output
// within the blockchain.
chanAnn := &lnwire.ChannelAnnouncement{
ChannelID: chanID,
}
// The chanFlags field indicates which directed edge of the channel is
// being updated within the ChannelUpdateAnnouncement announcement
// below. A value of zero means it's the edge of the "first" node and 1
// being the other node.
var chanFlags uint16
// The lexicographical ordering of the two identity public keys of the
// nodes indicates which of the nodes is "first". If our serialized
// identity key is lower than theirs then we're the "first" node and
// second otherwise.
selfBytes := localIdentity.SerializeCompressed()
remoteBytes := remotePub.SerializeCompressed()
if bytes.Compare(selfBytes, remoteBytes) == -1 {
chanAnn.FirstNodeID = localPub
chanAnn.SecondNodeID = &remotePub
chanAnn.FirstNodeSig = localProof.nodeSig
chanAnn.SecondNodeSig = remoteProof.nodeSig
chanAnn.FirstBitcoinSig = localProof.nodeSig
chanAnn.SecondBitcoinSig = remoteProof.nodeSig
chanAnn.FirstBitcoinKey = channel.LocalFundingKey
chanAnn.SecondBitcoinKey = channel.RemoteFundingKey
// If we're the first node then update the chanFlags to
// indicate the "direction" of the update.
chanFlags = 0
} else {
chanAnn.FirstNodeID = &remotePub
chanAnn.SecondNodeID = localPub
chanAnn.FirstNodeSig = remoteProof.nodeSig
chanAnn.SecondNodeSig = localProof.nodeSig
chanAnn.FirstBitcoinSig = remoteProof.nodeSig
chanAnn.SecondBitcoinSig = localProof.nodeSig
chanAnn.FirstBitcoinKey = channel.RemoteFundingKey
chanAnn.SecondBitcoinKey = channel.LocalFundingKey
// If we're the second node then update the chanFlags to
// indicate the "direction" of the update.
chanFlags = 1
}
// TODO(roasbeef): add real sig, populate proper FeeSchema
chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{
Signature: localProof.nodeSig,
ChannelID: chanID,
Timestamp: uint32(time.Now().Unix()),
Flags: chanFlags,
Expiry: 1,
HtlcMinimumMstat: 0,
FeeBaseMstat: 0,
FeeProportionalMillionths: 0,
}
return &chanAnnouncement{
chanAnn: chanAnn,
edgeUpdate: chanUpdateAnn,
}
}
// handleFundingSignComplete processes the final message received in a single
// funder workflow. Once this message is processed, the funding transaction is
// broadcast. Once the funding transaction reaches a sufficient number of
// confirmations, a message is sent to the responding peer along with an SPV
// proofs of transaction inclusion.
// confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the block chain.
func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) {
chanID := fmsg.msg.ChannelID
peerID := fmsg.peer.id
@ -609,73 +722,88 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
// once it reaches a sufficient number of confirmations.
// TODO(roasbeef): semaphore to limit active chan open goroutines
go func() {
select {
// TODO(roasbeef): need to persist pending broadcast channels,
// send chan open proof during scan of blocks mined while down.
case openChan := <-resCtx.reservation.DispatchChan():
// This reservation is no longer pending as the funding
// transaction has been fully confirmed.
f.deleteReservationCtx(peerID, chanID)
openChan, confHeight, confBlockIndex := resCtx.reservation.DispatchChan()
// This reservation is no longer pending as the funding
// transaction has been fully confirmed.
f.deleteReservationCtx(peerID, chanID)
fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
fundingPoint, peerID)
fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
fundingPoint, peerID)
// Now that the channel is open, we need to notify a
// number of parties of this event.
// Now that the channel is open, we need to notify a number of
// parties of this event.
// First we send the newly opened channel to the source
// server peer.
fmsg.peer.newChannels <- openChan
// First we send the newly opened channel to the source server
// peer.
fmsg.peer.newChannels <- openChan
// Afterwards we send the breach arbiter the new
// channel so it can watch for attempts to breach the
// channel's contract by the remote party.
f.breachAribter.newContracts <- openChan
// Afterwards we send the breach arbiter the new channel so it
// can watch for attempts to breach the channel's contract by
// the remote party.
f.breachAribter.newContracts <- openChan
// Next, we queue a message to notify the remote peer
// that the channel is open. We additionally provide an
// SPV proof allowing them to verify the transaction
// inclusion.
// TODO(roasbeef): obtain SPV proof from sub-system.
// * ChainNotifier constructs proof also?
spvProof := []byte("fake proof")
fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, spvProof)
fmsg.peer.queueMsg(fundingOpen, nil)
// With the block height and the transaction index known, we
// can construct the compact chainID which is used on the
// network to unique identify channels.
chainID := lnwire.ChannelID{
BlockHeight: confHeight,
TxIndex: confBlockIndex,
TxPosition: uint16(fundingPoint.Index),
}
// Register the new link with the L3 routing manager
// so this new channel can be utilized during path
// finding.
chanInfo := openChan.StateSnapshot()
capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance)
pubSerialized := fmsg.peer.addr.IdentityKey.SerializeCompressed()
fmsg.peer.server.routingMgr.OpenChannel(
graph.NewVertex(pubSerialized),
graph.NewEdgeID(*fundingPoint),
&graph.ChannelInfo{
Cpt: capacity,
},
)
// Next, we queue a message to notify the remote peer that the
// channel is open. We additionally provide the compact
// channelID so they can advertise the channel.
fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, chainID)
fmsg.peer.queueMsg(fundingOpen, nil)
// Finally give the caller a final update notifying
// them that the channel is now open.
// TODO(roasbeef): helper funcs for proto construction
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
// Register the new link with the L3 routing manager so this
// new channel can be utilized during path
// finding.
// TODO(roasbeef): should include sigs from funding
// locked
// * should be moved to after funding locked is recv'd
f.announceChannel(fmsg.peer.server, openChan, chainID, f.fakeProof,
f.fakeProof)
// Finally give the caller a final update notifying them that
// the channel is now open.
// TODO(roasbeef): helper funcs for proto construction
resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{
ChannelPoint: &lnrpc.ChannelPoint{
FundingTxid: fundingPoint.Hash[:],
OutputIndex: fundingPoint.Index,
},
},
}
return
case <-f.quit:
return
},
}
return
}()
}
// announceChannel announces a newly created channel to the rest of the network
// by crafting the two authenticated announcement required for the peers on the
// network to recognize the legitimacy of the channel. The crafted
// announcements are then send to the channel router to handle broadcasting to
// the network during its next trickle.
func (f *fundingManager) announceChannel(s *server,
channel *lnwallet.LightningChannel, chanID lnwire.ChannelID,
localProof, remoteProof *channelProof) {
// TODO(roasbeef): need a Signer.SignMessage method to finalize
// advertisements
localIdentity := s.identityPriv.PubKey()
chanAnnouncement := newChanAnnouncement(localIdentity, channel,
chanID, localProof, remoteProof)
s.chanRouter.ProcessRoutingMessage(chanAnnouncement.chanAnn, localIdentity)
s.chanRouter.ProcessRoutingMessage(chanAnnouncement.edgeUpdate, localIdentity)
}
// processFundingOpenProof sends a message to the fundingManager allowing it
// to process the final message recieved when the daemon is on the responding
// side of a single funder channel workflow.
@ -684,9 +812,7 @@ func (f *fundingManager) processFundingOpenProof(msg *lnwire.SingleFundingOpenPr
}
// handleFundingOpen processes the final message when the daemon is the
// responder to a single funder channel workflow. The SPV proofs supplied by
// the initiating node is verified, which if correct, marks the channel as open
// to the source peer.
// responder to a single funder channel workflow.
func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
chanID := fmsg.msg.ChannelID
peerID := fmsg.peer.id
@ -721,19 +847,15 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
resCtx.reservation.FundingOutpoint, peerID)
// Notify the L3 routing manager of the newly active channel link.
capacity := int64(resCtx.reservation.OurContribution().FundingAmount +
resCtx.reservation.TheirContribution().FundingAmount)
vertex := fmsg.peer.addr.IdentityKey.SerializeCompressed()
fmsg.peer.server.routingMgr.OpenChannel(
graph.NewVertex(vertex),
graph.NewEdgeID(*resCtx.reservation.FundingOutpoint()),
&graph.ChannelInfo{
Cpt: capacity,
},
)
// TODO(roasbeef): should have sigs, only after funding_locked is
// recv'd
// * also ensure fault tolerance, scan opened chan on start up check
// for graph existence
f.announceChannel(fmsg.peer.server, openChan, fmsg.msg.ChanChainID,
f.fakeProof, f.fakeProof)
// Send the newly opened channel to the breach arbiter to it can watch
// for uncopperative channel breaches, potentially punishing the
// for uncooperative channel breaches, potentially punishing the
// counter-party for attempting to cheat us.
f.breachAribter.newContracts <- openChan

@ -13,8 +13,6 @@ import (
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/rt/graph"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
@ -98,7 +96,7 @@ type paymentCircuit struct {
settle *link
}
// HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's.
// htlcSwitch is a central messaging bus for all incoming/outgoing HTLC's.
// Connected peers with active channels are treated as named interfaces which
// refer to active channels as links. A link is the switche's message
// communication point with the goroutine that manages an active channel. New
@ -151,11 +149,6 @@ type htlcSwitch struct {
// fully locked in.
htlcPlex chan *htlcPacket
gateway []byte
router *routing.RoutingManager
// TODO(roasbeef): messaging chan to/from upper layer (routing - L3)
// TODO(roasbeef): sampler to log sat/sec and tx/sec
wg sync.WaitGroup
@ -163,10 +156,8 @@ type htlcSwitch struct {
}
// newHtlcSwitch creates a new htlcSwitch.
func newHtlcSwitch(gateway []byte, r *routing.RoutingManager) *htlcSwitch {
func newHtlcSwitch() *htlcSwitch {
return &htlcSwitch{
router: r,
gateway: gateway,
chanIndex: make(map[wire.OutPoint]*link),
interfaces: make(map[wire.ShaHash][]*link),
onionIndex: make(map[[ripemd160.Size]byte][]*link),
@ -495,7 +486,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
// A request with a nil channel point indicates that all the current
// links for this channel should be cleared.
chansRemoved := make([]*wire.OutPoint, 0, len(links))
if req.chanPoint == nil {
hswcLog.Debugf("purging all active links for interface %v",
hex.EncodeToString(chanInterface[:]))
@ -504,9 +494,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
h.chanIndexMtx.Lock()
delete(h.chanIndex, *link.chanPoint)
h.chanIndexMtx.Unlock()
chansRemoved = append(chansRemoved, link.chanPoint)
}
links = nil
} else {
h.chanIndexMtx.Lock()
@ -516,8 +505,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
for i := 0; i < len(links); i++ {
chanLink := links[i]
if chanLink.chanPoint == req.chanPoint {
chansRemoved = append(chansRemoved, req.chanPoint)
// We perform an in-place delete by sliding
// every element down one, then slicing off the
// last element. Additionally, we update the
@ -534,22 +521,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) {
}
}
// Purge the now inactive channels from the routing table.
// TODO(roasbeef): routing layer should only see the links as a
// summation of their capacity/etc
// * distinction between connection close and channel close
for _, linkChan := range chansRemoved {
err := h.router.RemoveChannel(
graph.NewVertex(h.gateway),
graph.NewVertex(req.remoteID),
graph.NewEdgeID(*linkChan),
)
if err != nil {
hswcLog.Errorf("unable to remove channel from "+
"routing table: %v", err)
}
}
// TODO(roasbeef): clean up/modify onion links
// * just have the interfaces index be keyed on hash160?

7
log.go

@ -10,6 +10,7 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/routing"
)
// Loggers per subsystem. Note that backendLog is a seelog logger that all of
@ -30,6 +31,7 @@ var (
utxnLog = btclog.Disabled
brarLog = btclog.Disabled
cmgrLog = btclog.Disabled
crtrLog = btclog.Disabled
)
// subsystemLoggers maps each subsystem identifier to its associated logger.
@ -46,6 +48,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"UTXN": utxnLog,
"BRAR": brarLog,
"CMGR": cmgrLog,
"CRTR": crtrLog,
}
// useLogger updates the logger references for subsystemID to logger. Invalid
@ -96,6 +99,10 @@ func useLogger(subsystemID string, logger btclog.Logger) {
case "CMGR":
cmgrLog = logger
connmgr.UseLogger(logger)
case "CRTR":
crtrLog = logger
routing.UseLogger(crtrLog)
}
}

30
peer.go

@ -19,7 +19,6 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/rt/graph"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
@ -233,17 +232,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint)
// Notify the routing table of this newly loaded channel.
chanInfo := lnChan.StateSnapshot()
capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance)
pubSerialized := p.addr.IdentityKey.SerializeCompressed()
p.server.routingMgr.OpenChannel(
graph.NewVertex(pubSerialized),
graph.NewEdgeID(*chanInfo.ChannelPoint),
&graph.ChannelInfo{
Cpt: capacity,
},
)
// TODO(roasbeef): register active channel with breach observer
// Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward
@ -379,7 +368,6 @@ out:
case *lnwire.Ping:
p.queueMsg(lnwire.NewPong(msg.Nonce), nil)
// TODO(roasbeef): consolidate into predicate (single vs dual)
case *lnwire.SingleFundingRequest:
p.server.fundingMgr.processFundingRequest(msg, p)
case *lnwire.SingleFundingResponse:
@ -392,11 +380,10 @@ out:
p.server.fundingMgr.processFundingOpenProof(msg, p)
case *lnwire.CloseRequest:
p.remoteCloseChanReqs <- msg
// TODO(roasbeef): interface for htlc update msgs
// * .(CommitmentUpdater)
case *lnwire.ErrorGeneric:
p.server.fundingMgr.processErrorGeneric(msg, p)
case *lnwire.HTLCAddRequest:
isChanUpdate = true
targetChan = msg.ChannelPoint
@ -409,14 +396,13 @@ out:
case *lnwire.CommitSignature:
isChanUpdate = true
targetChan = msg.ChannelPoint
case *lnwire.NeighborAckMessage,
*lnwire.NeighborHelloMessage,
*lnwire.NeighborRstMessage,
*lnwire.NeighborUpdMessage:
// Convert to base routing message and set sender and receiver
vertex := p.addr.IdentityKey.SerializeCompressed()
p.server.routingMgr.ReceiveRoutingMessage(msg, graph.NewVertex(vertex))
case *lnwire.NodeAnnouncement,
*lnwire.ChannelAnnouncement,
*lnwire.ChannelUpdateAnnouncement:
p.server.chanRouter.ProcessRoutingMessage(msg,
p.addr.IdentityKey)
}
if isChanUpdate {

220
server.go

@ -1,6 +1,8 @@
package main
import (
"encoding/hex"
"errors"
"fmt"
"net"
"sync"
@ -20,7 +22,6 @@ import (
"github.com/roasbeef/btcutil"
"github.com/lightningnetwork/lnd/routing"
"github.com/lightningnetwork/lnd/routing/rt/graph"
)
// server is the main server of the Lightning Network Daemon. The server houses
@ -57,7 +58,7 @@ type server struct {
invoices *invoiceRegistry
breachArbiter *breachArbiter
routingMgr *routing.RoutingManager
chanRouter *routing.ChannelRouter
utxoNursery *utxoNursery
@ -69,6 +70,9 @@ type server struct {
persistentConnReqs map[string]*connmgr.ConnReq
pendingConnRequests map[string]*connectPeerMsg
broadcastRequests chan *broadcastReq
sendRequests chan *sendReq
newPeers chan *peer
donePeers chan *peer
queries chan interface{}
@ -105,6 +109,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
invoices: newInvoiceRegistry(chanDB),
utxoNursery: newUtxoNursery(notifier, wallet),
htlcSwitch: newHtlcSwitch(),
identityPriv: privKey,
@ -122,6 +127,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
newPeers: make(chan *peer, 10),
donePeers: make(chan *peer, 10),
broadcastRequests: make(chan *broadcastReq),
sendRequests: make(chan *sendReq),
queries: make(chan interface{}),
quit: make(chan struct{}),
}
@ -136,47 +144,37 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
debugPre[:], debugHash[:])
}
s.utxoNursery = newUtxoNursery(notifier, wallet)
// Create a new routing manager with ourself as the sole node within
// the graph.
selfVertex := serializedPubKey
routingMgrConfig := &routing.RoutingConfig{}
routingMgrConfig.SendMessage = func(receiver [33]byte, msg lnwire.Message) error {
receiverID := graph.NewVertex(receiver[:])
if receiverID == graph.NilVertex {
peerLog.Critical("receiverID == graph.NilVertex")
return fmt.Errorf("receiverID == graph.NilVertex")
}
var targetPeer *peer
for _, peer := range s.peersByID { // TODO: threadsafe API
nodePub := peer.addr.IdentityKey.SerializeCompressed()
nodeVertex := graph.NewVertex(nodePub[:])
// We found the target
if receiverID == nodeVertex {
targetPeer = peer
break
}
}
if targetPeer != nil {
targetPeer.queueMsg(msg, nil)
} else {
srvrLog.Errorf("Can't find peer to send message %v",
receiverID)
}
return nil
// TODO(roasbeef): add --externalip flag?
selfAddr, ok := listeners[0].Addr().(*net.TCPAddr)
if !ok {
return nil, fmt.Errorf("default listener must be TCP")
}
s.routingMgr = routing.NewRoutingManager(graph.NewVertex(selfVertex), routingMgrConfig)
s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr)
chanGraph := chanDB.ChannelGraph()
self := &channeldb.LightningNode{
LastUpdate: time.Now(),
Address: selfAddr,
PubKey: privKey.PubKey(),
// TODO(roasbeef): make alias configurable
Alias: hex.EncodeToString(serializedPubKey[:10]),
}
if err := chanGraph.SetSourceNode(self); err != nil {
return nil, err
}
s.chanRouter, err = routing.New(routing.Config{
Graph: chanGraph,
Chain: bio,
Notifier: notifier,
Broadcast: s.broadcastMessage,
SendMessages: s.sendToPeer,
})
if err != nil {
return nil, err
}
s.rpcServer = newRpcServer(s)
s.breachArbiter = newBreachArbiter(wallet, chanDB, notifier, s.htlcSwitch)
s.fundingMgr = newFundingManager(wallet, s.breachArbiter)
// TODO(roasbeef): introduce closure and config system to decouple the
@ -268,7 +266,9 @@ func (s *server) Start() error {
if err := s.breachArbiter.Start(); err != nil {
return err
}
s.routingMgr.Start()
if err := s.chanRouter.Start(); err != nil {
return err
}
s.wg.Add(1)
go s.queryHandler()
@ -289,7 +289,7 @@ func (s *server) Stop() error {
s.chainNotifier.Stop()
s.rpcServer.Stop()
s.fundingMgr.Stop()
s.routingMgr.Stop()
s.chanRouter.Stop()
s.htlcSwitch.Stop()
s.utxoNursery.Stop()
s.breachArbiter.Stop()
@ -308,6 +308,81 @@ func (s *server) WaitForShutdown() {
s.wg.Wait()
}
// broadcastReq is a message sent to the server by a related sub-system when it
// wishes to broadcast one or more messages to all connected peers. Thi
type broadcastReq struct {
ignore *btcec.PublicKey
msgs []lnwire.Message
errChan chan error // MUST be buffered.
}
// broadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skip` parameter.
func (s *server) broadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error {
errChan := make(chan error, 1)
msgsToSend := make([]lnwire.Message, 0, len(msgs))
msgsToSend = append(msgsToSend, msgs...)
broadcastReq := &broadcastReq{
ignore: skip,
msgs: msgsToSend,
errChan: errChan,
}
select {
case s.broadcastRequests <- broadcastReq:
case <-s.quit:
return errors.New("server shutting down")
}
select {
case err := <-errChan:
return err
case <-s.quit:
return errors.New("server shutting down")
}
}
// sendReq is message sent to the server by a related sub-system which it
// wishes to send a set of messages to a specified peer.
type sendReq struct {
target *btcec.PublicKey
msgs []lnwire.Message
errChan chan error
}
// sendToPeer send a message to the server telling it to send the specific set
// of message to a particular peer. If the peer connect be found, then this
// method will return a non-nil error.
func (s *server) sendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error {
errChan := make(chan error, 1)
msgsToSend := make([]lnwire.Message, 0, len(msgs))
msgsToSend = append(msgsToSend, msgs...)
sMsg := &sendReq{
target: target,
msgs: msgsToSend,
errChan: errChan,
}
select {
case s.sendRequests <- sMsg:
case <-s.quit:
return errors.New("server shutting down")
}
select {
case err := <-errChan:
return err
case <-s.quit:
return errors.New("server shutting down")
}
return nil
}
// peerConnected is a function that handles initialization a newly connected
// peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly.
@ -415,9 +490,6 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
// addPeer adds the passed peer to the server's global state of all active
// peers.
func (s *server) addPeer(p *peer) {
s.peersMtx.Lock()
defer s.peersMtx.Unlock()
if p == nil {
return
}
@ -428,8 +500,19 @@ func (s *server) addPeer(p *peer) {
return
}
// Track the new peer in our indexes so we can quickly look it up either
// according to its public key, or it's peer ID.
// TODO(roasbeef): pipe all requests through to the
// queryHandler/peerManager
s.peersMtx.Lock()
s.peersByID[p.id] = p
s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p
s.peersMtx.Unlock()
// Once the peer has been added to our indexes, send a message to the
// channel router so we can synchronize our view of the channel graph
// with this new peer.
s.chanRouter.SynchronizeNode(p.addr.IdentityKey)
}
// removePeer removes the passed peer from the server's state of all active
@ -508,6 +591,54 @@ out:
case p := <-s.donePeers:
s.removePeer(p)
case bMsg := <-s.broadcastRequests:
ignore := bMsg.ignore
srvrLog.Debugf("Broadcasting %v messages", len(bMsg.msgs))
s.peersMtx.RLock()
for _, peer := range s.peersByPub {
if ignore != nil &&
peer.addr.IdentityKey.IsEqual(ignore) {
srvrLog.Debugf("Skipping %v in broadcast",
ignore.SerializeCompressed())
continue
}
for _, msg := range bMsg.msgs {
peer.queueMsg(msg, nil)
}
}
s.peersMtx.RUnlock()
bMsg.errChan <- nil
case sMsg := <-s.sendRequests:
// TODO(roasbeef): use [33]byte everywhere instead
// * eliminate usage of mutexes, funnel all peer
// mutation to this goroutine
target := sMsg.target.SerializeCompressed()
srvrLog.Debugf("Attempting to send msgs %v to: %x",
len(sMsg.msgs), target)
s.peersMtx.RLock()
targetPeer, ok := s.peersByPub[string(target)]
if !ok {
s.peersMtx.RUnlock()
srvrLog.Errorf("unable to send message to %x, "+
"peer not found", target)
sMsg.errChan <- errors.New("peer not found")
continue
}
for _, msg := range sMsg.msgs {
targetPeer.queueMsg(msg, nil)
}
s.peersMtx.RUnlock()
sMsg.errChan <- nil
case query := <-s.queries:
switch msg := query.(type) {
case *connectPeerMsg:
@ -587,6 +718,9 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) {
Permanent: true,
})
// TODO(roasbeef): create goroutine to poll state so can report
// connection fails
// Finally, we store the original request keyed by the public key so we
// can dispatch the response to the RPC client once a connection has
// been initiated.