Merge pull request #2385 from cfromknecht/peer-write-buffer

peer: write buffer pool
Olaoluwa Osuntokun 2019-02-01 17:04:30 -08:00 committed by GitHub
commit bceb048a76
6 changed files with 540 additions and 4 deletions

@@ -0,0 +1,79 @@
package lnpeer
import (
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
)
const (
// DefaultGCInterval is the default interval that the WriteBufferPool
// will perform a sweep to see which expired buffers can be released to
// the runtime.
DefaultGCInterval = 15 * time.Second
// DefaultExpiryInterval is the default, minimum interval that must
// elapse before a WriteBuffer will be released. The maximum time before
// the buffer can be released is equal to the expiry interval plus the
// gc interval.
DefaultExpiryInterval = 30 * time.Second
)
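With these defaults, a buffer that is returned and never retaken is kept alive for at least 30 seconds and, in the worst case, for roughly 45 seconds (the expiry interval plus one full gc interval) before being released back to the runtime.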
// WriteBuffer is a static byte array sized to the maximum-allowed
// plaintext-message size.
type WriteBuffer [lnwire.MaxMessagePayload]byte
// Recycle zeroes the WriteBuffer, making it fresh for another use.
// The buffer is zeroed using a logarithmic number of calls to the optimized
// copy method. Benchmarking shows this to be ~30 times faster than a for loop
// that sets each index to 0 for this buffer size. Inspired by:
// https://stackoverflow.com/questions/30614165/is-there-analog-of-memset-in-go
//
// This is part of the queue.Recycler interface.
func (b *WriteBuffer) Recycle() {
b[0] = 0
for i := 1; i < lnwire.MaxMessagePayload; i *= 2 {
copy(b[i:], b[:i])
}
}
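The copy-doubling trick above generalizes to any byte slice. As a minimal sketch (the zeroBytes helper is illustrative and not part of this change), each copy call doubles the zeroed prefix, so only a logarithmic number of calls is needed:

// zeroBytes clears an arbitrary byte slice using the same copy-doubling
// pattern as WriteBuffer.Recycle: after the first byte is cleared, each copy
// doubles the length of the already-zeroed prefix.
func zeroBytes(b []byte) {
	if len(b) == 0 {
		return
	}
	b[0] = 0
	for i := 1; i < len(b); i *= 2 {
		copy(b[i:], b[:i])
	}
}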
// newRecyclableWriteBuffer is a constructor that returns a WriteBuffer typed as
// a queue.Recycler.
func newRecyclableWriteBuffer() queue.Recycler {
return new(WriteBuffer)
}
// A compile-time constraint to ensure that *WriteBuffer implements the
// queue.Recycler interface.
var _ queue.Recycler = (*WriteBuffer)(nil)
// WriteBufferPool acts as a global pool of WriteBuffers that dynamically
// allocates and reclaims buffers in response to load.
type WriteBufferPool struct {
pool *queue.GCQueue
}
// NewWriteBufferPool returns a freshly instantiated WriteBufferPool, using the
// given gcInterval and expiryInterval.
func NewWriteBufferPool(
gcInterval, expiryInterval time.Duration) *WriteBufferPool {
return &WriteBufferPool{
pool: queue.NewGCQueue(
newRecyclableWriteBuffer, 100,
gcInterval, expiryInterval,
),
}
}
// Take returns a fresh WriteBuffer to the caller.
func (p *WriteBufferPool) Take() *WriteBuffer {
return p.pool.Take().(*WriteBuffer)
}
// Return returns the WriteBuffer to the pool, so that it can be recycled or
// released.
func (p *WriteBufferPool) Return(buf *WriteBuffer) {
p.pool.Return(buf)
}
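For reference, a minimal usage sketch of the pool's API as defined in this file (the surrounding function is hypothetical, and the time/lnpeer imports are assumed):

func examplePoolUsage() {
	pool := lnpeer.NewWriteBufferPool(
		lnpeer.DefaultGCInterval, lnpeer.DefaultExpiryInterval,
	)

	// Take hands back a zeroed buffer, either recycled or freshly allocated.
	buf := pool.Take()

	// ... serialize an outgoing message into buf[:] ...

	// Return recycles the buffer and makes it eligible for reuse; if it
	// sits unused past the expiry interval, the next gc sweep releases it
	// to the runtime.
	pool.Return(buf)
}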

@@ -0,0 +1,67 @@
package lnpeer_test
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/lnpeer"
)
// TestWriteBufferPool verifies that the buffer pool properly resets used write
// buffers.
func TestWriteBufferPool(t *testing.T) {
const (
gcInterval = time.Second
expiryInterval = 250 * time.Millisecond
)
bp := lnpeer.NewWriteBufferPool(gcInterval, expiryInterval)
// Take a fresh write buffer from the pool.
writeBuf := bp.Take()
// Dirty the write buffer.
for i := range writeBuf[:] {
writeBuf[i] = 0xff
}
// Return the buffer to the pool.
bp.Return(writeBuf)
// Take buffers from the pool until we find the original. We expect at
// most two, in the event that a fresh buffer is populated after the
// first is taken.
for i := 0; i < 2; i++ {
// Wait a small duration to ensure the tests behave reliably, and
// don't activate the non-blocking case unintentionally.
<-time.After(time.Millisecond)
// Take a buffer, skipping those whose pointer does not match
// the one we dirtied.
writeBuf2 := bp.Take()
if writeBuf2 != writeBuf {
continue
}
// Finally, verify that the buffer has been properly cleaned.
for i := range writeBuf2[:] {
if writeBuf2[i] != 0 {
t.Fatalf("buffer was not recycled")
}
}
return
}
t.Fatalf("original buffer not found")
}
// BenchmarkWriteBufferRecycle tests how quickly a WriteBuffer can be zeroed.
func BenchmarkWriteBufferRecycle(b *testing.B) {
b.ReportAllocs()
buffer := new(lnpeer.WriteBuffer)
for i := 0; i < b.N; i++ {
buffer.Recycle()
}
}
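The ~30x speedup claimed in the Recycle comment can be checked against a naive loop with a counterpart benchmark along these lines (hypothetical, not part of this commit):

// BenchmarkWriteBufferZeroLoop zeroes the buffer with a plain index loop, for
// comparison against BenchmarkWriteBufferRecycle above.
func BenchmarkWriteBufferZeroLoop(b *testing.B) {
	b.ReportAllocs()
	buffer := new(lnpeer.WriteBuffer)
	for i := 0; i < b.N; i++ {
		for j := range buffer {
			buffer[j] = 0
		}
	}
}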

@@ -198,7 +198,7 @@ type peer struct {
// messages to write out directly on the socket. By re-using this
// buffer, we avoid needing to allocate more memory each time a new
// message is to be sent to a peer.
writeBuf [lnwire.MaxMessagePayload]byte
writeBuf *lnpeer.WriteBuffer
queueQuit chan struct{}
quit chan struct{}
@@ -239,6 +239,8 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
chanCloseMsgs: make(chan *closeMsg),
failedChannels: make(map[lnwire.ChannelID]struct{}),
writeBuf: server.writeBufferPool.Take(),
queueQuit: make(chan struct{}),
quit: make(chan struct{}),
}
@@ -613,6 +615,11 @@ func (p *peer) WaitForDisconnect(ready chan struct{}) {
}
p.wg.Wait()
// Now that we are certain all active goroutines which could have been
// modifying the write buffer have exited, return the buffer to the pool
// to be reused.
p.server.writeBufferPool.Return(p.writeBuf)
}
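The ordering above is the key invariant: the buffer may only re-enter the pool once every goroutine that writes through it has exited. A stripped-down sketch of that lifecycle (the type and field names are illustrative, not the actual peer fields; sync and lnpeer imports assumed):

type writerSketch struct {
	pool *lnpeer.WriteBufferPool
	buf  *lnpeer.WriteBuffer
	wg   sync.WaitGroup
}

func (w *writerSketch) start() {
	w.buf = w.pool.Take()
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		// ... write outgoing messages using w.buf ...
	}()
}

func (w *writerSketch) stop() {
	// Wait for all writers to exit before handing the buffer back;
	// otherwise a still-running goroutine could scribble over a buffer
	// that has already been recycled and handed to another peer.
	w.wg.Wait()
	w.pool.Return(w.buf)
}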
// Disconnect terminates the connection with the remote peer. Additionally, a

queue/gc_queue.go (new file, +210)

@@ -0,0 +1,210 @@
package queue
import (
"container/list"
"sync"
"time"
"github.com/lightningnetwork/lnd/ticker"
)
// Recycler is an interface that allows an object to be reclaimed without
// needing to be returned to the runtime.
type Recycler interface {
// Recycle resets the object to its default state.
Recycle()
}
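Any type that can cheaply reset itself can satisfy this interface. For example, a hypothetical reusable scratch slice (not part of this change) might recycle itself by truncating rather than zeroing:

// scratchSlice is a reusable byte slice; Recycle keeps the backing array but
// drops the contents so the next user starts from an empty slice.
type scratchSlice struct {
	buf []byte
}

func (s *scratchSlice) Recycle() {
	s.buf = s.buf[:0]
}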
// gcQueueEntry is a tuple containing a Recycler and the time at which the item
// was added to the queue. The recorded time is used to determine when the entry
// becomes stale, and can be released if it has not already been taken.
type gcQueueEntry struct {
item Recycler
time time.Time
}
// GCQueue is a garbage collecting queue, which dynamically grows and contracts
// based on load. If the queue has items which have been returned, the queue
// will check every gcInterval amount of time to see if any elements are
// eligible to be released back to the runtime. Elements that have been in the
// queue for a duration of at least expiryInterval will be released upon the
// next iteration of the garbage collection, thus the maximum amount of time an
// element remains in the queue is expiryInterval+gcInterval. The gc ticker will
// be disabled after all items in the queue have been taken or released to
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
// the steady state.
type GCQueue struct {
// takeBuffer coordinates the delivery of items taken from the queue
// such that they are delivered to requesters.
takeBuffer chan Recycler
// returnBuffer coordinates the return of items back into the queue,
// where they will be kept until retaken or released.
returnBuffer chan Recycler
// newItem is a constructor, used to generate new elements if none are
// otherwise available for reuse.
newItem func() Recycler
// expiryInterval is the minimum amount of time an element will remain
// in the queue before being released.
expiryInterval time.Duration
// recycleTicker is a resumable ticker used to trigger a sweep to
// release elements that have been in the queue longer than
// expiryInterval.
recycleTicker ticker.Ticker
// freeList maintains a list of gcQueueEntries, sorted in order of
// increasing time of arrival.
freeList *list.List
wg sync.WaitGroup
quit chan struct{}
}
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
// and contracts based on load. If the queue has items which have been returned,
// the queue will check every gcInterval amount of time to see if any elements
// are eligible to be released back to the runtime. Elements that have been in
// the queue for a duration of at least expiryInterval will be released upon the
// next iteration of the garbage collection, thus the maximum amount of time an
// element remains in the queue is expiryInterval+gcInterval. The gc ticker will
// be disabled after all items in the queue have been taken or released to
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
// the steady state. The returnQueueSize parameter sets the maximum number of
// items that can be pending return without being dropped during large bursts
// of attempts to return items to the GCQueue.
func NewGCQueue(newItem func() Recycler, returnQueueSize int,
gcInterval, expiryInterval time.Duration) *GCQueue {
q := &GCQueue{
takeBuffer: make(chan Recycler),
returnBuffer: make(chan Recycler, returnQueueSize),
expiryInterval: expiryInterval,
freeList: list.New(),
recycleTicker: ticker.New(gcInterval),
newItem: newItem,
quit: make(chan struct{}),
}
go q.queueManager()
return q
}
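Putting the pieces together, a self-contained usage sketch with a trivial no-op Recycler (the nopItem type, the example function, and the intervals are illustrative only; this assumes it lives in the queue package with the time import):

// nopItem satisfies Recycler without doing any work on reset.
type nopItem struct{}

func (*nopItem) Recycle() {}

func exampleGCQueue() {
	newItem := func() Recycler { return &nopItem{} }

	// Up to 100 items may be pending return at once; excess returns are
	// dropped rather than blocking the caller.
	q := NewGCQueue(newItem, 100, 15*time.Second, 30*time.Second)

	// Take hands back a recycled item if one is available quickly,
	// otherwise it falls back to constructing a fresh one.
	item := q.Take().(*nopItem)

	// Return recycles the item and places it on the free list, where it
	// will be swept once it has sat unused past the expiry interval.
	q.Return(item)
}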
// Take returns either a recycled element from the queue, or creates a new item
// if none are available.
func (q *GCQueue) Take() Recycler {
select {
case item := <-q.takeBuffer:
return item
case <-time.After(time.Millisecond):
return q.newItem()
}
}
// Return adds the returned item to the freelist if the queue's returnBuffer
// has available capacity. Under load, items may be dropped to ensure this
// method does not block.
func (q *GCQueue) Return(item Recycler) {
// Recycle the item to ensure that a dirty instance is never offered
// from Take. The call is done here so that the CPU cycles spent
// clearing the buffer are owned by the caller, and not by the queue
// itself. This makes the queue more likely to be available to deliver
// items in the free list.
item.Recycle()
select {
case q.returnBuffer <- item:
default:
}
}
// queueManager maintains the free list of elements by popping the head of the
// queue when items are needed, and appending them to the end of the queue when
// items are returned. The queueManager will periodically attempt to release any
// items that have been in the queue longer than the expiry interval.
//
// NOTE: This method SHOULD be run as a goroutine.
func (q *GCQueue) queueManager() {
for {
// If the freelist is empty, push a fresh item onto it so that a
// client taking an item can be served immediately. If this
// happens, this is either:
// 1) the first iteration of the loop,
// 2) after all entries were garbage collected, or
// 3) the freelist was emptied after the last entry was taken.
//
// In all of these cases, it is safe to pause the recycle ticker
// since it will be resumed as soon as an entry is returned to the
// freelist.
if q.freeList.Len() == 0 {
q.freeList.PushBack(gcQueueEntry{
item: q.newItem(),
time: time.Now(),
})
q.recycleTicker.Pause()
}
next := q.freeList.Front()
select {
// If a client requests a new write buffer, deliver the buffer
// at the head of the freelist to them.
case q.takeBuffer <- next.Value.(gcQueueEntry).item:
q.freeList.Remove(next)
// If a client is returning a write buffer, add it to the free
// list and resume the recycle ticker so that it can be cleared
// if the entries are not quickly reused.
case item := <-q.returnBuffer:
// Add the returned buffer to the freelist, recording
// the current time so we can determine when the entry
// expires.
q.freeList.PushBack(gcQueueEntry{
item: item,
time: time.Now(),
})
// Adding the buffer implies that we now have a non-zero
// number of elements in the free list. Resume the
// recycle ticker to cleanup any entries that go unused.
q.recycleTicker.Resume()
// If the recycle ticker fires, we will aggressively release any
// write buffers in the freelist for which the expiryInterval
// has elapsed since their insertion. If after doing so, no
// elements remain, we will pause the recycle ticker.
case <-q.recycleTicker.Ticks():
// Since the insert time of all entries will be
// monotonically increasing, iterate over elements and
// remove all entries that have expired.
var next *list.Element
for e := q.freeList.Front(); e != nil; e = next {
// Cache the next element, since it will become
// unreachable from the current element if it is
// removed.
next = e.Next()
entry := e.Value.(gcQueueEntry)
// Use now - insertTime > expiryInterval to
// determine if this entry has expired.
if time.Since(entry.time) > q.expiryInterval {
// Remove the expired entry from the
// linked-list.
q.freeList.Remove(e)
entry.item = nil
e.Value = nil
} else {
// If this entry hasn't expired, then
// all entries that follow will still be
// valid.
break
}
}
}
}
}

queue/gc_queue_test.go (new file, +167)

@@ -0,0 +1,167 @@
package queue_test
import (
"testing"
"time"
"github.com/lightningnetwork/lnd/queue"
)
// mockRecycler implements the queue.Recycler interface using a NOP.
type mockRecycler bool
func (*mockRecycler) Recycle() {}
// TestGCQueueGCCycle asserts that items that are kept in the GCQueue past their
// expiration will be released by a subsequent gc cycle.
func TestGCQueueGCCycle(t *testing.T) {
t.Parallel()
const (
gcInterval = time.Second
expiryInterval = 250 * time.Millisecond
numItems = 6
)
newItem := func() queue.Recycler { return new(mockRecycler) }
bp := queue.NewGCQueue(newItem, 100, gcInterval, expiryInterval)
// Take numItems items from the queue, and immediately return them.
// Returning the items will trigger the gc ticker to start.
itemSet1 := takeN(t, bp, numItems)
returnAll(bp, itemSet1)
// Allow enough time for all expired items to be released by the queue.
<-time.After(gcInterval + expiryInterval)
// Take another set of numItems items from the queue.
itemSet2 := takeN(t, bp, numItems)
// Since a full gc cycle should have run by now, we expect the
// intersection of sets 1 and 2 to be empty.
for item := range itemSet2 {
if _, ok := itemSet1[item]; ok {
t.Fatalf("items taken should not have been reused")
}
}
}
// TestGCQueuePartialGCCycle asserts that the GCQueue will only garbage collect
// the items in its queue that have fully expired. We test this by adding items
// into the queue such that the garbage collection will occur before the items
// expire. Taking items after the gc cycle should return the items that were not
// released by the gc cycle.
func TestGCQueuePartialGCCycle(t *testing.T) {
t.Parallel()
const (
gcInterval = time.Second
expiryInterval = 250 * time.Millisecond
numItems = 6
)
newItem := func() queue.Recycler { return new(mockRecycler) }
bp := queue.NewGCQueue(newItem, 100, gcInterval, expiryInterval)
// Take numItems items from the gc queue.
itemSet1 := takeN(t, bp, numItems)
// Immediately return half of the items, and construct a set of items
// consisting of the half that were not returned.
halfItemSet1 := returnN(t, bp, itemSet1, numItems/2)
// Wait long enough to ensure that items returned after this point
// will not be released in the next gc cycle.
<-time.After(gcInterval - expiryInterval/2)
// Return the remaining items from itemSet1.
returnAll(bp, halfItemSet1)
// Wait until the gc cycle has done a sweep of the items and released
// all those that have expired.
<-time.After(expiryInterval / 2)
// Retrieve numItems items from the gc queue.
itemSet2 := takeN(t, bp, numItems)
// Tally the number of items returned from Take that are in the second
// half of items returned.
var numReused int
for item := range itemSet2 {
if _, ok := halfItemSet1[item]; ok {
numReused++
}
}
// We expect the number of reused items to be equal to half numItems.
if numReused != numItems/2 {
t.Fatalf("expected %d items to be reused, got %d",
numItems/2, numReused)
}
}
// takeN draws n items from the provided GCQueue. This method also asserts that
// n unique items are drawn, and then returns the resulting set.
func takeN(t *testing.T, q *queue.GCQueue, n int) map[queue.Recycler]struct{} {
t.Helper()
items := make(map[queue.Recycler]struct{})
for i := 0; i < n; i++ {
// Wait a small duration to ensure the tests behave reliably,
// and don't activate the non-blocking case unintentionally.
<-time.After(time.Millisecond)
items[q.Take()] = struct{}{}
}
if len(items) != n {
t.Fatalf("items taken from gc queue should be distinct, "+
"want %d unique items, got %d", n, len(items))
}
return items
}
// returnAll returns the items of the given set back to the GCQueue.
func returnAll(q *queue.GCQueue, items map[queue.Recycler]struct{}) {
for item := range items {
q.Return(item)
// Wait a small duration to ensure the tests behave reliably,
// and don't activate the non-blocking case unintentionally.
<-time.After(time.Millisecond)
}
}
// returnN returns n items at random from the set of items back to the GCQueue.
// This method fails if the set's cardinality is smaller than n.
func returnN(t *testing.T, q *queue.GCQueue,
items map[queue.Recycler]struct{}, n int) map[queue.Recycler]struct{} {
t.Helper()
var remainingItems = make(map[queue.Recycler]struct{})
var numReturned int
for item := range items {
if numReturned < n {
q.Return(item)
numReturned++
// Wait a small duration to ensure the tests behave
// reliably, and don't activate the non-blocking case
// unintentionally.
<-time.After(time.Millisecond)
} else {
remainingItems[item] = struct{}{}
}
}
if numReturned < n {
t.Fatalf("insufficient number of items to return, need %d, "+
"got %d", n, numReturned)
}
return remainingItems
}

@@ -165,6 +165,8 @@ type server struct {
sigPool *lnwallet.SigPool
writeBufferPool *lnpeer.WriteBufferPool
// globalFeatures is the feature vector which affects HTLCs and is thus also
// advertised to other nodes.
globalFeatures *lnwire.FeatureVector
@@ -260,11 +262,15 @@ func newServer(listenAddrs []net.Addr, chanDB *channeldb.DB, cc *chainControl,
sharedSecretPath := filepath.Join(graphDir, "sphinxreplay.db")
replayLog := htlcswitch.NewDecayedLog(sharedSecretPath, cc.chainNotifier)
sphinxRouter := sphinx.NewRouter(privKey, activeNetParams.Params, replayLog)
writeBufferPool := lnpeer.NewWriteBufferPool(
lnpeer.DefaultGCInterval, lnpeer.DefaultExpiryInterval,
)
s := &server{
chanDB: chanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
chanDB: chanDB,
cc: cc,
sigPool: lnwallet.NewSigPool(runtime.NumCPU()*2, cc.signer),
writeBufferPool: writeBufferPool,
invoices: invoices.NewRegistry(chanDB, activeNetParams.Params),