lnd.xprv/queue/gc_queue.go

197 lines
6.8 KiB
Go
Raw Normal View History

package queue
import (
"container/list"
"sync"
"time"
"github.com/lightningnetwork/lnd/ticker"
)
// GCQueue is garbage collecting queue, which dynamically grows and contracts
// based on load. If the queue has items which have been returned, the queue
// will check every gcInterval amount of time to see if any elements are
// eligible to be released back to the runtime. Elements that have been in the
// queue for a duration of least expiryInterval will be released upon the next
// iteration of the garbage collection, thus the maximum amount of time an
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
// be disabled after all items in the queue have been taken or released to
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
// the steady state.
type GCQueue struct {
// takeBuffer coordinates the delivery of items taken from the queue
// such that they are delivered to requesters.
takeBuffer chan interface{}
// returnBuffer coordinates the return of items back into the queue,
// where they will be kept until retaken or released.
returnBuffer chan interface{}
// newItem is a constructor, used to generate new elements if none are
// otherwise available for reuse.
newItem func() interface{}
// expiryInterval is the minimum amount of time an element will remain
// in the queue before being released.
expiryInterval time.Duration
// recycleTicker is a resumable ticker used to trigger a sweep to
// release elements that have been in the queue longer than
// expiryInterval.
recycleTicker ticker.Ticker
// freeList maintains a list of gcQueueEntries, sorted in order of
// increasing time of arrival.
freeList *list.List
wg sync.WaitGroup
quit chan struct{}
}
// NewGCQueue creates a new garbage collecting queue, which dynamically grows
// and contracts based on load. If the queue has items which have been returned,
// the queue will check every gcInterval amount of time to see if any elements
// are eligible to be released back to the runtime. Elements that have been in
// the queue for a duration of least expiryInterval will be released upon the
// next iteration of the garbage collection, thus the maximum amount of time an
// element remain in the queue is expiryInterval+gcInterval. The gc ticker will
// be disabled after all items in the queue have been taken or released to
// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in
// the steady state. The returnQueueSize parameter is used to size the maximal
// number of items that can be returned without being dropped during large
// bursts in attempts to return items to the GCQUeue.
func NewGCQueue(newItem func() interface{}, returnQueueSize int,
gcInterval, expiryInterval time.Duration) *GCQueue {
q := &GCQueue{
takeBuffer: make(chan interface{}),
returnBuffer: make(chan interface{}, returnQueueSize),
expiryInterval: expiryInterval,
freeList: list.New(),
recycleTicker: ticker.New(gcInterval),
newItem: newItem,
quit: make(chan struct{}),
}
go q.queueManager()
return q
}
// Take returns either a recycled element from the queue, or creates a new item
// if none are available.
func (q *GCQueue) Take() interface{} {
select {
case item := <-q.takeBuffer:
return item
case <-time.After(time.Millisecond):
return q.newItem()
}
}
// Return adds the returned item to freelist if the queue's returnBuffer has
// available capacity. Under load, items may be dropped to ensure this method
// does not block.
func (q *GCQueue) Return(item interface{}) {
select {
case q.returnBuffer <- item:
default:
}
}
// gcQueueEntry is a tuple containing an interface{} and the time at which the
// item was added to the queue. The recorded time is used to determine when the
// entry becomes stale, and can be released if it has not already been taken.
type gcQueueEntry struct {
item interface{}
time time.Time
}
// queueManager maintains the free list of elements by popping the head of the
// queue when items are needed, and appending them to the end of the queue when
// items are returned. The queueManager will periodically attempt to release any
// items that have been in the queue longer than the expiry interval.
//
// NOTE: This method SHOULD be run as a goroutine.
func (q *GCQueue) queueManager() {
for {
// If the pool is empty, initialize a buffer pool to serve a
// client that takes a buffer immediately. If this happens, this
// is either:
// 1) the first iteration of the loop,
// 2) after all entries were garbage collected, or
// 3) the freelist was emptied after the last entry was taken.
//
// In all of these cases, it is safe to pause the recycle ticker
// since it will be resumed as soon an entry is returned to the
// freelist.
if q.freeList.Len() == 0 {
q.freeList.PushBack(gcQueueEntry{
item: q.newItem(),
time: time.Now(),
})
q.recycleTicker.Pause()
}
next := q.freeList.Front()
select {
// If a client requests a new write buffer, deliver the buffer
// at the head of the freelist to them.
case q.takeBuffer <- next.Value.(gcQueueEntry).item:
q.freeList.Remove(next)
// If a client is returning a write buffer, add it to the free
// list and resume the recycle ticker so that it can be cleared
// if the entries are not quickly reused.
case item := <-q.returnBuffer:
// Add the returned buffer to the freelist, recording
// the current time so we can determine when the entry
// expires.
q.freeList.PushBack(gcQueueEntry{
item: item,
time: time.Now(),
})
// Adding the buffer implies that we now have a non-zero
// number of elements in the free list. Resume the
// recycle ticker to cleanup any entries that go unused.
q.recycleTicker.Resume()
// If the recycle ticker fires, we will aggresively release any
// write buffers in the freelist for which the expiryInterval
// has elapsed since their insertion. If after doing so, no
// elements remain, we will pause the recylce ticker.
case <-q.recycleTicker.Ticks():
// Since the insert time of all entries will be
// monotonically increasing, iterate over elements and
// remove all entries that have expired.
var next *list.Element
for e := q.freeList.Front(); e != nil; e = next {
// Cache the next element, since it will become
// unreachable from the current element if it is
// removed.
next = e.Next()
entry := e.Value.(gcQueueEntry)
// Use now - insertTime <= expiryInterval to
// determine if this entry has not expired.
if time.Since(entry.time) <= q.expiryInterval {
// If this entry hasn't expired, then
// all entries that follow will still be
// valid.
break
}
// Otherwise, remove the expired entry from the
// linked-list.
q.freeList.Remove(e)
entry.item = nil
e.Value = nil
}
}
}
}