diff --git a/server.go b/server.go index 74b688cb..2b4ba6b3 100644 --- a/server.go +++ b/server.go @@ -433,6 +433,11 @@ func (s *server) genNodeAnnouncement( return *s.currentNodeAnn, err } +type nodeAddresses struct { + pubKey *btcec.PublicKey + addresses []*net.TCPAddr +} + // establishPersistentConnections attempts to establish persistent connections // to all our direct channel collaborators. In order to promote liveness of // our active channels, we instruct the connection manager to attempt to @@ -561,22 +566,20 @@ func (s *server) establishPersistentConnections() error { // BroadcastMessage sends a request to the server to broadcast a set of // messages to all peers other than the one specified by the `skip` parameter. +// // NOTE: This function is safe for concurrent access. -func (s *server) BroadcastMessage( - skip *btcec.PublicKey, +func (s *server) BroadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error { - msgsToSend := make([]lnwire.Message, 0, len(msgs)) - msgsToSend = append(msgsToSend, msgs...) - s.mu.Lock() defer s.mu.Unlock() - return s.broadcastMessages(skip, msgsToSend) + return s.broadcastMessages(skip, msgs) } // broadcastMessages is an internal method that delivers messages to all active // peers except the one specified by `skip`. +// // NOTE: This method MUST be called while the server's mutex is locked. func (s *server) broadcastMessages( skip *btcec.PublicKey, @@ -584,10 +587,10 @@ func (s *server) broadcastMessages( srvrLog.Debugf("Broadcasting %v messages", len(msgs)) - // Iterate over all known peers, dispatching a go routine to enqueue all - // messages to each of peers. We synchronize access to peersByPub - // throughout this process to ensure we deliver messages to exact set of - // peers present at the time of invocation. + // Iterate over all known peers, dispatching a go routine to enqueue + // all messages to each of peers. We synchronize access to peersByPub + // throughout this process to ensure we deliver messages to exact set + // of peers present at the time of invocation. var wg sync.WaitGroup for _, sPeer := range s.peersByPub { if skip != nil && @@ -604,6 +607,7 @@ func (s *server) broadcastMessages( s.wg.Add(1) go s.sendPeerMessages(sPeer, msgs, &wg) } + // Wait for all messages to have been dispatched before returning to // caller. wg.Wait() @@ -611,32 +615,23 @@ func (s *server) broadcastMessages( return nil } -type nodeAddresses struct { - pubKey *btcec.PublicKey - addresses []*net.TCPAddr -} - // SendToPeer send a message to the server telling it to send the specific set // of message to a particular peer. If the peer connect be found, then this // method will return a non-nil error. +// // NOTE: This function is safe for concurrent access. -func (s *server) SendToPeer( - target *btcec.PublicKey, +func (s *server) SendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error { - msgsToSend := make([]lnwire.Message, 0, len(msgs)) - msgsToSend = append(msgsToSend, msgs...) - s.mu.Lock() defer s.mu.Unlock() - return s.sendToPeer(target, msgsToSend) + return s.sendToPeer(target, msgs) } // sendToPeer is an internal method that delivers messages to the specified // `target` peer. -func (s *server) sendToPeer( - target *btcec.PublicKey, +func (s *server) sendToPeer(target *btcec.PublicKey, msgs []lnwire.Message) error { // Compute the target peer's identifier. @@ -647,8 +642,8 @@ func (s *server) sendToPeer( // Lookup intended target in peersByPub, returning an error to the // caller if the peer is unknown. Access to peersByPub is synchronized - // here to ensure we consider the exact set of peers present at the time - // of invocation. + // here to ensure we consider the exact set of peers present at the + // time of invocation. targetPeer, ok := s.peersByPub[string(targetPubBytes)] if !ok { srvrLog.Errorf("unable to send message to %x, "+ @@ -669,20 +664,20 @@ func (s *server) sendToPeer( // // NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a // go routine--both `wg` and the server's WaitGroup should be incremented -// beforehand. If this method is not spawned as a go routine, the provided `wg` -// should be nil, and the server's WaitGroup should not be tracking this +// beforehand. If this method is not spawned as a go routine, the provided +// `wg` should be nil, and the server's WaitGroup should not be tracking this // invocation. func (s *server) sendPeerMessages( targetPeer *peer, msgs []lnwire.Message, wg *sync.WaitGroup) { - // If a WaitGroup is provided, we assume that this method was spawned as - // a go routine, and that it is being tracked by both the server's + // If a WaitGroup is provided, we assume that this method was spawned + // as a go routine, and that it is being tracked by both the server's // WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this // event, we defer a call to Done on both WaitGroups to 1) ensure that - // server will be able to shutdown after its go routines exit, and 2) so - // the server can return to the caller of BroadcastMessage. + // server will be able to shutdown after its go routines exit, and 2) + // so the server can return to the caller of BroadcastMessage. if wg != nil { defer s.wg.Done() defer wg.Done() @@ -696,6 +691,7 @@ func (s *server) sendPeerMessages( // FindPeer will return the peer that corresponds to the passed in public key. // This function is used by the funding manager, allowing it to update the // daemon's local representation of the remote peer. +// // NOTE: This function is safe for concurrent access. func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) { s.mu.Lock() @@ -709,6 +705,7 @@ func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) { // FindPeerByPubStr will return the peer that corresponds to the passed peerID, // which should be a string representation of the peer's serialized, compressed // public key. +// // NOTE: This function is safe for concurrent access. func (s *server) FindPeerByPubStr(peerID string) (*peer, error) { s.mu.Lock() @@ -732,6 +729,7 @@ func (s *server) findPeer(peerID string) (*peer, error) { // cleans up all resources allocated to the peer, notifies relevant sub-systems // of its demise, and finally handles re-connecting to the peer if it's // persistent. +// // NOTE: This MUST be launched as a goroutine AND the _peer's_ WaitGroup should // be incremented before spawning this method, as it will signal to the peer's // WaitGroup upon completion. @@ -810,9 +808,7 @@ func (s *server) peerTerminationWatcher(p *peer) { // peerConnected is a function that handles initialization a newly connected // peer by adding it to the server's global list of all active peers, and // starting all the goroutines the peer needs to function properly. -func (s *server) peerConnected( - conn net.Conn, - connReq *connmgr.ConnReq, +func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) { brontideConn := conn.(*brontide.Conn) @@ -1095,19 +1091,19 @@ type openChanReq struct { // ConnectToPeer requests that the server connect to a Lightning Network peer // at the specified address. This function will *block* until either a // connection is established, or the initial handshake process fails. +// // NOTE: This function is safe for concurrent access. func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { targetPub := string(addr.IdentityKey.SerializeCompressed()) - // Acquire mutex, but use explicit unlocking instead of defer for better - // granularity. In certain conditions, this method requires making an - // outbound connection to a remote peer, which requires the lock to be - // released, and subsequently reacquired. + // Acquire mutex, but use explicit unlocking instead of defer for + // better granularity. In certain conditions, this method requires + // making an outbound connection to a remote peer, which requires the + // lock to be released, and subsequently reacquired. s.mu.Lock() - // Ensure we're not already connected to this - // peer. + // Ensure we're not already connected to this peer. peer, ok := s.peersByPub[targetPub] if ok { s.mu.Unlock() @@ -1164,6 +1160,7 @@ func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error { // DisconnectPeer sends the request to server to close the connection with peer // identified by public key. +// // NOTE: This function is safe for concurrent access. func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { pubBytes := pubKey.SerializeCompressed() @@ -1201,10 +1198,9 @@ func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error { // OpenChannel sends a request to the server to open a channel to the specified // peer identified by ID with the passed channel funding parameters. +// // NOTE: This function is safe for concurrent access. -func (s *server) OpenChannel( - peerID int32, - nodeKey *btcec.PublicKey, +func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey, localAmt btcutil.Amount, pushAmt btcutil.Amount) (chan *lnrpc.OpenStatusUpdate, chan error) { @@ -1243,8 +1239,6 @@ func (s *server) OpenChannel( // funding manager. This allows the server to continue handling queries // instead of blocking on this request which is exported as a // synchronous request to the outside world. - // TODO(roasbeef): pass in chan that's closed if/when funding succeeds - // so can track as persistent peer? req := &openChanReq{ targetPeerID: peerID, targetPubkey: nodeKey, @@ -1255,12 +1249,15 @@ func (s *server) OpenChannel( err: errChan, } + // TODO(roasbeef): pass in chan that's closed if/when funding succeeds + // so can track as persistent peer? go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req) return updateChan, errChan } // Peers returns a slice of all active peers. +// // NOTE: This function is safe for concurrent access. func (s *server) Peers() []*peer { s.mu.Lock()