server: fix some minor style deviations in new server code

This commit is contained in:
Olaoluwa Osuntokun 2017-08-10 21:20:51 -07:00
parent bee7ed7758
commit 472d9967e5
No known key found for this signature in database
GPG Key ID: 3D0A94DB79743DF5

@ -433,6 +433,11 @@ func (s *server) genNodeAnnouncement(
return *s.currentNodeAnn, err return *s.currentNodeAnn, err
} }
type nodeAddresses struct {
pubKey *btcec.PublicKey
addresses []*net.TCPAddr
}
// establishPersistentConnections attempts to establish persistent connections // establishPersistentConnections attempts to establish persistent connections
// to all our direct channel collaborators. In order to promote liveness of // to all our direct channel collaborators. In order to promote liveness of
// our active channels, we instruct the connection manager to attempt to // our active channels, we instruct the connection manager to attempt to
@ -561,22 +566,20 @@ func (s *server) establishPersistentConnections() error {
// BroadcastMessage sends a request to the server to broadcast a set of // BroadcastMessage sends a request to the server to broadcast a set of
// messages to all peers other than the one specified by the `skip` parameter. // messages to all peers other than the one specified by the `skip` parameter.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) BroadcastMessage( func (s *server) BroadcastMessage(skip *btcec.PublicKey,
skip *btcec.PublicKey,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
msgsToSend := make([]lnwire.Message, 0, len(msgs))
msgsToSend = append(msgsToSend, msgs...)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
return s.broadcastMessages(skip, msgsToSend) return s.broadcastMessages(skip, msgs)
} }
// broadcastMessages is an internal method that delivers messages to all active // broadcastMessages is an internal method that delivers messages to all active
// peers except the one specified by `skip`. // peers except the one specified by `skip`.
//
// NOTE: This method MUST be called while the server's mutex is locked. // NOTE: This method MUST be called while the server's mutex is locked.
func (s *server) broadcastMessages( func (s *server) broadcastMessages(
skip *btcec.PublicKey, skip *btcec.PublicKey,
@ -584,10 +587,10 @@ func (s *server) broadcastMessages(
srvrLog.Debugf("Broadcasting %v messages", len(msgs)) srvrLog.Debugf("Broadcasting %v messages", len(msgs))
// Iterate over all known peers, dispatching a go routine to enqueue all // Iterate over all known peers, dispatching a go routine to enqueue
// messages to each of peers. We synchronize access to peersByPub // all messages to each of peers. We synchronize access to peersByPub
// throughout this process to ensure we deliver messages to exact set of // throughout this process to ensure we deliver messages to exact set
// peers present at the time of invocation. // of peers present at the time of invocation.
var wg sync.WaitGroup var wg sync.WaitGroup
for _, sPeer := range s.peersByPub { for _, sPeer := range s.peersByPub {
if skip != nil && if skip != nil &&
@ -604,6 +607,7 @@ func (s *server) broadcastMessages(
s.wg.Add(1) s.wg.Add(1)
go s.sendPeerMessages(sPeer, msgs, &wg) go s.sendPeerMessages(sPeer, msgs, &wg)
} }
// Wait for all messages to have been dispatched before returning to // Wait for all messages to have been dispatched before returning to
// caller. // caller.
wg.Wait() wg.Wait()
@ -611,32 +615,23 @@ func (s *server) broadcastMessages(
return nil return nil
} }
type nodeAddresses struct {
pubKey *btcec.PublicKey
addresses []*net.TCPAddr
}
// SendToPeer send a message to the server telling it to send the specific set // SendToPeer send a message to the server telling it to send the specific set
// of message to a particular peer. If the peer connect be found, then this // of message to a particular peer. If the peer connect be found, then this
// method will return a non-nil error. // method will return a non-nil error.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) SendToPeer( func (s *server) SendToPeer(target *btcec.PublicKey,
target *btcec.PublicKey,
msgs ...lnwire.Message) error { msgs ...lnwire.Message) error {
msgsToSend := make([]lnwire.Message, 0, len(msgs))
msgsToSend = append(msgsToSend, msgs...)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
return s.sendToPeer(target, msgsToSend) return s.sendToPeer(target, msgs)
} }
// sendToPeer is an internal method that delivers messages to the specified // sendToPeer is an internal method that delivers messages to the specified
// `target` peer. // `target` peer.
func (s *server) sendToPeer( func (s *server) sendToPeer(target *btcec.PublicKey,
target *btcec.PublicKey,
msgs []lnwire.Message) error { msgs []lnwire.Message) error {
// Compute the target peer's identifier. // Compute the target peer's identifier.
@ -647,8 +642,8 @@ func (s *server) sendToPeer(
// Lookup intended target in peersByPub, returning an error to the // Lookup intended target in peersByPub, returning an error to the
// caller if the peer is unknown. Access to peersByPub is synchronized // caller if the peer is unknown. Access to peersByPub is synchronized
// here to ensure we consider the exact set of peers present at the time // here to ensure we consider the exact set of peers present at the
// of invocation. // time of invocation.
targetPeer, ok := s.peersByPub[string(targetPubBytes)] targetPeer, ok := s.peersByPub[string(targetPubBytes)]
if !ok { if !ok {
srvrLog.Errorf("unable to send message to %x, "+ srvrLog.Errorf("unable to send message to %x, "+
@ -669,20 +664,20 @@ func (s *server) sendToPeer(
// //
// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a // NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
// go routine--both `wg` and the server's WaitGroup should be incremented // go routine--both `wg` and the server's WaitGroup should be incremented
// beforehand. If this method is not spawned as a go routine, the provided `wg` // beforehand. If this method is not spawned as a go routine, the provided
// should be nil, and the server's WaitGroup should not be tracking this // `wg` should be nil, and the server's WaitGroup should not be tracking this
// invocation. // invocation.
func (s *server) sendPeerMessages( func (s *server) sendPeerMessages(
targetPeer *peer, targetPeer *peer,
msgs []lnwire.Message, msgs []lnwire.Message,
wg *sync.WaitGroup) { wg *sync.WaitGroup) {
// If a WaitGroup is provided, we assume that this method was spawned as // If a WaitGroup is provided, we assume that this method was spawned
// a go routine, and that it is being tracked by both the server's // as a go routine, and that it is being tracked by both the server's
// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
// event, we defer a call to Done on both WaitGroups to 1) ensure that // event, we defer a call to Done on both WaitGroups to 1) ensure that
// server will be able to shutdown after its go routines exit, and 2) so // server will be able to shutdown after its go routines exit, and 2)
// the server can return to the caller of BroadcastMessage. // so the server can return to the caller of BroadcastMessage.
if wg != nil { if wg != nil {
defer s.wg.Done() defer s.wg.Done()
defer wg.Done() defer wg.Done()
@ -696,6 +691,7 @@ func (s *server) sendPeerMessages(
// FindPeer will return the peer that corresponds to the passed in public key. // FindPeer will return the peer that corresponds to the passed in public key.
// This function is used by the funding manager, allowing it to update the // This function is used by the funding manager, allowing it to update the
// daemon's local representation of the remote peer. // daemon's local representation of the remote peer.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) { func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
s.mu.Lock() s.mu.Lock()
@ -709,6 +705,7 @@ func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
// FindPeerByPubStr will return the peer that corresponds to the passed peerID, // FindPeerByPubStr will return the peer that corresponds to the passed peerID,
// which should be a string representation of the peer's serialized, compressed // which should be a string representation of the peer's serialized, compressed
// public key. // public key.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) FindPeerByPubStr(peerID string) (*peer, error) { func (s *server) FindPeerByPubStr(peerID string) (*peer, error) {
s.mu.Lock() s.mu.Lock()
@ -732,6 +729,7 @@ func (s *server) findPeer(peerID string) (*peer, error) {
// cleans up all resources allocated to the peer, notifies relevant sub-systems // cleans up all resources allocated to the peer, notifies relevant sub-systems
// of its demise, and finally handles re-connecting to the peer if it's // of its demise, and finally handles re-connecting to the peer if it's
// persistent. // persistent.
//
// NOTE: This MUST be launched as a goroutine AND the _peer's_ WaitGroup should // NOTE: This MUST be launched as a goroutine AND the _peer's_ WaitGroup should
// be incremented before spawning this method, as it will signal to the peer's // be incremented before spawning this method, as it will signal to the peer's
// WaitGroup upon completion. // WaitGroup upon completion.
@ -810,9 +808,7 @@ func (s *server) peerTerminationWatcher(p *peer) {
// peerConnected is a function that handles initialization a newly connected // peerConnected is a function that handles initialization a newly connected
// peer by adding it to the server's global list of all active peers, and // peer by adding it to the server's global list of all active peers, and
// starting all the goroutines the peer needs to function properly. // starting all the goroutines the peer needs to function properly.
func (s *server) peerConnected( func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq,
conn net.Conn,
connReq *connmgr.ConnReq,
inbound bool) { inbound bool) {
brontideConn := conn.(*brontide.Conn) brontideConn := conn.(*brontide.Conn)
@ -1095,19 +1091,19 @@ type openChanReq struct {
// ConnectToPeer requests that the server connect to a Lightning Network peer // ConnectToPeer requests that the server connect to a Lightning Network peer
// at the specified address. This function will *block* until either a // at the specified address. This function will *block* until either a
// connection is established, or the initial handshake process fails. // connection is established, or the initial handshake process fails.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
targetPub := string(addr.IdentityKey.SerializeCompressed()) targetPub := string(addr.IdentityKey.SerializeCompressed())
// Acquire mutex, but use explicit unlocking instead of defer for better // Acquire mutex, but use explicit unlocking instead of defer for
// granularity. In certain conditions, this method requires making an // better granularity. In certain conditions, this method requires
// outbound connection to a remote peer, which requires the lock to be // making an outbound connection to a remote peer, which requires the
// released, and subsequently reacquired. // lock to be released, and subsequently reacquired.
s.mu.Lock() s.mu.Lock()
// Ensure we're not already connected to this // Ensure we're not already connected to this peer.
// peer.
peer, ok := s.peersByPub[targetPub] peer, ok := s.peersByPub[targetPub]
if ok { if ok {
s.mu.Unlock() s.mu.Unlock()
@ -1164,6 +1160,7 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
// DisconnectPeer sends the request to server to close the connection with peer // DisconnectPeer sends the request to server to close the connection with peer
// identified by public key. // identified by public key.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
pubBytes := pubKey.SerializeCompressed() pubBytes := pubKey.SerializeCompressed()
@ -1201,10 +1198,9 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
// OpenChannel sends a request to the server to open a channel to the specified // OpenChannel sends a request to the server to open a channel to the specified
// peer identified by ID with the passed channel funding parameters. // peer identified by ID with the passed channel funding parameters.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) OpenChannel( func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey,
peerID int32,
nodeKey *btcec.PublicKey,
localAmt btcutil.Amount, localAmt btcutil.Amount,
pushAmt btcutil.Amount) (chan *lnrpc.OpenStatusUpdate, chan error) { pushAmt btcutil.Amount) (chan *lnrpc.OpenStatusUpdate, chan error) {
@ -1243,8 +1239,6 @@ func (s *server) OpenChannel(
// funding manager. This allows the server to continue handling queries // funding manager. This allows the server to continue handling queries
// instead of blocking on this request which is exported as a // instead of blocking on this request which is exported as a
// synchronous request to the outside world. // synchronous request to the outside world.
// TODO(roasbeef): pass in chan that's closed if/when funding succeeds
// so can track as persistent peer?
req := &openChanReq{ req := &openChanReq{
targetPeerID: peerID, targetPeerID: peerID,
targetPubkey: nodeKey, targetPubkey: nodeKey,
@ -1255,12 +1249,15 @@ func (s *server) OpenChannel(
err: errChan, err: errChan,
} }
// TODO(roasbeef): pass in chan that's closed if/when funding succeeds
// so can track as persistent peer?
go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req)
return updateChan, errChan return updateChan, errChan
} }
// Peers returns a slice of all active peers. // Peers returns a slice of all active peers.
//
// NOTE: This function is safe for concurrent access. // NOTE: This function is safe for concurrent access.
func (s *server) Peers() []*peer { func (s *server) Peers() []*peer {
s.mu.Lock() s.mu.Lock()