diff --git a/fundingmanager.go b/fundingmanager.go index 575766b4..8a9f94a2 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -200,7 +200,7 @@ func (f *fundingManager) NumPendingChannels() uint32 { type pendingChannel struct { peerId int32 - lightningID [32]byte + identityPub *btcec.PublicKey channelPoint *wire.OutPoint capacity btcutil.Amount localBalance btcutil.Amount @@ -285,7 +285,7 @@ func (f *fundingManager) handlePendingChannels(msg *pendingChansReq) { pendingChan := &pendingChannel{ peerId: peerID, - lightningID: peer.lightningID, + identityPub: peer.addr.IdentityKey, channelPoint: res.FundingOutpoint(), capacity: localFund + remoteFund, localBalance: localFund, @@ -340,7 +340,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) { // channel ourselves. // TODO(roasbeef): passing num confs 1 is irrelevant here, make signed? reservation, err := f.wallet.InitChannelReservation(amt, 0, - fmsg.peer.identityPub, fmsg.peer.lightningAddr.NetAddr, 1, delay) + fmsg.peer.addr.IdentityKey, fmsg.peer.addr.Address, 1, delay) if err != nil { // TODO(roasbeef): push ErrorGeneric message fndgLog.Errorf("Unable to initialize reservation: %v", err) @@ -614,7 +614,8 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) // finding. chanInfo := openChan.StateSnapshot() capacity := int64(chanInfo.LocalBalance + chanInfo.RemoteBalance) - vertex := hex.EncodeToString(fmsg.peer.identityPub.SerializeCompressed()) + pubSerialized := fmsg.peer.addr.IdentityKey.SerializeCompressed() + vertex := hex.EncodeToString(pubSerialized) fmsg.peer.server.routingMgr.OpenChannel( graph.NewID(vertex), graph.NewEdgeID(fundingPoint.String()), @@ -686,7 +687,7 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { // Notify the L3 routing manager of the newly active channel link. capacity := int64(resCtx.reservation.OurContribution().FundingAmount + resCtx.reservation.TheirContribution().FundingAmount) - vertex := hex.EncodeToString(fmsg.peer.identityPub.SerializeCompressed()) + vertex := hex.EncodeToString(fmsg.peer.addr.IdentityKey.SerializeCompressed()) fmsg.peer.server.routingMgr.OpenChannel( graph.NewID(vertex), graph.NewEdgeID(resCtx.reservation.FundingOutpoint().String()), @@ -715,7 +716,7 @@ func (f *fundingManager) initFundingWorkflow(targetPeer *peer, req *openChanReq) func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { var ( // TODO(roasbeef): add delay - nodeID = msg.peer.identityPub + nodeID = msg.peer.addr.IdentityKey localAmt = msg.localFundingAmt remoteAmt = msg.remoteFundingAmt capacity = localAmt + remoteAmt @@ -724,13 +725,13 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { fndgLog.Infof("Initiating fundingRequest(localAmt=%v, remoteAmt=%v, "+ "capacity=%v, numConfs=%v, addr=%v)", localAmt, remoteAmt, - capacity, numConfs, msg.peer.lightningAddr.NetAddr) + capacity, numConfs, msg.peer.addr.Address) // Initialize a funding reservation with the local wallet. If the // wallet doesn't have enough funds to commit to this channel, then // the request will fail, and be aborted. reservation, err := f.wallet.InitChannelReservation(capacity, localAmt, - nodeID, msg.peer.lightningAddr.NetAddr, uint16(numConfs), 4) + nodeID, msg.peer.addr.Address, uint16(numConfs), 4) if err != nil { msg.err <- err return diff --git a/htlcswitch.go b/htlcswitch.go index 5628d059..86066b39 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -125,8 +125,10 @@ type htlcSwitch struct { interfaceMtx sync.RWMutex interfaces map[wire.ShaHash][]*link - // onionIndex is a secondary index used to properly forward a message - // to the next hop within a Sphinx circuit. + // onionIndex is an index used to properly forward a message + // to the next hop within a Sphinx circuit. Within the sphinx packets, + // the "next-hop" destination is encoded as the hash160 of the node's + // public key serialized in compressed format. onionMtx sync.RWMutex onionIndex map[[ripemd160.Size]byte][]*link @@ -442,6 +444,10 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) { chanPoint: chanPoint, } + // First update the channel index with this new channel point. The + // channel index will be used to quickly lookup channels in order to: + // close them, update their link capacity, or possibly during multi-hop + // HTLC forwarding. h.chanIndexMtx.Lock() h.chanIndex[*chanPoint] = newLink h.chanIndexMtx.Unlock() @@ -452,8 +458,11 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) { h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink) h.interfaceMtx.Unlock() + // Next, update the onion index which is used to look up the + // settle/clear links during multi-hop payments and to dispatch + // outgoing payments initiated by a local sub-system. var onionId [ripemd160.Size]byte - copy(onionId[:], btcutil.Hash160(req.peer.identityPub.SerializeCompressed())) + copy(onionId[:], btcutil.Hash160(req.peer.addr.IdentityKey.SerializeCompressed())) h.onionMtx.Lock() h.onionIndex[onionId] = h.interfaces[interfaceID] diff --git a/peer.go b/peer.go index b2004735..3c8b331c 100644 --- a/peer.go +++ b/peer.go @@ -15,7 +15,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -66,9 +65,8 @@ type peer struct { conn net.Conn - identityPub *btcec.PublicKey - lightningAddr *lndc.LNAdr - lightningID wire.ShaHash + addr *lnwire.NetAddress + lightningID wire.ShaHash inbound bool id int32 @@ -88,9 +86,6 @@ type peer struct { satoshisSent uint64 satoshisReceived uint64 - // chainNet is the Bitcoin network to which this peer is anchored to. - chainNet wire.BitcoinNet - // sendQueue is the channel which is used to queue outgoing to be // written onto the wire. Note that this channel is unbuffered. sendQueue chan outgoinMsg @@ -150,17 +145,18 @@ type peer struct { // newPeer creates a new peer from an establish connection object, and a // pointer to the main server. -func newPeer(conn net.Conn, server *server, btcNet wire.BitcoinNet, inbound bool) (*peer, error) { - lndcConn := conn.(*lndc.LNDConn) - nodePub := lndcConn.RemotePub +func newPeer(conn net.Conn, server *server, addr *lnwire.NetAddress, + inbound bool) (*peer, error) { + + nodePub := addr.IdentityKey p := &peer{ conn: conn, - identityPub: nodePub, lightningID: wire.ShaHash(fastsha256.Sum256(nodePub.SerializeCompressed())), - id: atomic.AddInt32(&numNodes, 1), - chainNet: btcNet, - inbound: inbound, + addr: addr, + + id: atomic.AddInt32(&numNodes, 1), + inbound: inbound, server: server, @@ -184,15 +180,6 @@ func newPeer(conn net.Conn, server *server, btcNet wire.BitcoinNet, inbound bool quit: make(chan struct{}), } - // TODO(roasbeef): re-write after lnaddr revamp, shouldn't need to use - // type assertions - var err error - tcpAddr := lndcConn.Conn.(*net.TCPConn).RemoteAddr().(*net.TCPAddr) - p.lightningAddr, err = lndc.NewLnAdr(tcpAddr, nodePub, activeNetParams.Params) - if err != nil { - return nil, err - } - // Initiate the pending channel identifier properly depending on if this // node is inbound or outbound. This value will be used in an increasing // manner to track pending channels. @@ -204,7 +191,7 @@ func newPeer(conn net.Conn, server *server, btcNet wire.BitcoinNet, inbound bool // Fetch and then load all the active channels we have with this // remote peer from the database. - activeChans, err := server.chanDB.FetchOpenChannels(p.identityPub) + activeChans, err := server.chanDB.FetchOpenChannels(p.addr.IdentityKey) if err != nil { peerLog.Errorf("unable to fetch active chans "+ "for peer %v: %v", p, err) @@ -313,7 +300,7 @@ func (p *peer) Disconnect() { // Tell the switch to unregister all links associated with this // peer. Passing nil as the target link indicates that all links // associated with this interface should be closed. - p.server.htlcSwitch.UnregisterLink(p.identityPub, nil) + p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, nil) p.server.donePeers <- p }() @@ -328,7 +315,8 @@ func (p *peer) String() string { // any additional raw payload. func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { // TODO(roasbeef): use our own net magic? - n, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, p.chainNet) + n, nextMsg, rawPayload, err := lnwire.ReadMessage(p.conn, 0, + p.addr.ChainNet) atomic.AddUint64(&p.bytesReceived, uint64(n)) if err != nil { return nil, nil, err @@ -403,7 +391,7 @@ out: *lnwire.RoutingTableTransferMessage: // Convert to base routing message and set sender and receiver - vertex := hex.EncodeToString(p.identityPub.SerializeCompressed()) + vertex := hex.EncodeToString(p.addr.IdentityKey.SerializeCompressed()) p.server.routingMgr.ReceiveRoutingMessage(msg, graph.NewID(vertex)) } @@ -457,7 +445,7 @@ func (p *peer) writeMessage(msg lnwire.Message) error { return spew.Sdump(msg) })) - n, err := lnwire.WriteMessage(p.conn, msg, 0, p.chainNet) + n, err := lnwire.WriteMessage(p.conn, msg, 0, p.addr.ChainNet) atomic.AddUint64(&p.bytesSent, uint64(n)) return err @@ -846,7 +834,7 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error { // Instruct the Htlc Switch to close this link as the channel is no // longer active. - p.server.htlcSwitch.UnregisterLink(p.identityPub, chanID) + p.server.htlcSwitch.UnregisterLink(p.addr.IdentityKey, chanID) htlcWireLink, ok := p.htlcManagers[*chanID] if !ok { return nil @@ -1274,6 +1262,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { for _, htlc := range htlcsToForward { // We don't need to forward any HTLC's that we // just settled above. + // TODO(roasbeef): key by index insteaad? if _, ok := settledPayments[htlc.RHash]; ok { continue } @@ -1322,6 +1311,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // Notify the invoiceRegistry of the invoices we just settled // with this latest commitment update. + // TODO(roasbeef): wait until next transition? for invoice, _ := range settledPayments { err := p.server.invoices.SettleInvoice(wire.ShaHash(invoice)) if err != nil { diff --git a/rpcserver.go b/rpcserver.go index bbb74307..7f601bf9 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "io" + "net" "time" "sync" @@ -15,7 +16,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" @@ -194,14 +194,25 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, return nil, fmt.Errorf("need: lnc pubkeyhash@hostname") } - idAtHost := fmt.Sprintf("%v@%v", in.Addr.PubKeyHash, in.Addr.Host) - rpcsLog.Debugf("[connectpeer] peer=%v", idAtHost) - - peerAddr, err := lndc.LnAddrFromString(idAtHost, activeNetParams.Params) + pubkeyHex, err := hex.DecodeString(in.Addr.Pubkey) if err != nil { - rpcsLog.Errorf("(connectpeer): error parsing ln addr: %v", err) return nil, err } + pubkey, err := btcec.ParsePubKey(pubkeyHex, btcec.S256()) + if err != nil { + return nil, err + } + + host, err := net.ResolveTCPAddr("tcp", in.Addr.Host) + if err != nil { + return nil, err + } + + peerAddr := &lnwire.NetAddress{ + IdentityKey: pubkey, + Address: host, + ChainNet: activeNetParams.Net, + } peerID, err := r.server.ConnectToPeer(peerAddr) if err != nil { @@ -209,6 +220,7 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, return nil, err } + // TODO(roasbeef): add pubkey return rpcsLog.Debugf("Connected to peer: %v", peerAddr.String()) return &lnrpc.ConnectPeerResponse{peerID}, nil } @@ -224,8 +236,13 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest, localFundingAmt := btcutil.Amount(in.LocalFundingAmount) remoteFundingAmt := btcutil.Amount(in.RemoteFundingAmount) + nodepubKey, err := btcec.ParsePubKey(in.NodePubkey, btcec.S256()) + if err != nil { + return err + } + updateChan, errChan := r.server.OpenChannel(in.TargetPeerId, - in.TargetNode, localFundingAmt, remoteFundingAmt, in.NumConfs) + nodepubKey, localFundingAmt, remoteFundingAmt, in.NumConfs) var outpoint wire.OutPoint out: @@ -233,8 +250,8 @@ out: select { case err := <-errChan: rpcsLog.Errorf("unable to open channel to "+ - "lightningID(%x) nor peerID(%v): %v", - in.TargetNode, in.TargetPeerId, err) + "identityPub(%x) nor peerID(%v): %v", + nodepubKey, in.TargetPeerId, err) return err case fundingUpdate := <-updateChan: rpcsLog.Tracef("[openchannel] sending update: %v", @@ -333,17 +350,11 @@ func (r *rpcServer) GetInfo(ctx context.Context, } pendingChannels := r.server.fundingMgr.NumPendingChannels() - idPub := r.server.identityPriv.PubKey().SerializeCompressed() - idAddr, err := btcutil.NewAddressPubKeyHash(btcutil.Hash160(idPub), activeNetParams.Params) - if err != nil { - return nil, err - } return &lnrpc.GetInfoResponse{ LightningId: hex.EncodeToString(r.server.lightningID[:]), IdentityPubkey: hex.EncodeToString(idPub), - IdentityAddress: idAddr.String(), NumPendingChannels: pendingChannels, NumActiveChannels: activeChannels, NumPeers: uint32(len(serverPeers)), @@ -364,7 +375,7 @@ func (r *rpcServer) ListPeers(ctx context.Context, for _, serverPeer := range serverPeers { // TODO(roasbeef): add a snapshot method which grabs peer read mtx - nodePub := serverPeer.identityPub.SerializeCompressed() + nodePub := serverPeer.addr.IdentityKey.SerializeCompressed() peer := &lnrpc.Peer{ PubKey: hex.EncodeToString(nodePub), PeerId: serverPeer.id, @@ -432,9 +443,10 @@ func (r *rpcServer) PendingChannels(ctx context.Context, pendingOpenChans := r.server.fundingMgr.PendingChannels() for _, pendingOpen := range pendingOpenChans { // TODO(roasbeef): add confirmation progress + pub := pendingOpen.identityPub.SerializeCompressed() pendingChan := &lnrpc.PendingChannelResponse_PendingChannel{ PeerId: pendingOpen.peerId, - LightningId: hex.EncodeToString(pendingOpen.lightningID[:]), + IdentityKey: hex.EncodeToString(pub), ChannelPoint: pendingOpen.channelPoint.String(), Capacity: int64(pendingOpen.capacity), LocalBalance: int64(pendingOpen.localBalance), diff --git a/server.go b/server.go index 5d214152..f4a2c510 100644 --- a/server.go +++ b/server.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "encoding/hex" "fmt" "net" @@ -10,11 +9,12 @@ import ( "github.com/btcsuite/fastsha256" "github.com/lightningnetwork/lightning-onion" + "github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" - "github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwallet" + "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcutil" @@ -81,7 +81,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, listeners := make([]net.Listener, len(listenAddrs)) for i, addr := range listenAddrs { - listeners[i], err = lndc.NewListener(privKey, addr) + listeners[i], err = brontide.NewListener(privKey, addr) if err != nil { return nil, err } @@ -250,7 +250,7 @@ func (s *server) removePeer(p *peer) { // particular peer. This message also houses an error channel which will be // used to report success/failure. type connectPeerMsg struct { - addr *lndc.LNAdr + addr *lnwire.NetAddress resp chan int32 err chan error } @@ -266,7 +266,7 @@ type listPeersMsg struct { // relative peer ID, or a global lightning ID. type openChanReq struct { targetPeerID int32 - targetNodeID [32]byte + targetPubkey *btcec.PublicKey // TODO(roasbeef): make enums in lnwire channelType uint8 @@ -318,7 +318,7 @@ out: var targetPeer *peer for _, peer := range s.peers { // TODO: threadsafe api - nodePub := peer.identityPub.SerializeCompressed() + nodePub := peer.addr.IdentityKey.SerializeCompressed() idStr := hex.EncodeToString(nodePub) // We found the the target @@ -361,51 +361,39 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { // Ensure we're not already connected to this // peer. + targetPub := msg.addr.IdentityKey for _, peer := range s.peers { - if peer.lightningAddr.String() == addr.String() { + if peer.addr.IdentityKey.IsEqual(targetPub) { msg.err <- fmt.Errorf( "already connected to peer: %v", - peer.lightningAddr, + peer.addr, ) msg.resp <- -1 return } } - // Launch a goroutine to connect to the requested - // peer so we can continue to handle queries. + // Launch a goroutine to connect to the requested peer so we can + // continue to handle queries. + // // TODO(roasbeef): semaphore to limit the number of goroutines for // async requests. go func() { - // For the lndc crypto handshake, we - // either need a compressed pubkey, or a - // 20-byte pkh. - var remoteId []byte - if addr.PubKey == nil { - remoteId = addr.Base58Adr.ScriptAddress() - } else { - remoteId = addr.PubKey.SerializeCompressed() - } + srvrLog.Debugf("connecting to %v", addr) - srvrLog.Debugf("connecting to %v", hex.EncodeToString(remoteId)) - // Attempt to connect to the remote - // node. If the we can't make the - // connection, or the crypto negotation - // breaks down, then return an error to the - // caller. - ipAddr := addr.NetAddr.String() - conn := lndc.NewConn(nil) - if err := conn.Dial( - s.identityPriv, ipAddr, remoteId); err != nil { + // Attempt to connect to the remote node. If the we can't make + // the connection, or the crypto negotation breaks down, then + // return an error to the caller. + conn, err := brontide.Dial(s.identityPriv, addr) + if err != nil { msg.err <- err msg.resp <- -1 return } - // Now that we've established a connection, - // create a peer, and it to the set of - // currently active peers. - peer, err := newPeer(conn, s, activeNetParams.Net, false) + // Now that we've established a connection, create a peer, and + // it to the set of currently active peers. + peer, err := newPeer(conn, s, msg.addr, false) if err != nil { srvrLog.Errorf("unable to create peer %v", err) conn.Close() @@ -414,6 +402,9 @@ func (s *server) handleConnectPeer(msg *connectPeerMsg) { return } + // TODO(roasbeef): update IP address for link-node + // * also mark last-seen, do it one single transaction? + peer.Start() s.newPeers <- peer @@ -431,16 +422,17 @@ func (s *server) handleOpenChanReq(req *openChanReq) { var targetPeer *peer for _, peer := range s.peers { // TODO(roasbeef): threadsafe api // We found the the target - if req.targetPeerID == peer.id || - bytes.Equal(req.targetNodeID[:], peer.lightningID[:]) { + if peer.addr.IdentityKey.IsEqual(req.targetPubkey) || + req.targetPeerID == peer.id { targetPeer = peer break } } if targetPeer == nil { - req.err <- fmt.Errorf("unable to find peer lightningID(%v), "+ - "peerID(%v)", req.targetNodeID, req.targetPeerID) + req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+ + "peerID(%v)", req.targetPubkey.SerializeCompressed(), + req.targetPeerID) return } @@ -455,7 +447,7 @@ func (s *server) handleOpenChanReq(req *openChanReq) { // ConnectToPeer requests that the server connect to a Lightning Network peer // at the specified address. This function will *block* until either a // connection is established, or the initial handshake process fails. -func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) { +func (s *server) ConnectToPeer(addr *lnwire.NetAddress) (int32, error) { reply := make(chan int32, 1) errChan := make(chan error, 1) @@ -466,7 +458,8 @@ func (s *server) ConnectToPeer(addr *lndc.LNAdr) (int32, error) { // OpenChannel sends a request to the server to open a channel to the specified // peer identified by ID with the passed channel funding paramters. -func (s *server) OpenChannel(peerID int32, nodeID []byte, localAmt, remoteAmt btcutil.Amount, +func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey, + localAmt, remoteAmt btcutil.Amount, numConfs uint32) (chan *lnrpc.OpenStatusUpdate, chan error) { errChan := make(chan error, 1) @@ -474,13 +467,13 @@ func (s *server) OpenChannel(peerID int32, nodeID []byte, localAmt, remoteAmt bt req := &openChanReq{ targetPeerID: peerID, + targetPubkey: nodeKey, localFundingAmt: localAmt, remoteFundingAmt: remoteAmt, numConfs: numConfs, updates: updateChan, err: errChan, } - copy(req.targetNodeID[:], nodeID) s.queries <- req @@ -514,13 +507,24 @@ func (s *server) listener(l net.Listener) { } srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr()) - peer, err := newPeer(conn, s, activeNetParams.Net, true) + + brontideConn := conn.(*brontide.Conn) + peerAddr := &lnwire.NetAddress{ + IdentityKey: brontideConn.RemotePub(), + Address: conn.RemoteAddr().(*net.TCPAddr), + ChainNet: activeNetParams.Net, + } + + peer, err := newPeer(conn, s, peerAddr, true) if err != nil { srvrLog.Errorf("unable to create peer: %v", err) conn.Close() continue } + // TODO(roasbeef): update IP address for link-node + // * also mark last-seen, do it one single transaction? + peer.Start() s.newPeers <- peer }