queue/gc_queue: adds GCQueue to back recyclable pools
This commit is contained in:
parent
9d20ca4a51
commit
717672ad83
210
queue/gc_queue.go
Normal file
210
queue/gc_queue.go
Normal file
@ -0,0 +1,210 @@
|
||||
package queue
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/lightningnetwork/lnd/ticker"
|
||||
)
|
||||
|
||||
// Recycler is an interface that allows an object to be reclaimed without
|
||||
// needing to be returned to the runtime.
|
||||
type Recycler interface {
|
||||
// Recycle resets the object to its default state.
|
||||
Recycle()
|
||||
}
|
||||
|
||||
// gcQueueEntry is a tuple containing a Recycler and the time at which the item
|
||||
// was added to the queue. The recorded time is used to determine when the entry
|
||||
// becomes stale, and can be released if it has not already been taken.
|
||||
type gcQueueEntry struct {
|
||||
item Recycler
|
||||
time time.Time
|
||||
}
|
||||
|
||||
// GCQueue is garbage collecting queue, which dynamically grows and contracts
|
||||
// based on load. If the queue has items which have been returned, the queue
|
||||
// will check every gcInterval amount of time to see if any elements are
|
||||
// eligible to be released back to the runtime. Elements that have been in the
|
||||
// queue for a duration of least expiryInterval will be released upon the next
|
||||
// iteration of the garbage collection, thus the maximum amount of time an
|
||||
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
|
||||
// be disabled after all items in the queue have been taken or released to
|
||||
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
|
||||
// the steady state.
|
||||
type GCQueue struct {
|
||||
// takeBuffer coordinates the delivery of items taken from the queue
|
||||
// such that they are delivered to requesters.
|
||||
takeBuffer chan Recycler
|
||||
|
||||
// returnBuffer coordinates the return of items back into the queue,
|
||||
// where they will be kept until retaken or released.
|
||||
returnBuffer chan Recycler
|
||||
|
||||
// newItem is a constructor, used to generate new elements if none are
|
||||
// otherwise available for reuse.
|
||||
newItem func() Recycler
|
||||
|
||||
// expiryInterval is the minimum amount of time an element will remain
|
||||
// in the queue before being released.
|
||||
expiryInterval time.Duration
|
||||
|
||||
// recycleTicker is a resumable ticker used to trigger a sweep to
|
||||
// release elements that have been in the queue longer than
|
||||
// expiryInterval.
|
||||
recycleTicker ticker.Ticker
|
||||
|
||||
// freeList maintains a list of gcQueueEntries, sorted in order of
|
||||
// increasing time of arrival.
|
||||
freeList *list.List
|
||||
|
||||
wg sync.WaitGroup
|
||||
quit chan struct{}
|
||||
}
|
||||
|
||||
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
|
||||
// and contracts based on load. If the queue has items which have been returned,
|
||||
// the queue will check every gcInterval amount of time to see if any elements
|
||||
// are eligible to be released back to the runtime. Elements that have been in
|
||||
// the queue for a duration of least expiryInterval will be released upon the
|
||||
// next iteration of the garbage collection, thus the maximum amount of time an
|
||||
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
|
||||
// be disabled after all items in the queue have been taken or released to
|
||||
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
|
||||
// the steady state. The returnQueueSize parameter is used to size the maximal
|
||||
// number of items that can be returned without being dropped during large
|
||||
// bursts in attempts to return items to the GCQUeue.
|
||||
func NewGCQueue(newItem func() Recycler, returnQueueSize int,
|
||||
gcInterval, expiryInterval time.Duration) *GCQueue {
|
||||
|
||||
q := &GCQueue{
|
||||
takeBuffer: make(chan Recycler),
|
||||
returnBuffer: make(chan Recycler, returnQueueSize),
|
||||
expiryInterval: expiryInterval,
|
||||
freeList: list.New(),
|
||||
recycleTicker: ticker.New(gcInterval),
|
||||
newItem: newItem,
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
go q.queueManager()
|
||||
|
||||
return q
|
||||
}
|
||||
|
||||
// Take returns either a recycled element from the queue, or creates a new item
|
||||
// if none are available.
|
||||
func (q *GCQueue) Take() Recycler {
|
||||
select {
|
||||
case item := <-q.takeBuffer:
|
||||
return item
|
||||
case <-time.After(time.Millisecond):
|
||||
return q.newItem()
|
||||
}
|
||||
}
|
||||
|
||||
// Return adds the returned item to freelist if the queue's returnBuffer has
|
||||
// available capacity. Under load, items may be dropped to ensure this method
|
||||
// does not block.
|
||||
func (q *GCQueue) Return(item Recycler) {
|
||||
// Recycle the item to ensure that a dirty instance is never offered
|
||||
// from Take. The call is done here so that the CPU cycles spent
|
||||
// clearing the buffer are owned by the caller, and not by the queue
|
||||
// itself. This makes the queue more likely to be available to deliver
|
||||
// items in the free list.
|
||||
item.Recycle()
|
||||
|
||||
select {
|
||||
case q.returnBuffer <- item:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// queueManager maintains the free list of elements by popping the head of the
|
||||
// queue when items are needed, and appending them to the end of the queue when
|
||||
// items are returned. The queueManager will periodically attempt to release any
|
||||
// items that have been in the queue longer than the expiry interval.
|
||||
//
|
||||
// NOTE: This method SHOULD be run as a goroutine.
|
||||
func (q *GCQueue) queueManager() {
|
||||
for {
|
||||
// If the pool is empty, initialize a buffer pool to serve a
|
||||
// client that takes a buffer immediately. If this happens, this
|
||||
// is either:
|
||||
// 1) the first iteration of the loop,
|
||||
// 2) after all entries were garbage collected, or
|
||||
// 3) the freelist was emptied after the last entry was taken.
|
||||
//
|
||||
// In all of these cases, it is safe to pause the recycle ticker
|
||||
// since it will be resumed as soon an entry is returned to the
|
||||
// freelist.
|
||||
if q.freeList.Len() == 0 {
|
||||
q.freeList.PushBack(gcQueueEntry{
|
||||
item: q.newItem(),
|
||||
time: time.Now(),
|
||||
})
|
||||
|
||||
q.recycleTicker.Pause()
|
||||
}
|
||||
|
||||
next := q.freeList.Front()
|
||||
|
||||
select {
|
||||
|
||||
// If a client requests a new write buffer, deliver the buffer
|
||||
// at the head of the freelist to them.
|
||||
case q.takeBuffer <- next.Value.(gcQueueEntry).item:
|
||||
q.freeList.Remove(next)
|
||||
|
||||
// If a client is returning a write buffer, add it to the free
|
||||
// list and resume the recycle ticker so that it can be cleared
|
||||
// if the entries are not quickly reused.
|
||||
case item := <-q.returnBuffer:
|
||||
// Add the returned buffer to the freelist, recording
|
||||
// the current time so we can determine when the entry
|
||||
// expires.
|
||||
q.freeList.PushBack(gcQueueEntry{
|
||||
item: item,
|
||||
time: time.Now(),
|
||||
})
|
||||
|
||||
// Adding the buffer implies that we now have a non-zero
|
||||
// number of elements in the free list. Resume the
|
||||
// recycle ticker to cleanup any entries that go unused.
|
||||
q.recycleTicker.Resume()
|
||||
|
||||
// If the recycle ticker fires, we will aggresively release any
|
||||
// write buffers in the freelist for which the expiryInterval
|
||||
// has elapsed since their insertion. If after doing so, no
|
||||
// elements remain, we will pause the recylce ticker.
|
||||
case <-q.recycleTicker.Ticks():
|
||||
// Since the insert time of all entries will be
|
||||
// monotonically increasing, iterate over elements and
|
||||
// remove all entries that have expired.
|
||||
var next *list.Element
|
||||
for e := q.freeList.Front(); e != nil; e = next {
|
||||
// Cache the next element, since it will become
|
||||
// unreachable from the current element if it is
|
||||
// removed.
|
||||
next = e.Next()
|
||||
entry := e.Value.(gcQueueEntry)
|
||||
|
||||
// Use now - insertTime > expiryInterval to
|
||||
// determine if this entry has expired.
|
||||
if time.Since(entry.time) > q.expiryInterval {
|
||||
// Remove the expired entry from the
|
||||
// linked-list.
|
||||
q.freeList.Remove(e)
|
||||
entry.item = nil
|
||||
e.Value = nil
|
||||
} else {
|
||||
// If this entry hasn't expired, then
|
||||
// all entries that follow will still be
|
||||
// valid.
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user