From 603601a4c8ea8151524d0f3d6f3ca0abfa0aef92 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Thu, 21 Feb 2019 20:11:33 -0800 Subject: [PATCH] peer+server: use peer-level readPool --- peer.go | 36 +++++++++++++++++++++++++++++++++++- server.go | 18 ++++++++++++++++++ 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/peer.go b/peer.go index 34b44020..91510852 100644 --- a/peer.go +++ b/peer.go @@ -18,6 +18,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/brontide" + "github.com/lightningnetwork/lnd/buffer" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/contractcourt" @@ -46,6 +47,10 @@ const ( // writeMessageTimeout is the timeout used when writing a message to peer. writeMessageTimeout = 50 * time.Second + // readMessageTimeout is the timeout used when reading a message from a + // peer. + readMessageTimeout = 5 * time.Second + // handshakeTimeout is the timeout used when waiting for peer init message. handshakeTimeout = 15 * time.Second @@ -215,6 +220,8 @@ type peer struct { // buffer allocation from the peer life cycle. writePool *pool.Write + readPool *pool.Read + queueQuit chan struct{} quit chan struct{} wg sync.WaitGroup @@ -259,6 +266,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server, chanActiveTimeout: chanActiveTimeout, writePool: server.writePool, + readPool: server.readPool, queueQuit: make(chan struct{}), quit: make(chan struct{}), @@ -639,11 +647,37 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { return nil, fmt.Errorf("brontide.Conn required to read messages") } + err := noiseConn.SetReadDeadline(time.Time{}) + if err != nil { + return nil, err + } + + pktLen, err := noiseConn.ReadNextHeader() + if err != nil { + return nil, err + } + // First we'll read the next _full_ message. We do this rather than // reading incrementally from the stream as the Lightning wire protocol // is message oriented and allows nodes to pad on additional data to // the message stream. - rawMsg, err := noiseConn.ReadNextMessage() + var rawMsg []byte + err = p.readPool.Submit(func(buf *buffer.Read) error { + // Before reading the body of the message, set the read timeout + // accordingly to ensure we don't block other readers using the + // pool. We do so only after the task has been scheduled to + // ensure the deadline doesn't expire while the message is in + // the process of being scheduled. + readDeadline := time.Now().Add(readMessageTimeout) + readErr := noiseConn.SetReadDeadline(readDeadline) + if readErr != nil { + return readErr + } + + rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen]) + return readErr + }) + atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg))) if err != nil { return nil, err diff --git a/server.go b/server.go index 1a501bae..3385c8ce 100644 --- a/server.go +++ b/server.go @@ -173,6 +173,8 @@ type server struct { writePool *pool.Write + readPool *pool.Read + // globalFeatures feature vector which affects HTLCs and thus are also // advertised to other nodes. globalFeatures *lnwire.FeatureVector @@ -263,19 +265,31 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl, sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db") replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier) sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog) + writeBufferPool := pool.NewWriteBuffer( pool.DefaultWriteBufferGCInterval, pool.DefaultWriteBufferExpiryInterval, ) + writePool := pool.NewWrite( writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout, ) + readBufferPool := pool.NewReadBuffer( + pool.DefaultReadBufferGCInterval, + pool.DefaultReadBufferExpiryInterval, + ) + + readPool := pool.NewRead( + readBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout, + ) + s := &server{ chanDB: chanDB, cc: cc, sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer), writePool: writePool, + readPool: readPool, invoices: invoices.NewRegistry(chanDB, activeNetParams.Params), @@ -1016,6 +1030,9 @@ func (s *server) Start() error { if err := s.writePool.Start(); err != nil { return err } + if err := s.readPool.Start(); err != nil { + return err + } if err := s.cc.chainNotifier.Start(); err != nil { return err } @@ -1136,6 +1153,7 @@ func (s *server) Stop() error { s.sigPool.Stop() s.writePool.Stop() + s.readPool.Stop() return nil }