peer: changes to satisfy lnpeer.Peer

This commit adds:
 - variadic SendMessage method (w/ sync bool reorder)
 - passes peer object directly to ProcessRemoteAnnouncment
 - adds IdentityKey() method for lnpeer.Peer
This commit is contained in:
Conner Fromknecht 2018-06-07 20:07:30 -07:00
parent 769f0f0a94
commit edf08458c1
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF

57
peer.go

@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/connmgr"
"github.com/roasbeef/btcd/txscript"
@ -92,7 +93,6 @@ type peer struct {
bytesReceived uint64
bytesSent uint64
// pingTime is a rough estimate of the RTT (round-trip-time) between us
// and the connected peer. This time is expressed in micro seconds.
// To be used atomically.
@ -502,10 +502,10 @@ func (p *peer) addLink(chanPoint *wire.OutPoint,
if linkErr.SendData != nil {
data = linkErr.SendData
}
err := p.SendMessage(&lnwire.Error{
err := p.SendMessage(true, &lnwire.Error{
ChanID: chanID,
Data: data,
}, true)
})
if err != nil {
peerLog.Errorf("unable to send msg to "+
"remote peer: %v", err)
@ -833,8 +833,7 @@ func newDiscMsgStream(p *peer) *msgStream {
"Update stream for gossiper exited",
1000,
func(msg lnwire.Message) {
p.server.authGossiper.ProcessRemoteAnnouncement(msg,
p.addr.IdentityKey)
p.server.authGossiper.ProcessRemoteAnnouncement(msg, p)
},
)
}
@ -1958,23 +1957,38 @@ func (p *peer) sendInitMsg() error {
return p.writeMessage(msg)
}
// SendMessage sends message to remote peer. The second argument denotes if the
// method should block until the message has been sent to the remote peer.
func (p *peer) SendMessage(msg lnwire.Message, sync bool) error {
if !sync {
p.queueMsg(msg, nil)
return nil
// SendMessage sends a variadic number of message to remote peer. The first
// argument denotes if the method should block until the message has been sent
// to the remote peer.
func (p *peer) SendMessage(sync bool, msgs ...lnwire.Message) error {
// Add all incoming messages to the outgoing queue. A list of error
// chans is populated for each message if the caller requested a sync
// send.
var errChans []chan error
for _, msg := range msgs {
// If a sync send was requested, create an error chan to listen
// for an ack from the writeHandler.
var errChan chan error
if sync {
errChan = make(chan error, 1)
errChans = append(errChans, errChan)
}
p.queueMsg(msg, errChan)
}
errChan := make(chan error, 1)
p.queueMsg(msg, errChan)
select {
case err := <-errChan:
return err
case <-p.quit:
return fmt.Errorf("peer shutting down")
// Wait for all replies from the writeHandler. For async sends, this
// will be a NOP as the list of error chans is nil.
for _, errChan := range errChans {
select {
case err := <-errChan:
return err
case <-p.quit:
return ErrPeerExiting
}
}
return nil
}
// PubKey returns the pubkey of the peer in compressed serialized format.
@ -1982,6 +1996,11 @@ func (p *peer) PubKey() [33]byte {
return p.pubKeyBytes
}
// IdentityKey returns the public key of the remote peer.
func (p *peer) IdentityKey() *btcec.PublicKey {
return p.addr.IdentityKey
}
// TODO(roasbeef): make all start/stop mutexes a CAS
// fetchLastChanUpdate returns a function which is able to retrieve the last