peer+server: use server-wide writeBufferPool for peer write buffers
This commit is contained in:
parent
40c5e5e19f
commit
24dcd3c44e
9
peer.go
9
peer.go
|
@ -198,7 +198,7 @@ type peer struct {
|
||||||
// messages to write out directly on the socket. By re-using this
|
// messages to write out directly on the socket. By re-using this
|
||||||
// buffer, we avoid needing to allocate more memory each time a new
|
// buffer, we avoid needing to allocate more memory each time a new
|
||||||
// message is to be sent to a peer.
|
// message is to be sent to a peer.
|
||||||
writeBuf [lnwire.MaxMessagePayload]byte
|
writeBuf *lnpeer.WriteBuffer
|
||||||
|
|
||||||
queueQuit chan struct{}
|
queueQuit chan struct{}
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
@ -239,6 +239,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
|
||||||
chanCloseMsgs: make(chan *closeMsg),
|
chanCloseMsgs: make(chan *closeMsg),
|
||||||
failedChannels: make(map[lnwire.ChannelID]struct{}),
|
failedChannels: make(map[lnwire.ChannelID]struct{}),
|
||||||
|
|
||||||
|
writeBuf: server.writeBufferPool.Take(),
|
||||||
|
|
||||||
queueQuit: make(chan struct{}),
|
queueQuit: make(chan struct{}),
|
||||||
quit: make(chan struct{}),
|
quit: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
@ -613,6 +615,11 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) {
|
||||||
}
|
}
|
||||||
|
|
||||||
p.wg.Wait()
|
p.wg.Wait()
|
||||||
|
|
||||||
|
// Now that we are certain all active goroutines which could have been
|
||||||
|
// modifying the write buffer have exited, return the buffer to the pool
|
||||||
|
// to be reused.
|
||||||
|
p.server.writeBufferPool.Return(p.writeBuf)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Disconnect terminates the connection with the remote peer. Additionally, a
|
// Disconnect terminates the connection with the remote peer. Additionally, a
|
||||||
|
|
12
server.go
12
server.go
|
@ -165,6 +165,8 @@ type server struct {
|
||||||
|
|
||||||
sigPool *lnwallet.SigPool
|
sigPool *lnwallet.SigPool
|
||||||
|
|
||||||
|
writeBufferPool *lnpeer.WriteBufferPool
|
||||||
|
|
||||||
// globalFeatures feature vector which affects HTLCs and thus are also
|
// globalFeatures feature vector which affects HTLCs and thus are also
|
||||||
// advertised to other nodes.
|
// advertised to other nodes.
|
||||||
globalFeatures *lnwire.FeatureVector
|
globalFeatures *lnwire.FeatureVector
|
||||||
|
@ -260,11 +262,15 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
|
||||||
sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db")
|
sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db")
|
||||||
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
|
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
|
||||||
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
|
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
|
||||||
|
writeBufferPool := lnpeer.NewWriteBufferPool(
|
||||||
|
lnpeer.DefaultGCInterval, lnpeer.DefaultExpiryInterval,
|
||||||
|
)
|
||||||
|
|
||||||
s := &server{
|
s := &server{
|
||||||
chanDB: chanDB,
|
chanDB: chanDB,
|
||||||
cc: cc,
|
cc: cc,
|
||||||
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
|
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
|
||||||
|
writeBufferPool: writeBufferPool,
|
||||||
|
|
||||||
invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),
|
invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user