From 9a3c0b8bca53076f3aa586cf4ac1f1dfd99c18b6 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 21 Feb 2019 20:10:51 -0800 Subject: [PATCH] peer+server: switch to pool.Write from pool.WriteBuffer --- peer.go | 67 +++++++++++++++++++++++++------------------------------ server.go | 20 ++++++++++++----- 2 files changed, 45 insertions(+), 42 deletions(-) diff --git a/peer.go b/peer.go index 41a55892..34b44020 100644 --- a/peer.go +++ b/peer.go @@ -18,7 +18,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/brontide" - "github.com/lightningnetwork/lnd/buffer" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" @@ -26,6 +25,7 @@ import ( "github.com/lightningnetwork/lnd/lnpeer" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/pool" "github.com/lightningnetwork/lnd/ticker" ) @@ -209,11 +209,11 @@ type peer struct { // TODO(halseth): remove when link failure is properly handled. failedChannels map[lnwire.ChannelID]struct{} - // writeBuf is a buffer that we'll re-use in order to encode wire - // messages to write out directly on the socket. By re-using this - // buffer, we avoid needing to allocate more memory each time a new - // message is to be sent to a peer. - writeBuf *buffer.Write + // writePool is the task pool to that manages reuse of write buffers. + // Write tasks are submitted to the pool in order to conserve the total + // number of write buffers allocated at any one time, and decouple write + // buffer allocation from the peer life cycle. + writePool *pool.Write queueQuit chan struct{} quit chan struct{} @@ -258,7 +258,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, chanActiveTimeout: chanActiveTimeout, - writeBuf: server.writeBufferPool.Take(), + writePool: server.writePool, queueQuit: make(chan struct{}), quit: make(chan struct{}), @@ -608,11 +608,6 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) { } p.wg.Wait() - - // Now that we are certain all active goroutines which could have been - // modifying the write buffer have exited, return the buffer to the pool - // to be reused. - p.server.writeBufferPool.Return(p.writeBuf) } // Disconnect terminates the connection with the remote peer. Additionally, a @@ -1359,33 +1354,33 @@ func (p *peer) writeMessage(msg lnwire.Message) error { p.logWireMessage(msg, false) - // We'll re-slice of static write buffer to allow this new message to - // utilize all available space. We also ensure we cap the capacity of - // this new buffer to the static buffer which is sized for the largest - // possible protocol message. - b := bytes.NewBuffer(p.writeBuf[0:0:len(p.writeBuf)]) + var n int + err := p.writePool.Submit(func(buf *bytes.Buffer) error { + // Using a buffer allocated by the write pool, encode the + // message directly into the buffer. + _, writeErr := lnwire.WriteMessage(buf, msg, 0) + if writeErr != nil { + return writeErr + } - // With the temp buffer created and sliced properly (length zero, full - // capacity), we'll now encode the message directly into this buffer. - _, err := lnwire.WriteMessage(b, msg, 0) - if err != nil { - return err + // Ensure the write deadline is set before we attempt to send + // the message. + writeDeadline := time.Now().Add(writeMessageTimeout) + writeErr = p.conn.SetWriteDeadline(writeDeadline) + if writeErr != nil { + return writeErr + } + + // Finally, write the message itself in a single swoop. + n, writeErr = p.conn.Write(buf.Bytes()) + return writeErr + }) + + // Record the number of bytes written on the wire, if any. + if n > 0 { + atomic.AddUint64(&p.bytesSent, uint64(n)) } - // Compute and set the write deadline we will impose on the remote peer. - writeDeadline := time.Now().Add(writeMessageTimeout) - err = p.conn.SetWriteDeadline(writeDeadline) - if err != nil { - return err - } - - // Finally, write the message itself in a single swoop. - n, err := p.conn.Write(b.Bytes()) - - // Regardless of the error returned, record how many bytes were written - // to the wire. - atomic.AddUint64(&p.bytesSent, uint64(n)) - return err } diff --git a/server.go b/server.go index 6b4cb7a4..1a501bae 100644 --- a/server.go +++ b/server.go @@ -171,7 +171,7 @@ type server struct { sigPool *lnwallet.SigPool - writeBufferPool *pool.WriteBuffer + writePool *pool.Write // globalFeatures feature vector which affects HTLCs and thus are also // advertised to other nodes. @@ -267,12 +267,15 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, pool.DefaultWriteBufferGCInterval, pool.DefaultWriteBufferExpiryInterval, ) + writePool := pool.NewWrite( + writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout, + ) s := &server{ - chanDB: chanDB, - cc: cc, - sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), - writeBufferPool: writeBufferPool, + chanDB: chanDB, + cc: cc, + sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), + writePool: writePool, invoices: invoices.NewRegistry(chanDB, activeNetParams.Params), @@ -1010,6 +1013,9 @@ func (s *server) Start() error { if err := s.sigPool.Start(); err != nil { return err } + if err := s.writePool.Start(); err != nil { + return err + } if err := s.cc.chainNotifier.Start(); err != nil { return err } @@ -1102,7 +1108,6 @@ func (s *server) Stop() error { // Shutdown the wallet, funding manager, and the rpc server. s.chanStatusMgr.Stop() - s.sigPool.Stop() s.cc.chainNotifier.Stop() s.chanRouter.Stop() s.htlcSwitch.Stop() @@ -1129,6 +1134,9 @@ func (s *server) Stop() error { // Wait for all lingering goroutines to quit. s.wg.Wait() + s.sigPool.Stop() + s.writePool.Stop() + return nil }