peer+server: switch to pool.Write from pool.WriteBuffer
commit 9a3c0b8bca
parent ce1bd4be2c

peer.go | 67
@@ -18,7 +18,6 @@ import (
 	"github.com/davecgh/go-spew/spew"

 	"github.com/lightningnetwork/lnd/brontide"
-	"github.com/lightningnetwork/lnd/buffer"
 	"github.com/lightningnetwork/lnd/chainntnfs"
 	"github.com/lightningnetwork/lnd/channeldb"
 	"github.com/lightningnetwork/lnd/contractcourt"
@@ -26,6 +25,7 @@ import (
 	"github.com/lightningnetwork/lnd/lnpeer"
 	"github.com/lightningnetwork/lnd/lnwallet"
 	"github.com/lightningnetwork/lnd/lnwire"
+	"github.com/lightningnetwork/lnd/pool"
 	"github.com/lightningnetwork/lnd/ticker"
 )
@@ -209,11 +209,11 @@ type peer struct {
 	// TODO(halseth): remove when link failure is properly handled.
 	failedChannels map[lnwire.ChannelID]struct{}

-	// writeBuf is a buffer that we'll re-use in order to encode wire
-	// messages to write out directly on the socket. By re-using this
-	// buffer, we avoid needing to allocate more memory each time a new
-	// message is to be sent to a peer.
-	writeBuf *buffer.Write
+	// writePool is the task pool that manages reuse of write buffers.
+	// Write tasks are submitted to the pool in order to conserve the total
+	// number of write buffers allocated at any one time, and decouple write
+	// buffer allocation from the peer life cycle.
+	writePool *pool.Write

 	queueQuit chan struct{}
 	quit      chan struct{}
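The struct change above is the heart of this commit: instead of every peer pinning a dedicated write buffer for its entire lifetime, peers now share a task pool that owns the buffers. As a rough illustration of the pattern, here is a minimal, hypothetical sketch — not lnd's actual pool.Write implementation — of a fixed set of workers that each own one reusable buffer and lend it out only for the duration of a task:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// writePool is a simplified stand-in for lnd's pool.Write: a fixed set of
// workers, each owning one reusable buffer. Callers borrow a buffer only
// for the duration of a single submitted task.
type writePool struct {
	tasks chan func(*bytes.Buffer)
	wg    sync.WaitGroup
}

func newWritePool(numWorkers int) *writePool {
	p := &writePool{tasks: make(chan func(*bytes.Buffer))}
	for i := 0; i < numWorkers; i++ {
		p.wg.Add(1)
		go func() {
			defer p.wg.Done()
			var buf bytes.Buffer // reused across all tasks this worker runs
			for task := range p.tasks {
				buf.Reset()
				task(&buf)
			}
		}()
	}
	return p
}

// Submit runs task with some worker's buffer and blocks until it has
// completed, returning the task's error. The buffer must not escape
// the closure.
func (p *writePool) Submit(task func(*bytes.Buffer) error) error {
	done := make(chan error, 1)
	p.tasks <- func(buf *bytes.Buffer) { done <- task(buf) }
	return <-done
}

// Stop closes the task channel and waits for the workers to exit.
func (p *writePool) Stop() { close(p.tasks); p.wg.Wait() }

func main() {
	pool := newWritePool(2)
	err := pool.Submit(func(buf *bytes.Buffer) error {
		buf.WriteString("encoded wire message")
		fmt.Printf("%d bytes staged for writing\n", buf.Len())
		return nil
	})
	fmt.Println("submit err:", err)
	pool.Stop()
}
```

The synchronous Submit — it does not return until the task has run to completion — is the property the rewritten writeMessage below relies on.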
@@ -258,7 +258,7 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,

 		chanActiveTimeout: chanActiveTimeout,

-		writeBuf: server.writeBufferPool.Take(),
+		writePool: server.writePool,

 		queueQuit: make(chan struct{}),
 		quit:      make(chan struct{}),
@@ -608,11 +608,6 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) {
 	}

 	p.wg.Wait()
-
-	// Now that we are certain all active goroutines which could have been
-	// modifying the write buffer have exited, return the buffer to the pool
-	// to be reused.
-	p.server.writeBufferPool.Return(p.writeBuf)
 }

 // Disconnect terminates the connection with the remote peer. Additionally, a
@@ -1359,33 +1354,33 @@ func (p *peer) writeMessage(msg lnwire.Message) error {

 	p.logWireMessage(msg, false)

-	// We'll re-slice of static write buffer to allow this new message to
-	// utilize all available space. We also ensure we cap the capacity of
-	// this new buffer to the static buffer which is sized for the largest
-	// possible protocol message.
-	b := bytes.NewBuffer(p.writeBuf[0:0:len(p.writeBuf)])
-
-	// With the temp buffer created and sliced properly (length zero, full
-	// capacity), we'll now encode the message directly into this buffer.
-	_, err := lnwire.WriteMessage(b, msg, 0)
-	if err != nil {
-		return err
-	}
-
-	// Compute and set the write deadline we will impose on the remote peer.
-	writeDeadline := time.Now().Add(writeMessageTimeout)
-	err = p.conn.SetWriteDeadline(writeDeadline)
-	if err != nil {
-		return err
-	}
-
-	// Finally, write the message itself in a single swoop.
-	n, err := p.conn.Write(b.Bytes())
-
-	// Regardless of the error returned, record how many bytes were written
-	// to the wire.
-	atomic.AddUint64(&p.bytesSent, uint64(n))
+	var n int
+	err := p.writePool.Submit(func(buf *bytes.Buffer) error {
+		// Using a buffer allocated by the write pool, encode the
+		// message directly into the buffer.
+		_, writeErr := lnwire.WriteMessage(buf, msg, 0)
+		if writeErr != nil {
+			return writeErr
+		}
+
+		// Ensure the write deadline is set before we attempt to send
+		// the message.
+		writeDeadline := time.Now().Add(writeMessageTimeout)
+		writeErr = p.conn.SetWriteDeadline(writeDeadline)
+		if writeErr != nil {
+			return writeErr
+		}
+
+		// Finally, write the message itself in a single swoop.
+		n, writeErr = p.conn.Write(buf.Bytes())
+		return writeErr
+	})
+
+	// Record the number of bytes written on the wire, if any.
+	if n > 0 {
+		atomic.AddUint64(&p.bytesSent, uint64(n))
+	}

 	return err
 }
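Two details of the rewritten writeMessage are worth noting. The write deadline is now computed inside the submitted task, so time spent waiting for a free pool worker does not eat into writeMessageTimeout. And n is declared outside the closure but assigned inside it, which is only race-free because Submit does not return until the task has finished. A minimal sketch of that capture pattern, using a hypothetical synchronous submit helper in place of pool.Write.Submit:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// submit stands in for a synchronous task pool: it lends the closure a
// buffer and does not return until the closure has finished.
func submit(task func(buf *bytes.Buffer) error) error {
	var buf bytes.Buffer // in a real pool, a worker's reused buffer
	return task(&buf)
}

func main() {
	var n int
	err := submit(func(buf *bytes.Buffer) error {
		buf.WriteString("encoded wire message")

		// Stand-in for p.conn.Write; io.Discard just counts the bytes.
		var writeErr error
		n, writeErr = io.Discard.Write(buf.Bytes())
		return writeErr
	})

	// Safe to read n here: submit returned only after the task completed.
	fmt.Println("wrote", n, "bytes, err =", err)
}
```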
server.go | 20
@@ -171,7 +171,7 @@ type server struct {

 	sigPool *lnwallet.SigPool

-	writeBufferPool *pool.WriteBuffer
+	writePool *pool.Write

 	// globalFeatures feature vector which affects HTLCs and thus are also
 	// advertised to other nodes.
@@ -267,12 +267,15 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
 		pool.DefaultWriteBufferGCInterval,
 		pool.DefaultWriteBufferExpiryInterval,
 	)

+	writePool := pool.NewWrite(
+		writeBufferPool, runtime.NumCPU(), pool.DefaultWorkerTimeout,
+	)
+
 	s := &server{
 		chanDB:  chanDB,
 		cc:      cc,
 		sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
-		writeBufferPool: writeBufferPool,
+		writePool: writePool,

 		invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),
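The worker count handed to pool.NewWrite is what bounds memory here: at most one write buffer can be checked out per in-flight task, so buffer residency scales with runtime.NumCPU() rather than with the number of connected peers (under the old scheme, newPeer took a buffer from writeBufferPool and only returned it on disconnect). A back-of-the-envelope comparison — the buffer size and peer count below are illustrative assumptions, not measurements:

```go
package main

import "fmt"

func main() {
	// Illustrative assumptions: a write buffer sized for roughly the
	// largest wire message (~64 KiB), a node with 10k peers, and 8
	// pool workers.
	const bufSize = 65535
	peers, workers := 10000, 8

	// Old scheme: one buffer held per connected peer for its lifetime.
	fmt.Printf("per-peer buffers: ~%d MiB\n", peers*bufSize/(1<<20))

	// New scheme: at most one buffer per pool worker, however many peers.
	fmt.Printf("pooled buffers:   ~%d KiB\n", workers*bufSize/(1<<10))
}
```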
@@ -1010,6 +1013,9 @@ func (s *server) Start() error {
 	if err := s.sigPool.Start(); err != nil {
 		return err
 	}
+	if err := s.writePool.Start(); err != nil {
+		return err
+	}
 	if err := s.cc.chainNotifier.Start(); err != nil {
 		return err
 	}
@@ -1102,7 +1108,6 @@ func (s *server) Stop() error {

 	// Shutdown the wallet, funding manager, and the rpc server.
 	s.chanStatusMgr.Stop()
-	s.sigPool.Stop()
 	s.cc.chainNotifier.Stop()
 	s.chanRouter.Stop()
 	s.htlcSwitch.Stop()
@@ -1129,6 +1134,9 @@ func (s *server) Stop() error {
 	// Wait for all lingering goroutines to quit.
 	s.wg.Wait()

+	s.sigPool.Stop()
+	s.writePool.Stop()
+
 	return nil
 }
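The reordering in Stop follows the usual shutdown discipline: a shared pool may only be stopped once every goroutine that could still submit work to it has exited, which is why both sigPool.Stop and writePool.Stop now run after s.wg.Wait(). A minimal sketch of why the order matters, using hypothetical types that mirror the structure rather than lnd's actual pools:

```go
package main

import (
	"fmt"
	"sync"
)

// pool is a toy task pool with a single worker goroutine.
type pool struct {
	tasks chan func()
	done  chan struct{}
}

func newPool() *pool {
	p := &pool{tasks: make(chan func()), done: make(chan struct{})}
	go func() {
		defer close(p.done)
		for task := range p.tasks {
			task()
		}
	}()
	return p
}

// Submit hands a task to the worker. Submitting after Stop panics,
// because the task channel has been closed.
func (p *pool) Submit(task func()) { p.tasks <- task }

// Stop closes the task channel and waits for the worker to drain it.
func (p *pool) Stop() { close(p.tasks); <-p.done }

func main() {
	p := newPool()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Stands in for a peer goroutine that submits write or sign tasks.
		defer wg.Done()
		p.Submit(func() { fmt.Println("task ran") })
	}()

	// Correct order: first wait for every submitter to exit, then stop
	// the pool. Stopping first could close p.tasks while the goroutine
	// above is still sending, which would panic.
	wg.Wait()
	p.Stop()
}
```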