htlcswitch: eliminate internal queryHandler within packetQueue

This commit removes the internal queryHandler within the packetQueue
itself in order to make way for an upcoming commit which uses atomic
variables to report the length of the queue to outside callers.
Additionally, due to a recent change within the channel link, we no
longer need to report the total value of all pending HTLCs to the
outside world.
Olaoluwa Osuntokun 2017-09-25 12:34:51 -07:00
parent 64317c04f1
commit 210fc6e714
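As a rough illustration of the direction described above, a counter kept with sync/atomic could replace the query-channel round-trip for length reporting. The sketch below is hypothetical: the queueLen field name, its placement, and the trimmed-down types are assumptions for illustration, not part of this commit or necessarily of the follow-up change.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type htlcPacket struct{}

// packetQueue is a trimmed-down stand-in for the real htlcswitch type,
// showing only the pieces the length counter touches.
type packetQueue struct {
	// queueLen is updated atomically wherever the queue itself is
	// mutated, so readers never need the mutex or a query goroutine.
	// (Hypothetical field for this sketch.)
	queueLen int32

	queueMtx  sync.Mutex
	queueCond *sync.Cond
	queue     []*htlcPacket
}

func newPacketQueue() *packetQueue {
	p := &packetQueue{}
	p.queueCond = sync.NewCond(&p.queueMtx)
	return p
}

// AddPkt appends a packet and bumps the atomic counter under the same
// lock that protects the slice.
func (p *packetQueue) AddPkt(pkt *htlcPacket) {
	p.queueCond.L.Lock()
	p.queue = append(p.queue, pkt)
	atomic.AddInt32(&p.queueLen, 1)
	p.queueCond.L.Unlock()

	p.queueCond.Signal()
}

// Length reports the number of pending packets without blocking on the
// queue mutex or round-tripping through a dedicated query handler.
func (p *packetQueue) Length() int32 {
	return atomic.LoadInt32(&p.queueLen)
}

func main() {
	p := newPacketQueue()
	p.AddPkt(&htlcPacket{})
	fmt.Println(p.Length()) // prints 1
}

With the query channel gone, a length read no longer rendezvouses with a dedicated goroutine, and PendingAmount disappears entirely since the channel link no longer consumes it.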

@@ -22,7 +22,6 @@ type packetQueue struct {
 	// commitment transaction.
 	outgoingPkts chan *htlcPacket
-	queries chan interface{}
 	wg   sync.WaitGroup
 	quit chan struct{}
@@ -33,7 +32,6 @@ func newPacketQueue() *packetQueue {
 	p := &packetQueue{
 		outgoingPkts: make(chan *htlcPacket),
-		queries:      make(chan interface{}),
 		quit:         make(chan struct{}),
 	}
@@ -45,9 +43,8 @@ func newPacketQueue() *packetQueue {
 // Start starts all goroutines that packetQueue needs to perform its normal
 // duties.
 func (p *packetQueue) Start() {
-	p.wg.Add(2)
+	p.wg.Add(1)
 	go p.packetCoordinator()
-	go p.queryHandler()
 }
 
 // Stop signals the packetQueue for a graceful shutdown, and waits for all
@@ -121,62 +118,6 @@ func (p *packetQueue) packetCoordinator() {
 	}
 }
 
-// queueLenReq is a request sent to the queryHandler to query for the length of
-// the current pending packet queue.
-type queueLenReq struct {
-	resp chan int
-}
-
-// totalPendingReq is a request sent to the queryHandler to query for the total
-// amount of satoshis pending in the queue at a given instant.
-type totalPendingReq struct {
-	resp chan lnwire.MilliSatoshi
-}
-
-// queryHandler is a dedicated goroutine for handling queries from outside
-// sub-systems to the packetQueue itself.
-func (p *packetQueue) queryHandler() {
-	defer p.wg.Done()
-
-	for {
-		select {
-		case query := <-p.queries:
-			switch req := query.(type) {
-			case *queueLenReq:
-				p.queueCond.L.Lock()
-				select {
-				case req.resp <- len(p.queue):
-				case <-p.quit:
-					p.queueCond.L.Unlock()
-					return
-				}
-				p.queueCond.L.Unlock()
-
-			case *totalPendingReq:
-				p.queueCond.L.Lock()
-
-				var amount lnwire.MilliSatoshi
-				for _, pkt := range p.queue {
-					amount += pkt.amount
-				}
-
-				select {
-				case req.resp <- amount:
-				case <-p.quit:
-					p.queueCond.L.Unlock()
-					return
-				}
-				p.queueCond.L.Unlock()
-			}
-
-		case <-p.quit:
-			return
-		}
-	}
-}
-
 // AddPkt adds the referenced packet to the overflow queue, preserving ordering
 // of the existing items.
 func (p *packetQueue) AddPkt(pkt *htlcPacket) {
@@ -191,44 +132,11 @@ func (p *packetQueue) AddPkt(pkt *htlcPacket) {
 	p.queueCond.Signal()
 }
 
-// Length returns the number of pending htlc packets present within the over
-// flow queue.
-func (p *packetQueue) Length() int {
-	lenReq := &queueLenReq{
-		resp: make(chan int, 1),
-	}
-
-	select {
-	case p.queries <- lenReq:
-	case <-p.quit:
-		return 0
-	}
-
-	select {
-	case len := <-lenReq.resp:
-		return len
-	case <-p.quit:
-		return 0
-	}
-}
-
-// PendingAmount returns the total sum in satoshis of all the pending
-// htlcPackets within the queue.
-func (p *packetQueue) PendingAmount() lnwire.MilliSatoshi {
-	amtReq := &totalPendingReq{
-		resp: make(chan lnwire.MilliSatoshi, 1),
-	}
-
-	select {
-	case p.queries <- amtReq:
-	case <-p.quit:
-		return 0
-	}
-
-	select {
-	case amt := <-amtReq.resp:
-		return amt
-	case <-p.quit:
-		return 0
-	}
-}