diff --git a/queue/gc_queue.go b/queue/gc_queue.go new file mode 100644 index 00000000..41366c33 --- /dev/null +++ b/queue/gc_queue.go @@ -0,0 +1,210 @@ +package queue + +import ( + "container/list" + "sync" + "time" + + "github.com/lightningnetwork/lnd/ticker" +) + +// Recycler is an interface that allows an object to be reclaimed without +// needing to be returned to the runtime. +type Recycler interface { + // Recycle resets the object to its default state. + Recycle() +} + +// gcQueueEntry is a tuple containing a Recycler and the time at which the item +// was added to the queue. The recorded time is used to determine when the entry +// becomes stale, and can be released if it has not already been taken. +type gcQueueEntry struct { + item Recycler + time time.Time +} + +// GCQueue is garbage collecting queue, which dynamically grows and contracts +// based on load. If the queue has items which have been returned, the queue +// will check every gcInterval amount of time to see if any elements are +// eligible to be released back to the runtime. Elements that have been in the +// queue for a duration of least expiryInterval will be released upon the next +// iteration of the garbage collection, thus the maximum amount of time an +// element remain in the queue is expiryInterval+gcInterval. The gc ticker will +// be disabled after all items in the queue have been taken or released to +// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in +// the steady state. +type GCQueue struct { + // takeBuffer coordinates the delivery of items taken from the queue + // such that they are delivered to requesters. + takeBuffer chan Recycler + + // returnBuffer coordinates the return of items back into the queue, + // where they will be kept until retaken or released. + returnBuffer chan Recycler + + // newItem is a constructor, used to generate new elements if none are + // otherwise available for reuse. + newItem func() Recycler + + // expiryInterval is the minimum amount of time an element will remain + // in the queue before being released. + expiryInterval time.Duration + + // recycleTicker is a resumable ticker used to trigger a sweep to + // release elements that have been in the queue longer than + // expiryInterval. + recycleTicker ticker.Ticker + + // freeList maintains a list of gcQueueEntries, sorted in order of + // increasing time of arrival. + freeList *list.List + + wg sync.WaitGroup + quit chan struct{} +} + +// NewGCQueue creates a new garbage collecting queue, which dynamically grows +// and contracts based on load. If the queue has items which have been returned, +// the queue will check every gcInterval amount of time to see if any elements +// are eligible to be released back to the runtime. Elements that have been in +// the queue for a duration of least expiryInterval will be released upon the +// next iteration of the garbage collection, thus the maximum amount of time an +// element remain in the queue is expiryInterval+gcInterval. The gc ticker will +// be disabled after all items in the queue have been taken or released to +// ensure that the GCQueue becomes quiescent, and imposes minimal overhead in +// the steady state. The returnQueueSize parameter is used to size the maximal +// number of items that can be returned without being dropped during large +// bursts in attempts to return items to the GCQUeue. +func NewGCQueue(newItem func() Recycler, returnQueueSize int, + gcInterval, expiryInterval time.Duration) *GCQueue { + + q := &GCQueue{ + takeBuffer: make(chan Recycler), + returnBuffer: make(chan Recycler, returnQueueSize), + expiryInterval: expiryInterval, + freeList: list.New(), + recycleTicker: ticker.New(gcInterval), + newItem: newItem, + quit: make(chan struct{}), + } + + go q.queueManager() + + return q +} + +// Take returns either a recycled element from the queue, or creates a new item +// if none are available. +func (q *GCQueue) Take() Recycler { + select { + case item := <-q.takeBuffer: + return item + case <-time.After(time.Millisecond): + return q.newItem() + } +} + +// Return adds the returned item to freelist if the queue's returnBuffer has +// available capacity. Under load, items may be dropped to ensure this method +// does not block. +func (q *GCQueue) Return(item Recycler) { + // Recycle the item to ensure that a dirty instance is never offered + // from Take. The call is done here so that the CPU cycles spent + // clearing the buffer are owned by the caller, and not by the queue + // itself. This makes the queue more likely to be available to deliver + // items in the free list. + item.Recycle() + + select { + case q.returnBuffer <- item: + default: + } +} + +// queueManager maintains the free list of elements by popping the head of the +// queue when items are needed, and appending them to the end of the queue when +// items are returned. The queueManager will periodically attempt to release any +// items that have been in the queue longer than the expiry interval. +// +// NOTE: This method SHOULD be run as a goroutine. +func (q *GCQueue) queueManager() { + for { + // If the pool is empty, initialize a buffer pool to serve a + // client that takes a buffer immediately. If this happens, this + // is either: + // 1) the first iteration of the loop, + // 2) after all entries were garbage collected, or + // 3) the freelist was emptied after the last entry was taken. + // + // In all of these cases, it is safe to pause the recycle ticker + // since it will be resumed as soon an entry is returned to the + // freelist. + if q.freeList.Len() == 0 { + q.freeList.PushBack(gcQueueEntry{ + item: q.newItem(), + time: time.Now(), + }) + + q.recycleTicker.Pause() + } + + next := q.freeList.Front() + + select { + + // If a client requests a new write buffer, deliver the buffer + // at the head of the freelist to them. + case q.takeBuffer <- next.Value.(gcQueueEntry).item: + q.freeList.Remove(next) + + // If a client is returning a write buffer, add it to the free + // list and resume the recycle ticker so that it can be cleared + // if the entries are not quickly reused. + case item := <-q.returnBuffer: + // Add the returned buffer to the freelist, recording + // the current time so we can determine when the entry + // expires. + q.freeList.PushBack(gcQueueEntry{ + item: item, + time: time.Now(), + }) + + // Adding the buffer implies that we now have a non-zero + // number of elements in the free list. Resume the + // recycle ticker to cleanup any entries that go unused. + q.recycleTicker.Resume() + + // If the recycle ticker fires, we will aggresively release any + // write buffers in the freelist for which the expiryInterval + // has elapsed since their insertion. If after doing so, no + // elements remain, we will pause the recylce ticker. + case <-q.recycleTicker.Ticks(): + // Since the insert time of all entries will be + // monotonically increasing, iterate over elements and + // remove all entries that have expired. + var next *list.Element + for e := q.freeList.Front(); e != nil; e = next { + // Cache the next element, since it will become + // unreachable from the current element if it is + // removed. + next = e.Next() + entry := e.Value.(gcQueueEntry) + + // Use now - insertTime > expiryInterval to + // determine if this entry has expired. + if time.Since(entry.time) > q.expiryInterval { + // Remove the expired entry from the + // linked-list. + q.freeList.Remove(e) + entry.item = nil + e.Value = nil + } else { + // If this entry hasn't expired, then + // all entries that follow will still be + // valid. + break + } + } + } + } +}