peer: ensure latest version of htlcswitch.Peer interface is implemented

This commit is contained in:
Olaoluwa Osuntokun 2017-06-17 00:11:02 +02:00
parent 11f7a227ab
commit e15604f7b5
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 24 additions and 24 deletions

39
peer.go

@ -2,7 +2,6 @@ package main
import ( import (
"container/list" "container/list"
"crypto/sha256"
"fmt" "fmt"
"net" "net"
"sync" "sync"
@ -12,8 +11,6 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/brontide" "github.com/lightningnetwork/lnd/brontide"
"github.com/btcsuite/fastsha256"
"bytes" "bytes"
"github.com/go-errors/errors" "github.com/go-errors/errors"
@ -94,7 +91,7 @@ type peer struct {
conn net.Conn conn net.Conn
addr *lnwire.NetAddress addr *lnwire.NetAddress
lightningID chainhash.Hash pubKeyBytes [33]byte
inbound bool inbound bool
id int32 id int32
@ -166,7 +163,6 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
p := &peer{ p := &peer{
conn: conn, conn: conn,
lightningID: chainhash.Hash(sha256.Sum256(nodePub.SerializeCompressed())),
addr: addr, addr: addr,
id: atomic.AddInt32(&numNodes, 1), id: atomic.AddInt32(&numNodes, 1),
@ -193,6 +189,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
queueQuit: make(chan struct{}), queueQuit: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
copy(p.pubKeyBytes[:], nodePub.SerializeCompressed())
return p, nil return p, nil
} }
@ -311,14 +308,17 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// new payments triggered by RPC clients. // new payments triggered by RPC clients.
sphinxDecoder := htlcswitch.NewSphinxDecoder(p.server.sphinx) sphinxDecoder := htlcswitch.NewSphinxDecoder(p.server.sphinx)
link := htlcswitch.NewChannelLink( link := htlcswitch.NewChannelLink(
&htlcswitch.ChannelLinkConfig{ htlcswitch.ChannelLinkConfig{
Peer: p, Peer: p,
DecodeOnion: sphinxDecoder.Decode, DecodeOnion: sphinxDecoder.Decode,
SettledContracts: p.server.breachArbiter.settledContracts, SettledContracts: p.server.breachArbiter.settledContracts,
DebugHTLC: cfg.DebugHTLC, DebugHTLC: cfg.DebugHTLC,
Registry: p.server.invoices, Registry: p.server.invoices,
Switch: p.server.htlcSwitch, Switch: p.server.htlcSwitch,
}, lnChan) FwrdingPolicy: p.server.cc.routingPolicy,
},
lnChan,
)
if err := p.server.htlcSwitch.AddLink(link); err != nil { if err := p.server.htlcSwitch.AddLink(link); err != nil {
return err return err
@ -333,7 +333,6 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// irrecoverable protocol error has been encountered. // irrecoverable protocol error has been encountered.
func (p *peer) WaitForDisconnect() { func (p *peer) WaitForDisconnect() {
<-p.quit <-p.quit
} }
// Disconnect terminates the connection with the remote peer. Additionally, a // Disconnect terminates the connection with the remote peer. Additionally, a
@ -350,7 +349,6 @@ func (p *peer) Disconnect() {
p.conn.Close() p.conn.Close()
close(p.quit) close(p.quit)
} }
// String returns the string representation of this peer. // String returns the string representation of this peer.
@ -788,14 +786,17 @@ out:
decoder := htlcswitch.NewSphinxDecoder(p.server.sphinx) decoder := htlcswitch.NewSphinxDecoder(p.server.sphinx)
link := htlcswitch.NewChannelLink( link := htlcswitch.NewChannelLink(
&htlcswitch.ChannelLinkConfig{ htlcswitch.ChannelLinkConfig{
Peer: p, Peer: p,
DecodeOnion: decoder.Decode, DecodeOnion: decoder.Decode,
SettledContracts: p.server.breachArbiter.settledContracts, SettledContracts: p.server.breachArbiter.settledContracts,
DebugHTLC: cfg.DebugHTLC, DebugHTLC: cfg.DebugHTLC,
Registry: p.server.invoices, Registry: p.server.invoices,
Switch: p.server.htlcSwitch, Switch: p.server.htlcSwitch,
}, newChanReq.channel) FwrdingPolicy: p.server.cc.routingPolicy,
},
newChanReq.channel,
)
err := p.server.htlcSwitch.AddLink(link) err := p.server.htlcSwitch.AddLink(link)
if err != nil { if err != nil {
@ -1070,6 +1071,8 @@ func (p *peer) handleInitClosingSigned(req *htlcswitch.ChanClose, msg *lnwire.Cl
return return
} }
// TODO(roasbeef): also add closure height to summary
// Clear out the current channel state, marking the channel as being // Clear out the current channel state, marking the channel as being
// closed within the database. // closed within the database.
closingTxid := closeTx.TxHash() closingTxid := closeTx.TxHash()
@ -1348,21 +1351,15 @@ func (p *peer) sendInitMsg() error {
return p.writeMessage(msg) return p.writeMessage(msg)
} }
// SendMessage sends message to the remote peer which represented by // SendMessage queues a message for sending to the target peer.
// this peer.
func (p *peer) SendMessage(msg lnwire.Message) error { func (p *peer) SendMessage(msg lnwire.Message) error {
p.queueMsg(msg, nil) p.queueMsg(msg, nil)
return nil return nil
} }
// ID returns the lightning network peer id. // PubKey returns the pubkey of the peer in compressed serialized format.
func (p *peer) ID() [sha256.Size]byte { func (p *peer) PubKey() [33]byte {
return fastsha256.Sum256(p.PubKey()) return p.pubKeyBytes
}
// PubKey returns the peer public key.
func (p *peer) PubKey() []byte {
return p.addr.IdentityKey.SerializeCompressed()
} }
// TODO(roasbeef): make all start/stop mutexes a CAS // TODO(roasbeef): make all start/stop mutexes a CAS

@ -761,7 +761,8 @@ func (r *rpcServer) GetInfo(ctx context.Context,
pendingChannels, err := r.server.fundingMgr.NumPendingChannels() pendingChannels, err := r.server.fundingMgr.NumPendingChannels()
if err != nil { if err != nil {
return nil, fmt.Errorf("unable to get number of pending channels: %v", err) return nil, fmt.Errorf("unable to get number of pending "+
"channels: %v", err)
} }
idPub := r.server.identityPriv.PubKey().SerializeCompressed() idPub := r.server.identityPriv.PubKey().SerializeCompressed()
@ -1135,6 +1136,8 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
errChan := make(chan error, 1) errChan := make(chan error, 1)
payChan := make(chan *lnrpc.SendRequest) payChan := make(chan *lnrpc.SendRequest)
// TODO(roasbeef): check payment filter to see if already used?
// In order to limit the level of concurrency and prevent a client from // In order to limit the level of concurrency and prevent a client from
// attempting to OOM the server, we'll set up a semaphore to create an // attempting to OOM the server, we'll set up a semaphore to create an
// upper ceiling on the number of outstanding payments. // upper ceiling on the number of outstanding payments.