diff --git a/fundingmanager.go b/fundingmanager.go
index 0cbc5912..b8a0aa21 100644
--- a/fundingmanager.go
+++ b/fundingmanager.go
@@ -1,14 +1,16 @@
 package main

 import (
+	"bytes"
+	"encoding/hex"
 	"sync"
 	"sync/atomic"
+	"time"

 	"github.com/go-errors/errors"
 	"github.com/lightningnetwork/lnd/lnrpc"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
-	"github.com/lightningnetwork/lnd/routing/rt/graph"
 	"github.com/roasbeef/btcd/btcec"
 	"github.com/roasbeef/btcd/txscript"
 	"github.com/roasbeef/btcd/wire"
@@ -135,6 +137,8 @@ type fundingManager struct {
 	// requests from a local sub-system within the daemon.
 	fundingRequests chan *initFundingMsg

+	fakeProof *channelProof
+
 	quit chan struct{}
 	wg   sync.WaitGroup
 }
@@ -142,10 +146,23 @@ type fundingManager struct {
 // newFundingManager creates and initializes a new instance of the
 // fundingManager.
 func newFundingManager(w *lnwallet.LightningWallet, b *breachArbiter) *fundingManager {
+	// TODO(roasbeef): remove once we actually sign the funding_locked
+	// stuffs
+	s := "30450221008ce2bc69281ce27da07e6683571319d18e949ddfa2965fb6caa" +
+		"1bf0314f882d70220299105481d63e0f4bc2a88121167221b6700d72a0e" +
+		"ad154c03be696a292d24ae"
+	fakeSigHex, _ := hex.DecodeString(s)
+	fakeSig, _ := btcec.ParseSignature(fakeSigHex, btcec.S256())
+
 	return &fundingManager{
 		wallet:        w,
 		breachAribter: b,

+		fakeProof: &channelProof{
+			nodeSig:    fakeSig,
+			bitcoinSig: fakeSig,
+		},
+
 		activeReservations: make(map[int32]pendingChannels),
 		fundingMsgs:        make(chan interface{}, msgBufferSize),
 		fundingRequests:    make(chan *initFundingMsg, msgBufferSize),
@@ -179,7 +196,6 @@ func (f *fundingManager) Stop() error {
 	fndgLog.Infof("Funding manager shutting down")

 	close(f.quit)
-
 	f.wg.Wait()

 	return nil
@@ -229,7 +245,8 @@ func (f *fundingManager) PendingChannels() []*pendingChannel {
 //
 // NOTE: This MUST be run as a goroutine.
 func (f *fundingManager) reservationCoordinator() {
-out:
+	defer f.wg.Done()
+
 	for {
 		select {
 		case msg := <-f.fundingMsgs:
@@ -257,11 +274,9 @@ out:
 			f.handlePendingChannels(msg)
 		}
 		case <-f.quit:
-			break out
+			return
 		}
 	}
-
-	f.wg.Done()
 }

 // handleNumPending handles a request for the total number of pending channels.
@@ -561,11 +576,109 @@ func (f *fundingManager) processFundingSignComplete(msg *lnwire.SingleFundingSig
 	f.fundingMsgs <- &fundingSignCompleteMsg{msg, peer}
 }

+// channelProof is one half of the proof necessary to create an authenticated
+// announcement on the network. The two signatures individually sign a
+// statement of the existence of a channel.
+type channelProof struct {
+	nodeSig    *btcec.Signature
+	bitcoinSig *btcec.Signature
+}
+
+// chanAnnouncement encapsulates the two authenticated announcements that we
+// send out to the network after a new channel has been created locally.
+type chanAnnouncement struct {
+	chanAnn    *lnwire.ChannelAnnouncement
+	edgeUpdate *lnwire.ChannelUpdateAnnouncement
+}
+
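The `fakeProof` above is an explicit stop-gap until announcement signing lands. For orientation, here is a minimal sketch of what producing a real `channelProof` might look like, assuming both the node identity key and the bitcoin funding key sign a double-SHA256 digest of the serialized announcement; `signChannelProof` and the digest choice are illustrative assumptions, not part of this patch:

```go
package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/roasbeef/btcd/btcec"
)

// channelProof mirrors the struct introduced in the patch above.
type channelProof struct {
	nodeSig    *btcec.Signature
	bitcoinSig *btcec.Signature
}

// signChannelProof sketches the eventual signing flow: both keys sign the
// same digest of the serialized channel announcement.
func signChannelProof(nodeKey, bitcoinKey *btcec.PrivateKey,
	annBytes []byte) (*channelProof, error) {

	// Hash the announcement twice with SHA-256 to form the digest (an
	// assumed digest scheme, purely for illustration).
	first := sha256.Sum256(annBytes)
	digest := sha256.Sum256(first[:])

	nodeSig, err := nodeKey.Sign(digest[:])
	if err != nil {
		return nil, err
	}
	bitcoinSig, err := bitcoinKey.Sign(digest[:])
	if err != nil {
		return nil, err
	}

	return &channelProof{nodeSig: nodeSig, bitcoinSig: bitcoinSig}, nil
}

func main() {
	nodeKey, _ := btcec.NewPrivateKey(btcec.S256())
	btcKey, _ := btcec.NewPrivateKey(btcec.S256())

	proof, err := signChannelProof(nodeKey, btcKey, []byte("serialized announcement"))
	if err != nil {
		panic(err)
	}
	fmt.Printf("node sig: %d bytes DER\n", len(proof.nodeSig.Serialize()))
}
```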
+// newChanAnnouncement creates the authenticated channel announcement messages
+// required to broadcast a newly created channel to the network. The
+// announcement is in two parts: the first part authenticates the existence of
+// the channel and contains four signatures binding the funding pub keys and
+// identity pub keys of both parties to the channel, and the second segment is
+// authenticated only by us and contains our directional routing policy for
+// the channel.
+func newChanAnnouncement(localIdentity *btcec.PublicKey,
+	channel *lnwallet.LightningChannel, chanID lnwire.ChannelID,
+	localProof, remoteProof *channelProof) *chanAnnouncement {
+
+	// First obtain the remote party's identity public key, this will be
+	// used to determine the order of the keys and signatures in the
+	// channel announcement.
+	chanInfo := channel.StateSnapshot()
+	remotePub := chanInfo.RemoteIdentity
+	localPub := localIdentity
+
+	// The unconditional section of the announcement is the ChannelID
+	// itself which compactly encodes the location of the funding output
+	// within the blockchain.
+	chanAnn := &lnwire.ChannelAnnouncement{
+		ChannelID: chanID,
+	}
+
+	// The chanFlags field indicates which directed edge of the channel is
+	// being updated within the ChannelUpdateAnnouncement announcement
+	// below. A value of zero denotes the edge of the "first" node, while a
+	// value of one denotes the edge of the other node.
+	var chanFlags uint16
+
+	// The lexicographical ordering of the two identity public keys of the
+	// nodes indicates which of the nodes is "first". If our serialized
+	// identity key is lower than theirs then we're the "first" node and
+	// second otherwise.
+	selfBytes := localIdentity.SerializeCompressed()
+	remoteBytes := remotePub.SerializeCompressed()
+	if bytes.Compare(selfBytes, remoteBytes) == -1 {
+		chanAnn.FirstNodeID = localPub
+		chanAnn.SecondNodeID = &remotePub
+		chanAnn.FirstNodeSig = localProof.nodeSig
+		chanAnn.SecondNodeSig = remoteProof.nodeSig
+		chanAnn.FirstBitcoinSig = localProof.bitcoinSig
+		chanAnn.SecondBitcoinSig = remoteProof.bitcoinSig
+		chanAnn.FirstBitcoinKey = channel.LocalFundingKey
+		chanAnn.SecondBitcoinKey = channel.RemoteFundingKey
+
+		// If we're the first node then update the chanFlags to
+		// indicate the "direction" of the update.
+		chanFlags = 0
+	} else {
+		chanAnn.FirstNodeID = &remotePub
+		chanAnn.SecondNodeID = localPub
+		chanAnn.FirstNodeSig = remoteProof.nodeSig
+		chanAnn.SecondNodeSig = localProof.nodeSig
+		chanAnn.FirstBitcoinSig = remoteProof.bitcoinSig
+		chanAnn.SecondBitcoinSig = localProof.bitcoinSig
+		chanAnn.FirstBitcoinKey = channel.RemoteFundingKey
+		chanAnn.SecondBitcoinKey = channel.LocalFundingKey
+
+		// If we're the second node then update the chanFlags to
+		// indicate the "direction" of the update.
+		chanFlags = 1
+	}
+
+	// TODO(roasbeef): add real sig, populate proper FeeSchema
+	chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{
+		Signature:                 localProof.nodeSig,
+		ChannelID:                 chanID,
+		Timestamp:                 uint32(time.Now().Unix()),
+		Flags:                     chanFlags,
+		Expiry:                    1,
+		HtlcMinimumMstat:          0,
+		FeeBaseMstat:              0,
+		FeeProportionalMillionths: 0,
+	}
+
+	return &chanAnnouncement{
+		chanAnn:    chanAnn,
+		edgeUpdate: chanUpdateAnn,
+	}
+}
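Both endpoints must independently derive the same first/second assignment from nothing but the two public keys; otherwise the four signatures would land in the wrong slots. A self-contained sketch of just the ordering rule used by `newChanAnnouncement` (the `isFirstNode` helper is illustrative, not part of this change):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/roasbeef/btcd/btcec"
)

// isFirstNode reports whether our serialized identity key sorts
// lexicographically before the remote key, mirroring the ordering rule in
// newChanAnnouncement above.
func isFirstNode(local, remote *btcec.PublicKey) bool {
	return bytes.Compare(local.SerializeCompressed(),
		remote.SerializeCompressed()) == -1
}

func main() {
	// Two throwaway keys purely for demonstration.
	priv1, _ := btcec.NewPrivateKey(btcec.S256())
	priv2, _ := btcec.NewPrivateKey(btcec.S256())

	// Exactly one of the two orderings reports "first".
	fmt.Println("key1 first?", isFirstNode(priv1.PubKey(), priv2.PubKey()))
	fmt.Println("key2 first?", isFirstNode(priv2.PubKey(), priv1.PubKey()))
}
```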
 // handleFundingSignComplete processes the final message received in a single
 // funder workflow. Once this message is processed, the funding transaction is
 // broadcast. Once the funding transaction reaches a sufficient number of
-// confirmations, a message is sent to the responding peer along with an SPV
-// proofs of transaction inclusion.
+// confirmations, a message is sent to the responding peer along with a compact
+// encoding of the location of the channel within the block chain.
 func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) {
 	chanID := fmsg.msg.ChannelID
 	peerID := fmsg.peer.id
@@ -609,73 +722,88 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
 	// once it reaches a sufficient number of confirmations.
 	// TODO(roasbeef): semaphore to limit active chan open goroutines
 	go func() {
-		select {
 		// TODO(roasbeef): need to persist pending broadcast channels,
 		// send chan open proof during scan of blocks mined while down.
-		case openChan := <-resCtx.reservation.DispatchChan():
-			// This reservation is no longer pending as the funding
-			// transaction has been fully confirmed.
-			f.deleteReservationCtx(peerID, chanID)
+		openChan, confHeight, confBlockIndex := resCtx.reservation.DispatchChan()
+		// This reservation is no longer pending as the funding
+		// transaction has been fully confirmed.
+		f.deleteReservationCtx(peerID, chanID)

-			fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
-				fundingPoint, peerID)
+		fndgLog.Infof("ChannelPoint(%v) with peerID(%v) is now active",
+			fundingPoint, peerID)

-			// Now that the channel is open, we need to notify a
-			// number of parties of this event.
+		// Now that the channel is open, we need to notify a number of
+		// parties of this event.

-			// First we send the newly opened channel to the source
-			// server peer.
-			fmsg.peer.newChannels <- openChan
+		// First we send the newly opened channel to the source server
+		// peer.
+		fmsg.peer.newChannels <- openChan

-			// Afterwards we send the breach arbiter the new
-			// channel so it can watch for attempts to breach the
-			// channel's contract by the remote party.
-			f.breachAribter.newContracts <- openChan
+		// Afterwards we send the breach arbiter the new channel so it
+		// can watch for attempts to breach the channel's contract by
+		// the remote party.
+		f.breachAribter.newContracts <- openChan

-			// Next, we queue a message to notify the remote peer
-			// that the channel is open. We additionally provide an
-			// SPV proof allowing them to verify the transaction
-			// inclusion.
-			// TODO(roasbeef): obtain SPV proof from sub-system.
-			//  * ChainNotifier constructs proof also?
-			spvProof := []byte("fake proof")
-			fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, spvProof)
-			fmsg.peer.queueMsg(fundingOpen, nil)
+		// With the block height and the transaction index known, we
+		// can construct the compact chainID which is used on the
+		// network to uniquely identify channels.
+		chainID := lnwire.ChannelID{
+			BlockHeight: confHeight,
+			TxIndex:     confBlockIndex,
+			TxPosition:  uint16(fundingPoint.Index),
+		}

-			// Register the new link with the L3 routing manager
-			// so this new channel can be utilized during path
-			// finding.
-			chanInfo := openChan.StateSnapshot()
-			capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance)
-			pubSerialized := fmsg.peer.addr.IdentityKey.SerializeCompressed()
-			fmsg.peer.server.routingMgr.OpenChannel(
-				graph.NewVertex(pubSerialized),
-				graph.NewEdgeID(*fundingPoint),
-				&graph.ChannelInfo{
-					Cpt: capacity,
-				},
-			)
+		// Next, we queue a message to notify the remote peer that the
+		// channel is open. We additionally provide the compact
+		// channelID so they can advertise the channel.
+		fundingOpen := lnwire.NewSingleFundingOpenProof(chanID, chainID)
+		fmsg.peer.queueMsg(fundingOpen, nil)

-			// Finally give the caller a final update notifying
-			// them that the channel is now open.
-			// TODO(roasbeef): helper funcs for proto construction
-			resCtx.updates <- &lnrpc.OpenStatusUpdate{
-				Update: &lnrpc.OpenStatusUpdate_ChanOpen{
-					ChanOpen: &lnrpc.ChannelOpenUpdate{
-						ChannelPoint: &lnrpc.ChannelPoint{
-							FundingTxid: fundingPoint.Hash[:],
-							OutputIndex: fundingPoint.Index,
-						},
+		// Register the new link with the L3 routing manager so this
+		// new channel can be utilized during path
+		// finding.
+		// TODO(roasbeef): should include sigs from funding
+		// locked
+		//  * should be moved to after funding locked is recv'd
+		f.announceChannel(fmsg.peer.server, openChan, chainID, f.fakeProof,
+			f.fakeProof)
+
+		// Finally give the caller a final update notifying them that
+		// the channel is now open.
+		// TODO(roasbeef): helper funcs for proto construction
+		resCtx.updates <- &lnrpc.OpenStatusUpdate{
+			Update: &lnrpc.OpenStatusUpdate_ChanOpen{
+				ChanOpen: &lnrpc.ChannelOpenUpdate{
+					ChannelPoint: &lnrpc.ChannelPoint{
+						FundingTxid: fundingPoint.Hash[:],
+						OutputIndex: fundingPoint.Index,
 					},
 				},
-			}
-			return
-		case <-f.quit:
-			return
+			},
 		}
+		return
 	}()
 }
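The (block height, tx index, output position) triple pins the funding output to a single on-chain location, which is what lets it double as a globally unique channel identifier. A standalone sketch of the kind of 8-byte packing such an ID admits; the 3/3/2-byte layout below is an assumption for illustration, the authoritative encoding being whatever `lnwire.ChannelID` defines:

```go
package main

import "fmt"

// packChannelID packs a confirmation height (3 bytes), transaction index
// within the block (3 bytes), and funding output position (2 bytes) into a
// single uint64. Hypothetical layout, for intuition only.
func packChannelID(blockHeight, txIndex uint32, txPosition uint16) uint64 {
	return uint64(blockHeight&0xFFFFFF)<<40 |
		uint64(txIndex&0xFFFFFF)<<16 |
		uint64(txPosition)
}

func main() {
	// A funding transaction confirmed at height 500000, at index 1024
	// within its block, with the funding output at position 1.
	fmt.Printf("%#016x\n", packChannelID(500000, 1024, 1))
}
```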
+// announceChannel announces a newly created channel to the rest of the network
+// by crafting the two authenticated announcements required for the peers on
+// the network to recognize the legitimacy of the channel. The crafted
+// announcements are then sent to the channel router to handle broadcasting to
+// the network during its next trickle.
+func (f *fundingManager) announceChannel(s *server,
+	channel *lnwallet.LightningChannel, chanID lnwire.ChannelID,
+	localProof, remoteProof *channelProof) {
+
+	// TODO(roasbeef): need a Signer.SignMessage method to finalize
+	// advertisements
+	localIdentity := s.identityPriv.PubKey()
+	chanAnnouncement := newChanAnnouncement(localIdentity, channel,
+		chanID, localProof, remoteProof)
+
+	s.chanRouter.ProcessRoutingMessage(chanAnnouncement.chanAnn, localIdentity)
+	s.chanRouter.ProcessRoutingMessage(chanAnnouncement.edgeUpdate, localIdentity)
+}
+
 // processFundingOpenProof sends a message to the fundingManager allowing it
 // to process the final message recieved when the daemon is on the responding
 // side of a single funder channel workflow.
@@ -684,9 +812,7 @@ func (f *fundingManager) processFundingOpenProof(msg *lnwire.SingleFundingOpenPr
 }

 // handleFundingOpen processes the final message when the daemon is the
-// responder to a single funder channel workflow. The SPV proofs supplied by
-// the initiating node is verified, which if correct, marks the channel as open
-// to the source peer.
+// responder to a single funder channel workflow.
 func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
 	chanID := fmsg.msg.ChannelID
 	peerID := fmsg.peer.id
@@ -721,19 +847,15 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
 		resCtx.reservation.FundingOutpoint, peerID)

 	// Notify the L3 routing manager of the newly active channel link.
- capacity := int64(resCtx.reservation.OurContribution().FundingAmount + - resCtx.reservation.TheirContribution().FundingAmount) - vertex := fmsg.peer.addr.IdentityKey.SerializeCompressed() - fmsg.peer.server.routingMgr.OpenChannel( - graph.NewVertex(vertex), - graph.NewEdgeID(*resCtx.reservation.FundingOutpoint()), - &graph.ChannelInfo{ - Cpt: capacity, - }, - ) + // TODO(roasbeef): should have sigs, only after funding_locked is + // recv'd + // * also ensure fault tolerance, scan opened chan on start up check + // for graph existence + f.announceChannel(fmsg.peer.server, openChan, fmsg.msg.ChanChainID, + f.fakeProof, f.fakeProof) // Send the newly opened channel to the breach arbiter to it can watch - // for uncopperative channel breaches, potentially punishing the + // for uncooperative channel breaches, potentially punishing the // counter-party for attempting to cheat us. f.breachAribter.newContracts <- openChan diff --git a/htlcswitch.go b/htlcswitch.go index 067deb97..18807005 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -13,8 +13,6 @@ import ( "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/routing" - "github.com/lightningnetwork/lnd/routing/rt/graph" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" @@ -98,7 +96,7 @@ type paymentCircuit struct { settle *link } -// HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. +// htlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. // Connected peers with active channels are treated as named interfaces which // refer to active channels as links. A link is the switche's message // communication point with the goroutine that manages an active channel. New @@ -151,11 +149,6 @@ type htlcSwitch struct { // fully locked in. htlcPlex chan *htlcPacket - gateway []byte - router *routing.RoutingManager - - // TODO(roasbeef): messaging chan to/from upper layer (routing - L3) - // TODO(roasbeef): sampler to log sat/sec and tx/sec wg sync.WaitGroup @@ -163,10 +156,8 @@ type htlcSwitch struct { } // newHtlcSwitch creates a new htlcSwitch. -func newHtlcSwitch(gateway []byte, r *routing.RoutingManager) *htlcSwitch { +func newHtlcSwitch() *htlcSwitch { return &htlcSwitch{ - router: r, - gateway: gateway, chanIndex: make(map[wire.OutPoint]*link), interfaces: make(map[wire.ShaHash][]*link), onionIndex: make(map[[ripemd160.Size]byte][]*link), @@ -495,7 +486,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { // A request with a nil channel point indicates that all the current // links for this channel should be cleared. - chansRemoved := make([]*wire.OutPoint, 0, len(links)) if req.chanPoint == nil { hswcLog.Debugf("purging all active links for interface %v", hex.EncodeToString(chanInterface[:])) @@ -504,9 +494,8 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { h.chanIndexMtx.Lock() delete(h.chanIndex, *link.chanPoint) h.chanIndexMtx.Unlock() - - chansRemoved = append(chansRemoved, link.chanPoint) } + links = nil } else { h.chanIndexMtx.Lock() @@ -516,8 +505,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { for i := 0; i < len(links); i++ { chanLink := links[i] if chanLink.chanPoint == req.chanPoint { - chansRemoved = append(chansRemoved, req.chanPoint) - // We perform an in-place delete by sliding // every element down one, then slicing off the // last element. 
Additionally, we update the @@ -534,22 +521,6 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { } } - // Purge the now inactive channels from the routing table. - // TODO(roasbeef): routing layer should only see the links as a - // summation of their capacity/etc - // * distinction between connection close and channel close - for _, linkChan := range chansRemoved { - err := h.router.RemoveChannel( - graph.NewVertex(h.gateway), - graph.NewVertex(req.remoteID), - graph.NewEdgeID(*linkChan), - ) - if err != nil { - hswcLog.Errorf("unable to remove channel from "+ - "routing table: %v", err) - } - } - // TODO(roasbeef): clean up/modify onion links // * just have the interfaces index be keyed on hash160? diff --git a/log.go b/log.go index 86abafc9..f8a3e1a7 100644 --- a/log.go +++ b/log.go @@ -10,6 +10,7 @@ import ( "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/routing" ) // Loggers per subsystem. Note that backendLog is a seelog logger that all of @@ -30,6 +31,7 @@ var ( utxnLog = btclog.Disabled brarLog = btclog.Disabled cmgrLog = btclog.Disabled + crtrLog = btclog.Disabled ) // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -46,6 +48,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "UTXN": utxnLog, "BRAR": brarLog, "CMGR": cmgrLog, + "CRTR": crtrLog, } // useLogger updates the logger references for subsystemID to logger. Invalid @@ -96,6 +99,10 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "CMGR": cmgrLog = logger connmgr.UseLogger(logger) + + case "CRTR": + crtrLog = logger + routing.UseLogger(crtrLog) } } diff --git a/peer.go b/peer.go index 2bce0a9b..f9f7ccb1 100644 --- a/peer.go +++ b/peer.go @@ -19,7 +19,6 @@ import ( "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" - "github.com/lightningnetwork/lnd/routing/rt/graph" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" @@ -233,17 +232,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) - // Notify the routing table of this newly loaded channel. - chanInfo := lnChan.StateSnapshot() - capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance) - pubSerialized := p.addr.IdentityKey.SerializeCompressed() - p.server.routingMgr.OpenChannel( - graph.NewVertex(pubSerialized), - graph.NewEdgeID(*chanInfo.ChannelPoint), - &graph.ChannelInfo{ - Cpt: capacity, - }, - ) + // TODO(roasbeef): register active channel with breach observer // Register this new channel link with the HTLC Switch. 
This is // necessary to properly route multi-hop payments, and forward @@ -379,7 +368,6 @@ out: case *lnwire.Ping: p.queueMsg(lnwire.NewPong(msg.Nonce), nil) - // TODO(roasbeef): consolidate into predicate (single vs dual) case *lnwire.SingleFundingRequest: p.server.fundingMgr.processFundingRequest(msg, p) case *lnwire.SingleFundingResponse: @@ -392,11 +380,10 @@ out: p.server.fundingMgr.processFundingOpenProof(msg, p) case *lnwire.CloseRequest: p.remoteCloseChanReqs <- msg - // TODO(roasbeef): interface for htlc update msgs - // * .(CommitmentUpdater) case *lnwire.ErrorGeneric: p.server.fundingMgr.processErrorGeneric(msg, p) + case *lnwire.HTLCAddRequest: isChanUpdate = true targetChan = msg.ChannelPoint @@ -409,14 +396,13 @@ out: case *lnwire.CommitSignature: isChanUpdate = true targetChan = msg.ChannelPoint - case *lnwire.NeighborAckMessage, - *lnwire.NeighborHelloMessage, - *lnwire.NeighborRstMessage, - *lnwire.NeighborUpdMessage: - // Convert to base routing message and set sender and receiver - vertex := p.addr.IdentityKey.SerializeCompressed() - p.server.routingMgr.ReceiveRoutingMessage(msg, graph.NewVertex(vertex)) + case *lnwire.NodeAnnouncement, + *lnwire.ChannelAnnouncement, + *lnwire.ChannelUpdateAnnouncement: + + p.server.chanRouter.ProcessRoutingMessage(msg, + p.addr.IdentityKey) } if isChanUpdate { diff --git a/server.go b/server.go index 2325cc00..fef7c75e 100644 --- a/server.go +++ b/server.go @@ -1,6 +1,8 @@ package main import ( + "encoding/hex" + "errors" "fmt" "net" "sync" @@ -20,7 +22,6 @@ import ( "github.com/roasbeef/btcutil" "github.com/lightningnetwork/lnd/routing" - "github.com/lightningnetwork/lnd/routing/rt/graph" ) // server is the main server of the Lightning Network Daemon. The server houses @@ -57,7 +58,7 @@ type server struct { invoices *invoiceRegistry breachArbiter *breachArbiter - routingMgr *routing.RoutingManager + chanRouter *routing.ChannelRouter utxoNursery *utxoNursery @@ -69,6 +70,9 @@ type server struct { persistentConnReqs map[string]*connmgr.ConnReq pendingConnRequests map[string]*connectPeerMsg + broadcastRequests chan *broadcastReq + sendRequests chan *sendReq + newPeers chan *peer donePeers chan *peer queries chan interface{} @@ -105,6 +109,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, invoices: newInvoiceRegistry(chanDB), utxoNursery: newUtxoNursery(notifier, wallet), + htlcSwitch: newHtlcSwitch(), identityPriv: privKey, @@ -122,6 +127,9 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, newPeers: make(chan *peer, 10), donePeers: make(chan *peer, 10), + broadcastRequests: make(chan *broadcastReq), + sendRequests: make(chan *sendReq), + queries: make(chan interface{}), quit: make(chan struct{}), } @@ -136,47 +144,37 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, debugPre[:], debugHash[:]) } - s.utxoNursery = newUtxoNursery(notifier, wallet) - - // Create a new routing manager with ourself as the sole node within - // the graph. 
-	selfVertex := serializedPubKey
-	routingMgrConfig := &routing.RoutingConfig{}
-	routingMgrConfig.SendMessage = func(receiver [33]byte, msg lnwire.Message) error {
-		receiverID := graph.NewVertex(receiver[:])
-		if receiverID == graph.NilVertex {
-			peerLog.Critical("receiverID == graph.NilVertex")
-			return fmt.Errorf("receiverID == graph.NilVertex")
-		}
-
-		var targetPeer *peer
-		for _, peer := range s.peersByID { // TODO: threadsafe API
-			nodePub := peer.addr.IdentityKey.SerializeCompressed()
-			nodeVertex := graph.NewVertex(nodePub[:])
-
-			// We found the target
-			if receiverID == nodeVertex {
-				targetPeer = peer
-				break
-			}
-		}
-
-		if targetPeer != nil {
-			targetPeer.queueMsg(msg, nil)
-		} else {
-			srvrLog.Errorf("Can't find peer to send message %v",
-				receiverID)
-		}
-		return nil
+	// TODO(roasbeef): add --externalip flag?
+	selfAddr, ok := listeners[0].Addr().(*net.TCPAddr)
+	if !ok {
+		return nil, fmt.Errorf("default listener must be TCP")
 	}
-	s.routingMgr = routing.NewRoutingManager(graph.NewVertex(selfVertex), routingMgrConfig)
-	s.htlcSwitch = newHtlcSwitch(serializedPubKey, s.routingMgr)

+	chanGraph := chanDB.ChannelGraph()
+	self := &channeldb.LightningNode{
+		LastUpdate: time.Now(),
+		Address:    selfAddr,
+		PubKey:     privKey.PubKey(),
+		// TODO(roasbeef): make alias configurable
+		Alias: hex.EncodeToString(serializedPubKey[:10]),
+	}
+	if err := chanGraph.SetSourceNode(self); err != nil {
+		return nil, err
+	}
+
+	s.chanRouter, err = routing.New(routing.Config{
+		Graph:        chanGraph,
+		Chain:        bio,
+		Notifier:     notifier,
+		Broadcast:    s.broadcastMessage,
+		SendMessages: s.sendToPeer,
+	})
+	if err != nil {
+		return nil, err
+	}

 	s.rpcServer = newRpcServer(s)
-
 	s.breachArbiter = newBreachArbiter(wallet, chanDB, notifier, s.htlcSwitch)
-
 	s.fundingMgr = newFundingManager(wallet, s.breachArbiter)

 	// TODO(roasbeef): introduce closure and config system to decouple the
@@ -268,7 +266,9 @@ func (s *server) Start() error {
 	if err := s.breachArbiter.Start(); err != nil {
 		return err
 	}
-	s.routingMgr.Start()
+	if err := s.chanRouter.Start(); err != nil {
+		return err
+	}

 	s.wg.Add(1)
 	go s.queryHandler()
@@ -289,7 +289,7 @@ func (s *server) Stop() error {
 	s.chainNotifier.Stop()
 	s.rpcServer.Stop()
 	s.fundingMgr.Stop()
-	s.routingMgr.Stop()
+	s.chanRouter.Stop()
 	s.htlcSwitch.Stop()
 	s.utxoNursery.Stop()
 	s.breachArbiter.Stop()
@@ -308,6 +308,81 @@ func (s *server) WaitForShutdown() {
 	s.wg.Wait()
 }

+// broadcastReq is a message sent to the server by a related sub-system when it
+// wishes to broadcast one or more messages to all connected peers.
+type broadcastReq struct {
+	ignore *btcec.PublicKey
+	msgs   []lnwire.Message
+
+	errChan chan error // MUST be buffered.
+}
+
+// broadcastMessage sends a request to the server to broadcast a set of
+// messages to all peers other than the one specified by the `skip` parameter.
+func (s *server) broadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error {
+	errChan := make(chan error, 1)
+
+	msgsToSend := make([]lnwire.Message, 0, len(msgs))
+	msgsToSend = append(msgsToSend, msgs...)
+	broadcastReq := &broadcastReq{
+		ignore:  skip,
+		msgs:    msgsToSend,
+		errChan: errChan,
+	}
+
+	select {
+	case s.broadcastRequests <- broadcastReq:
+	case <-s.quit:
+		return errors.New("server shutting down")
+	}
+
+	select {
+	case err := <-errChan:
+		return err
+	case <-s.quit:
+		return errors.New("server shutting down")
+	}
+}
+
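The `MUST be buffered` note on `errChan` is load-bearing: `queryHandler` answers with a bare channel send, so a capacity-1 buffer guarantees the central goroutine never blocks on a requester that already gave up on quit. A distilled, self-contained sketch of the same request/reply shape (all names here are illustrative):

```go
package main

import "fmt"

// request mirrors the shape of broadcastReq/sendReq: the reply channel must
// be buffered so the servicing goroutine can always deliver its answer.
type request struct {
	errChan chan error // capacity 1
}

func main() {
	requests := make(chan *request)
	quit := make(chan struct{})

	// Stand-in for the server's central queryHandler goroutine.
	go func() {
		for {
			select {
			case r := <-requests:
				// The buffered send never blocks the handler,
				// even if the requester has already returned.
				r.errChan <- nil
			case <-quit:
				return
			}
		}
	}()

	// Caller side, following the same two-phase select shape as
	// broadcastMessage: hand off the request, then await the reply,
	// bailing out on quit in both phases.
	req := &request{errChan: make(chan error, 1)}
	select {
	case requests <- req:
	case <-quit:
		return
	}
	select {
	case err := <-req.errChan:
		fmt.Println("reply:", err)
	case <-quit:
	}
}
```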
+// sendReq is a message sent to the server by a related sub-system when it
+// wishes to send a set of messages to a specified peer.
+type sendReq struct {
+	target *btcec.PublicKey
+	msgs   []lnwire.Message
+
+	errChan chan error
+}
+
+// sendToPeer sends a message to the server telling it to send the specified
+// set of messages to a particular peer. If the peer cannot be found, then
+// this method will return a non-nil error.
+func (s *server) sendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error {
+	errChan := make(chan error, 1)
+
+	msgsToSend := make([]lnwire.Message, 0, len(msgs))
+	msgsToSend = append(msgsToSend, msgs...)
+	sMsg := &sendReq{
+		target:  target,
+		msgs:    msgsToSend,
+		errChan: errChan,
+	}
+
+	select {
+	case s.sendRequests <- sMsg:
+	case <-s.quit:
+		return errors.New("server shutting down")
+	}
+
+	select {
+	case err := <-errChan:
+		return err
+	case <-s.quit:
+		return errors.New("server shutting down")
+	}
+}
+
 // peerConnected is a function that handles initialization a newly connected
 // peer by adding it to the server's global list of all active peers, and
 // starting all the goroutines the peer needs to function properly.
@@ -415,9 +490,6 @@ func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn)
 // addPeer adds the passed peer to the server's global state of all active
 // peers.
 func (s *server) addPeer(p *peer) {
-	s.peersMtx.Lock()
-	defer s.peersMtx.Unlock()
-
 	if p == nil {
 		return
 	}
@@ -428,8 +500,19 @@ func (s *server) addPeer(p *peer) {
 		return
 	}

+	// Track the new peer in our indexes so we can quickly look it up
+	// either according to its public key, or its peer ID.
+	// TODO(roasbeef): pipe all requests through to the
+	// queryHandler/peerManager
+	s.peersMtx.Lock()
 	s.peersByID[p.id] = p
 	s.peersByPub[string(p.addr.IdentityKey.SerializeCompressed())] = p
+	s.peersMtx.Unlock()
+
+	// Once the peer has been added to our indexes, send a message to the
+	// channel router so we can synchronize our view of the channel graph
+	// with this new peer.
+ s.chanRouter.SynchronizeNode(p.addr.IdentityKey) } // removePeer removes the passed peer from the server's state of all active @@ -508,6 +591,54 @@ out: case p := <-s.donePeers: s.removePeer(p) + case bMsg := <-s.broadcastRequests: + ignore := bMsg.ignore + + srvrLog.Debugf("Broadcasting %v messages", len(bMsg.msgs)) + + s.peersMtx.RLock() + for _, peer := range s.peersByPub { + if ignore != nil && + peer.addr.IdentityKey.IsEqual(ignore) { + + srvrLog.Debugf("Skipping %v in broadcast", + ignore.SerializeCompressed()) + + continue + } + + for _, msg := range bMsg.msgs { + peer.queueMsg(msg, nil) + } + } + s.peersMtx.RUnlock() + + bMsg.errChan <- nil + case sMsg := <-s.sendRequests: + // TODO(roasbeef): use [33]byte everywhere instead + // * eliminate usage of mutexes, funnel all peer + // mutation to this goroutine + target := sMsg.target.SerializeCompressed() + + srvrLog.Debugf("Attempting to send msgs %v to: %x", + len(sMsg.msgs), target) + + s.peersMtx.RLock() + targetPeer, ok := s.peersByPub[string(target)] + if !ok { + s.peersMtx.RUnlock() + srvrLog.Errorf("unable to send message to %x, "+ + "peer not found", target) + sMsg.errChan <- errors.New("peer not found") + continue + } + + for _, msg := range sMsg.msgs { + targetPeer.queueMsg(msg, nil) + } + s.peersMtx.RUnlock() + + sMsg.errChan <- nil case query := <-s.queries: switch msg := query.(type) { case *connectPeerMsg: @@ -587,6 +718,9 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { Permanent: true, }) + // TODO(roasbeef): create goroutine to poll state so can report + // connection fails + // Finally, we store the original request keyed by the public key so we // can dispatch the response to the RPC client once a connection has // been initiated.
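On the `[33]byte` TODO in the send path above: compressed secp256k1 keys are fixed-width, so a byte array can act directly as a comparable map key, avoiding the `string(...)` conversions currently used for `peersByPub`. A small sketch of that direction (`pubKeyBytes` is a hypothetical helper):

```go
package main

import (
	"fmt"

	"github.com/roasbeef/btcd/btcec"
)

// pubKeyBytes copies a compressed public key into a fixed-size array that
// can be used directly as a map key.
func pubKeyBytes(pub *btcec.PublicKey) [33]byte {
	var key [33]byte
	copy(key[:], pub.SerializeCompressed())
	return key
}

func main() {
	priv, _ := btcec.NewPrivateKey(btcec.S256())

	// An index keyed on the raw 33-byte compressed key instead of a
	// string conversion of it.
	peersByPub := make(map[[33]byte]string)
	peersByPub[pubKeyBytes(priv.PubKey())] = "peer-1"

	fmt.Println(peersByPub[pubKeyBytes(priv.PubKey())])
}
```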