multi: remove internal peer_id usage

This commit is contained in:
MeshCollider 2018-02-14 18:48:42 +13:00 committed by Olaoluwa Osuntokun
parent 4ed5ba0d26
commit 915c4201b9
6 changed files with 36 additions and 75 deletions

View File

@ -434,7 +434,6 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
// Create a funding request and start the workflow. // Create a funding request and start the workflow.
errChan := make(chan error, 1) errChan := make(chan error, 1)
initReq := &openChanReq{ initReq := &openChanReq{
targetPeerID: int32(1),
targetPubkey: bob.privKey.PubKey(), targetPubkey: bob.privKey.PubKey(),
chainHash: *activeNetParams.GenesisHash, chainHash: *activeNetParams.GenesisHash,
localFundingAmt: localFundingAmt, localFundingAmt: localFundingAmt,

View File

@ -839,13 +839,6 @@ func (m *ConnectPeerResponse) String() string { return proto.CompactT
func (*ConnectPeerResponse) ProtoMessage() {} func (*ConnectPeerResponse) ProtoMessage() {}
func (*ConnectPeerResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} } func (*ConnectPeerResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} }
func (m *ConnectPeerResponse) GetPeerId() int32 {
if m != nil {
return m.PeerId
}
return 0
}
type DisconnectPeerRequest struct { type DisconnectPeerRequest struct {
// / The pubkey of the node to disconnect from // / The pubkey of the node to disconnect from
PubKey string `protobuf:"bytes,1,opt,name=pub_key" json:"pub_key,omitempty"` PubKey string `protobuf:"bytes,1,opt,name=pub_key" json:"pub_key,omitempty"`
@ -1139,13 +1132,6 @@ func (m *Peer) GetPubKey() string {
return "" return ""
} }
func (m *Peer) GetPeerId() int32 {
if m != nil {
return m.PeerId
}
return 0
}
func (m *Peer) GetAddress() string { func (m *Peer) GetAddress() string {
if m != nil { if m != nil {
return m.Address return m.Address
@ -1654,13 +1640,6 @@ func (m *OpenChannelRequest) String() string { return proto.CompactTe
func (*OpenChannelRequest) ProtoMessage() {} func (*OpenChannelRequest) ProtoMessage() {}
func (*OpenChannelRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{41} } func (*OpenChannelRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{41} }
func (m *OpenChannelRequest) GetTargetPeerId() int32 {
if m != nil {
return m.TargetPeerId
}
return 0
}
func (m *OpenChannelRequest) GetNodePubkey() []byte { func (m *OpenChannelRequest) GetNodePubkey() []byte {
if m != nil { if m != nil {
return m.NodePubkey return m.NodePubkey

10
peer.go
View File

@ -106,7 +106,6 @@ type peer struct {
pubKeyBytes [33]byte pubKeyBytes [33]byte
inbound bool inbound bool
id int32
// This mutex protects all the stats below it. // This mutex protects all the stats below it.
sync.RWMutex sync.RWMutex
@ -179,7 +178,6 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
conn: conn, conn: conn,
addr: addr, addr: addr,
id: atomic.AddInt32(&numNodes, 1),
inbound: inbound, inbound: inbound,
connReq: connReq, connReq: connReq,
@ -276,7 +274,7 @@ func (p *peer) Start() error {
// registering them with the switch and launching the necessary // registering them with the switch and launching the necessary
// goroutines required to operate them. // goroutines required to operate them.
peerLog.Debugf("Loaded %v active channels from database with "+ peerLog.Debugf("Loaded %v active channels from database with "+
"peerID(%v)", len(activeChans), p.id) "peerIDKey(%x)", len(activeChans), p.PubKey())
if err := p.loadActiveChannels(activeChans); err != nil { if err := p.loadActiveChannels(activeChans); err != nil {
return fmt.Errorf("unable to load channels: %v", err) return fmt.Errorf("unable to load channels: %v", err)
} }
@ -310,7 +308,7 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
p.activeChannels[chanID] = lnChan p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
peerLog.Infof("peerID(%v) loading ChannelPoint(%v)", p.id, chanPoint) peerLog.Infof("peerIDKey(%x) loading ChannelPoint(%v)", p.PubKey(), chanPoint)
// Skip adding any permanently irreconcilable channels to the // Skip adding any permanently irreconcilable channels to the
// htlcswitch. // htlcswitch.
@ -1247,7 +1245,7 @@ out:
p.activeChanMtx.Unlock() p.activeChanMtx.Unlock()
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
"with peerId(%v)", chanPoint, p.id) "with peerIDKey(%x)", chanPoint, p.PubKey())
// Next, we'll assemble a ChannelLink along with the // Next, we'll assemble a ChannelLink along with the
// necessary items it needs to function. // necessary items it needs to function.
@ -1304,7 +1302,7 @@ out:
// local payments and also passively forward payments. // local payments and also passively forward payments.
if err := p.server.htlcSwitch.AddLink(link); err != nil { if err := p.server.htlcSwitch.AddLink(link); err != nil {
peerLog.Errorf("can't register new channel "+ peerLog.Errorf("can't register new channel "+
"link(%v) with peerId(%v)", chanPoint, p.id) "link(%v) with peerIdKey(%x)", chanPoint, p.PubKey())
} }
close(newChanReq.done) close(newChanReq.done)

View File

@ -93,7 +93,7 @@ func (c *chanController) OpenChannel(target *btcec.PublicKey,
// TODO(halseth): make configurable? // TODO(halseth): make configurable?
minHtlc := lnwire.NewMSatFromSatoshis(1) minHtlc := lnwire.NewMSatFromSatoshis(1)
updateStream, errChan := c.server.OpenChannel(-1, target, amt, 0, updateStream, errChan := c.server.OpenChannel(target, amt, 0,
minHtlc, feePerWeight, false) minHtlc, feePerWeight, false)
select { select {

View File

@ -668,8 +668,8 @@ func (r *rpcServer) DisconnectPeer(ctx context.Context,
func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest, func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
updateStream lnrpc.Lightning_OpenChannelServer) error { updateStream lnrpc.Lightning_OpenChannelServer) error {
rpcsLog.Tracef("[openchannel] request to peerid(%v) "+ rpcsLog.Tracef("[openchannel] request to identityPub(%v) "+
"allocation(us=%v, them=%v)", in.TargetPeerId, "allocation(us=%v, them=%v)", in.NodePubkeyString,
in.LocalFundingAmount, in.PushSat) in.LocalFundingAmount, in.PushSat)
if !r.server.Started() { if !r.server.Started() {
@ -720,24 +720,21 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
// TODO(roasbeef): also return channel ID? // TODO(roasbeef): also return channel ID?
// If the node key is set, then we'll parse the raw bytes into a pubkey // Parse the raw bytes of the node key into a pubkey object so we
// object so we can easily manipulate it. If this isn't set, then we // can easily manipulate it.
// expected the TargetPeerId to be set accordingly. nodePubKey, err = btcec.ParsePubKey(in.NodePubkey, btcec.S256())
if len(in.NodePubkey) != 0 { if err != nil {
nodePubKey, err = btcec.ParsePubKey(in.NodePubkey, btcec.S256()) return err
if err != nil {
return err
}
// Making a channel to ourselves wouldn't be of any use, so we
// explicitly disallow them.
if nodePubKey.IsEqual(r.server.identityPriv.PubKey()) {
return fmt.Errorf("cannot open channel to self")
}
nodePubKeyBytes = nodePubKey.SerializeCompressed()
} }
// Making a channel to ourselves wouldn't be of any use, so we
// explicitly disallow them.
if nodePubKey.IsEqual(r.server.identityPriv.PubKey()) {
return fmt.Errorf("cannot open channel to self")
}
nodePubKeyBytes = nodePubKey.SerializeCompressed()
// Based on the passed fee related parameters, we'll determine an // Based on the passed fee related parameters, we'll determine an
// appropriate fee rate for the funding transaction. // appropriate fee rate for the funding transaction.
feePerByte, err := determineFeePerByte( feePerByte, err := determineFeePerByte(
@ -754,7 +751,7 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
// open a new channel. A stream is returned in place, this stream will // open a new channel. A stream is returned in place, this stream will
// be used to consume updates of the state of the pending channel. // be used to consume updates of the state of the pending channel.
updateChan, errChan := r.server.OpenChannel( updateChan, errChan := r.server.OpenChannel(
in.TargetPeerId, nodePubKey, localFundingAmt, nodePubKey, localFundingAmt,
lnwire.NewMSatFromSatoshis(remoteInitialBalance), lnwire.NewMSatFromSatoshis(remoteInitialBalance),
minHtlc, feePerByte, in.Private, minHtlc, feePerByte, in.Private,
) )
@ -764,9 +761,8 @@ out:
for { for {
select { select {
case err := <-errChan: case err := <-errChan:
rpcsLog.Errorf("unable to open channel to "+ rpcsLog.Errorf("unable to open channel to identityPub(%x): %v",
"identityPub(%x) nor peerID(%v): %v", nodePubKeyBytes, err)
nodePubKeyBytes, in.TargetPeerId, err)
return err return err
case fundingUpdate := <-updateChan: case fundingUpdate := <-updateChan:
rpcsLog.Tracef("[openchannel] sending update: %v", rpcsLog.Tracef("[openchannel] sending update: %v",
@ -802,8 +798,8 @@ out:
} }
} }
rpcsLog.Tracef("[openchannel] success peerid(%v), ChannelPoint(%v)", rpcsLog.Tracef("[openchannel] success identityPub(%x), ChannelPoint(%v)",
in.TargetPeerId, outpoint) nodePubKeyBytes, outpoint)
return nil return nil
} }
@ -814,8 +810,8 @@ out:
func (r *rpcServer) OpenChannelSync(ctx context.Context, func (r *rpcServer) OpenChannelSync(ctx context.Context,
in *lnrpc.OpenChannelRequest) (*lnrpc.ChannelPoint, error) { in *lnrpc.OpenChannelRequest) (*lnrpc.ChannelPoint, error) {
rpcsLog.Tracef("[openchannel] request to peerid(%v) "+ rpcsLog.Tracef("[openchannel] request to identityPub(%v) "+
"allocation(us=%v, them=%v)", in.TargetPeerId, "allocation(us=%v, them=%v)", in.NodePubkeyString,
in.LocalFundingAmount, in.PushSat) in.LocalFundingAmount, in.PushSat)
// We don't allow new channels to be open while the server is still // We don't allow new channels to be open while the server is still
@ -874,7 +870,7 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context,
int64(feePerByte)) int64(feePerByte))
updateChan, errChan := r.server.OpenChannel( updateChan, errChan := r.server.OpenChannel(
in.TargetPeerId, nodepubKey, localFundingAmt, nodepubKey, localFundingAmt,
lnwire.NewMSatFromSatoshis(remoteInitialBalance), lnwire.NewMSatFromSatoshis(remoteInitialBalance),
minHtlc, feePerByte, in.Private, minHtlc, feePerByte, in.Private,
) )
@ -882,9 +878,8 @@ func (r *rpcServer) OpenChannelSync(ctx context.Context,
select { select {
// If an error occurs them immediately return the error to the client. // If an error occurs them immediately return the error to the client.
case err := <-errChan: case err := <-errChan:
rpcsLog.Errorf("unable to open channel to "+ rpcsLog.Errorf("unable to open channel to identityPub(%x): %v",
"identityPub(%x) nor peerID(%v): %v", nodepubKey, err)
nodepubKey, in.TargetPeerId, err)
return nil, err return nil, err
// Otherwise, wait for the first channel update. The first update sent // Otherwise, wait for the first channel update. The first update sent
@ -1237,7 +1232,6 @@ func (r *rpcServer) ListPeers(ctx context.Context,
nodePub := serverPeer.addr.IdentityKey.SerializeCompressed() nodePub := serverPeer.addr.IdentityKey.SerializeCompressed()
peer := &lnrpc.Peer{ peer := &lnrpc.Peer{
PubKey: hex.EncodeToString(nodePub), PubKey: hex.EncodeToString(nodePub),
PeerId: serverPeer.id,
Address: serverPeer.conn.RemoteAddr().String(), Address: serverPeer.conn.RemoteAddr().String(),
Inbound: !serverPeer.inbound, // Flip for display Inbound: !serverPeer.inbound, // Flip for display
BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived), BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived),

View File

@ -75,7 +75,6 @@ type server struct {
lightningID [32]byte lightningID [32]byte
mu sync.RWMutex mu sync.RWMutex
peersByID map[int32]*peer
peersByPub map[string]*peer peersByPub map[string]*peer
inboundPeers map[string]*peer inboundPeers map[string]*peer
@ -173,7 +172,6 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
persistentConnReqs: make(map[string][]*connmgr.ConnReq), persistentConnReqs: make(map[string][]*connmgr.ConnReq),
ignorePeerTermination: make(map[*peer]struct{}), ignorePeerTermination: make(map[*peer]struct{}),
peersByID: make(map[int32]*peer),
peersByPub: make(map[string]*peer), peersByPub: make(map[string]*peer),
inboundPeers: make(map[string]*peer), inboundPeers: make(map[string]*peer),
outboundPeers: make(map[string]*peer), outboundPeers: make(map[string]*peer),
@ -1575,7 +1573,6 @@ func (s *server) addPeer(p *peer) {
pubStr := string(p.addr.IdentityKey.SerializeCompressed()) pubStr := string(p.addr.IdentityKey.SerializeCompressed())
s.peersByID[p.id] = p
s.peersByPub[pubStr] = p s.peersByPub[pubStr] = p
if p.inbound { if p.inbound {
@ -1632,7 +1629,6 @@ func (s *server) removePeer(p *peer) {
pubStr := string(p.addr.IdentityKey.SerializeCompressed()) pubStr := string(p.addr.IdentityKey.SerializeCompressed())
delete(s.peersByID, p.id)
delete(s.peersByPub, pubStr) delete(s.peersByPub, pubStr)
if p.inbound { if p.inbound {
@ -1646,7 +1642,6 @@ func (s *server) removePeer(p *peer) {
// initiation of a channel funding workflow to the peer with either the // initiation of a channel funding workflow to the peer with either the
// specified relative peer ID, or a global lightning ID. // specified relative peer ID, or a global lightning ID.
type openChanReq struct { type openChanReq struct {
targetPeerID int32
targetPubkey *btcec.PublicKey targetPubkey *btcec.PublicKey
chainHash chainhash.Hash chainHash chainhash.Hash
@ -1778,10 +1773,10 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
} }
// OpenChannel sends a request to the server to open a channel to the specified // OpenChannel sends a request to the server to open a channel to the specified
// peer identified by ID with the passed channel funding parameters. // peer identified by Public Key with the passed channel funding parameters.
// //
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey, func (s *server) OpenChannel(nodeKey *btcec.PublicKey,
localAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi, localAmt btcutil.Amount, pushAmt lnwire.MilliSatoshi,
minHtlc lnwire.MilliSatoshi, minHtlc lnwire.MilliSatoshi,
fundingFeePerByte btcutil.Amount, fundingFeePerByte btcutil.Amount,
@ -1806,16 +1801,13 @@ func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey,
// First attempt to locate the target peer to open a channel with, if // First attempt to locate the target peer to open a channel with, if
// we're unable to locate the peer then this request will fail. // we're unable to locate the peer then this request will fail.
s.mu.RLock() s.mu.RLock()
if peer, ok := s.peersByID[peerID]; ok { if peer, ok := s.peersByPub[string(pubKeyBytes)]; ok {
targetPeer = peer
} else if peer, ok := s.peersByPub[string(pubKeyBytes)]; ok {
targetPeer = peer targetPeer = peer
} }
s.mu.RUnlock() s.mu.RUnlock()
if targetPeer == nil { if targetPeer == nil {
errChan <- fmt.Errorf("unable to find peer nodeID(%x), "+ errChan <- fmt.Errorf("unable to find peer nodeID(%x)", pubKeyBytes)
"peerID(%v)", pubKeyBytes, peerID)
return updateChan, errChan return updateChan, errChan
} }
@ -1839,7 +1831,6 @@ func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey,
// instead of blocking on this request which is exported as a // instead of blocking on this request which is exported as a
// synchronous request to the outside world. // synchronous request to the outside world.
req := &openChanReq{ req := &openChanReq{
targetPeerID: peerID,
targetPubkey: nodeKey, targetPubkey: nodeKey,
chainHash: *activeNetParams.GenesisHash, chainHash: *activeNetParams.GenesisHash,
localFundingAmt: localAmt, localFundingAmt: localAmt,
@ -1865,8 +1856,8 @@ func (s *server) Peers() []*peer {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
peers := make([]*peer, 0, len(s.peersByID)) peers := make([]*peer, 0, len(s.peersByPub))
for _, peer := range s.peersByID { for _, peer := range s.peersByPub {
peers = append(peers, peer) peers = append(peers, peer)
} }