You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
100 lines
3.0 KiB
100 lines
3.0 KiB
package pool |
|
|
|
import ( |
|
"bytes" |
|
"time" |
|
|
|
"github.com/lightningnetwork/lnd/buffer" |
|
) |
|
|
|
// Write is a worker pool specifically designed for sharing access to |
|
// buffer.Write objects amongst a set of worker goroutines. This enables an |
|
// application to limit the total number of buffer.Write objects allocated at |
|
// any given time. |
|
type Write struct { |
|
workerPool *Worker |
|
bufferPool *WriteBuffer |
|
} |
|
|
|
// NewWrite creates a Write pool, using an underlying Writebuffer pool to |
|
// recycle buffer.Write objects across the lifetime of the Write pool's |
|
// workers. |
|
func NewWrite(writeBufferPool *WriteBuffer, numWorkers int, |
|
workerTimeout time.Duration) *Write { |
|
|
|
w := &Write{ |
|
bufferPool: writeBufferPool, |
|
} |
|
w.workerPool = NewWorker(&WorkerConfig{ |
|
NewWorkerState: w.newWorkerState, |
|
NumWorkers: numWorkers, |
|
WorkerTimeout: workerTimeout, |
|
}) |
|
|
|
return w |
|
} |
|
|
|
// Start safely spins up the Write pool. |
|
func (w *Write) Start() error { |
|
return w.workerPool.Start() |
|
} |
|
|
|
// Stop safely shuts down the Write pool. |
|
func (w *Write) Stop() error { |
|
return w.workerPool.Stop() |
|
} |
|
|
|
// Submit accepts a function closure that provides access to a fresh |
|
// bytes.Buffer backed by a buffer.Write object. The function's execution will |
|
// be allocated to one of the underlying Worker pool's goroutines. |
|
func (w *Write) Submit(inner func(*bytes.Buffer) error) error { |
|
return w.workerPool.Submit(func(s WorkerState) error { |
|
state := s.(*writeWorkerState) |
|
return inner(state.buf) |
|
}) |
|
} |
|
|
|
// writeWorkerState is the per-goroutine state maintained by a Write pool's |
|
// goroutines. |
|
type writeWorkerState struct { |
|
// bufferPool is the pool to which the writeBuf will be returned when |
|
// the goroutine exits. |
|
bufferPool *WriteBuffer |
|
|
|
// writeBuf is the buffer taken from the bufferPool on initialization, |
|
// which will be used to back the buf object provided to any tasks that |
|
// the goroutine processes before exiting. |
|
writeBuf *buffer.Write |
|
|
|
// buf is a buffer backed by writeBuf, that can be written to by tasks |
|
// submitted to the Write pool. The buf will be reset between each task |
|
// processed by a goroutine before exiting, and allows the task |
|
// submitters to interact with the writeBuf as if it were an io.Writer. |
|
buf *bytes.Buffer |
|
} |
|
|
|
// newWorkerState initializes a new writeWorkerState, which will be called |
|
// whenever a new goroutine is allocated to begin processing write tasks. |
|
func (w *Write) newWorkerState() WorkerState { |
|
writeBuf := w.bufferPool.Take() |
|
|
|
return &writeWorkerState{ |
|
bufferPool: w.bufferPool, |
|
writeBuf: writeBuf, |
|
buf: bytes.NewBuffer(writeBuf[0:0:len(writeBuf)]), |
|
} |
|
} |
|
|
|
// Cleanup returns the writeBuf to the underlying buffer pool, and removes the |
|
// goroutine's reference to the writeBuf and encapsulating buf. |
|
func (w *writeWorkerState) Cleanup() { |
|
w.bufferPool.Return(w.writeBuf) |
|
w.writeBuf = nil |
|
w.buf = nil |
|
} |
|
|
|
// Reset resets the bytes.Buffer so that it is zero-length and has the capacity |
|
// of the underlying buffer.Write.k |
|
func (w *writeWorkerState) Reset() { |
|
w.buf.Reset() |
|
}
|
|
|