peer+server: use peer-level readPool

This commit is contained in:
Conner Fromknecht 2019-02-21 20:11:33 -08:00
parent 8ac8d95b54
commit 603601a4c8
No known key found for this signature in database
GPG Key ID: E7D737B67FA592C7
2 changed files with 53 additions and 1 deletions

36
peer.go
View File

@ -18,6 +18,7 @@ import (
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/brontide"
"github.com/lightningnetwork/lnd/buffer"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/contractcourt"
@ -46,6 +47,10 @@ const (
// writeMessageTimeout is the timeout used when writing a message to peer.
writeMessageTimeout = 50 * time.Second
// readMessageTimeout is the timeout used when reading a message from a
// peer.
readMessageTimeout = 5 * time.Second
// handshakeTimeout is the timeout used when waiting for peer init message.
handshakeTimeout = 15 * time.Second
@ -215,6 +220,8 @@ type peer struct {
// buffer allocation from the peer life cycle.
writePool *pool.Write
readPool *pool.Read
queueQuit chan struct{}
quit chan struct{}
wg sync.WaitGroup
@ -259,6 +266,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
chanActiveTimeout: chanActiveTimeout,
writePool: server.writePool,
readPool: server.readPool,
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
@ -639,11 +647,37 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
return nil, fmt.Errorf("brontide.Conn required to read messages")
}
err := noiseConn.SetReadDeadline(time.Time{})
if err != nil {
return nil, err
}
pktLen, err := noiseConn.ReadNextHeader()
if err != nil {
return nil, err
}
// First we'll read the next _full_ message. We do this rather than
// reading incrementally from the stream as the Lightning wire protocol
// is message oriented and allows nodes to pad on additional data to
// the message stream.
rawMsg, err := noiseConn.ReadNextMessage()
var rawMsg []byte
err = p.readPool.Submit(func(buf *buffer.Read) error {
// Before reading the body of the message, set the read timeout
// accordingly to ensure we don't block other readers using the
// pool. We do so only after the task has been scheduled to
// ensure the deadline doesn't expire while the message is in
// the process of being scheduled.
readDeadline := time.Now().Add(readMessageTimeout)
readErr := noiseConn.SetReadDeadline(readDeadline)
if readErr != nil {
return readErr
}
rawMsg, readErr = noiseConn.ReadNextBody(buf[:pktLen])
return readErr
})
atomic.AddUint64(&p.bytesReceived, uint64(len(rawMsg)))
if err != nil {
return nil, err

View File

@ -173,6 +173,8 @@ type server struct {
writePool *pool.Write
readPool *pool.Read
// globalFeatures feature vector which affects HTLCs and thus are also
// advertised to other nodes.
globalFeatures *lnwire.FeatureVector
@ -263,19 +265,31 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db")
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
writeBufferPool := pool.NewWriteBuffer(
pool.DefaultWriteBufferGCInterval,
pool.DefaultWriteBufferExpiryInterval,
)
writePool := pool.NewWrite(
writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
)
readBufferPool := pool.NewReadBuffer(
pool.DefaultReadBufferGCInterval,
pool.DefaultReadBufferExpiryInterval,
)
readPool := pool.NewRead(
readBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
)
s := &server{
chanDB: chanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
writePool: writePool,
readPool: readPool,
invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),
@ -1016,6 +1030,9 @@ func (s *server) Start() error {
if err := s.writePool.Start(); err != nil {
return err
}
if err := s.readPool.Start(); err != nil {
return err
}
if err := s.cc.chainNotifier.Start(); err != nil {
return err
}
@ -1136,6 +1153,7 @@ func (s *server) Stop() error {
s.sigPool.Stop()
s.writePool.Stop()
s.readPool.Stop()
return nil
}