htlcswitch: add packet queue

In this commit pending packet queue have been added. This queue
consumes the htlc packet, store it inside the golang list and send it
to the pending channel upon release notification.
This commit is contained in:
Andrey Samokhvalov 2017-05-24 17:34:35 +03:00 committed by Olaoluwa Osuntokun
parent 06946f3911
commit 22d90d6b35
2 changed files with 160 additions and 0 deletions

117
htlcswitch/queue.go Normal file

@ -0,0 +1,117 @@
package htlcswitch
import (
"container/list"
"sync"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcutil"
)
// packetQueue represent the wrapper around the original queue plus the
// functionality for releasing the queue objects in object channel. Such
// structures allows storing of all pending object in queue before the moment of
// actual releasing.
// TODO(andrew.shvv) structure not preserve the order if object failed second
// time.
type packetQueue struct {
*list.List
sync.Mutex
// pending channel is needed in order to send the object which should
// be re-proceed.
pending chan *htlcPacket
// grab channel represents the channel-lock which is needed in order
// to make "release" goroutines block during other release goroutine
// processing.
grab chan struct{}
}
func newWaitingQueue() *packetQueue {
// Initialize grab channel and release one slot, otherwise release
// function will block.
done := make(chan struct{}, 1)
done <- struct{}{}
return &packetQueue{
pending: make(chan *htlcPacket),
grab: done,
List: list.New(),
}
}
// consume function take the given packet and store it in queue till release
// function will be executed.
func (q *packetQueue) consume(packet *htlcPacket) {
q.Lock()
defer q.Unlock()
q.PushBack(packet)
}
// release function spawn the goroutine which grab the object from pending
// queue and pass it in processing channel.
func (q *packetQueue) release() {
q.Lock()
defer q.Unlock()
if q.Len() == 0 {
return
}
go func() {
// Grab the pending mutex so that other goroutines waits
// before grabbing the object, otherwise the objects will be
// send in the pending channel in random sequence.
<-q.grab
defer func() {
// Release the channel-lock and give other goroutines the
// ability to
q.grab <- struct{}{}
}()
// Fetch object after releasing the pending mutex in order to
// preserver the order of the stored objects.
q.Lock()
e := q.Front()
q.Unlock()
if e != nil {
// Send the object in object queue and wait it to
// be processed by other side.
q.pending <- e.Value.(*htlcPacket)
// After object have been preprocessed remove it from
// the queue.
q.Lock()
q.Remove(e)
q.Unlock()
return
}
}()
}
// length returns the number of pending htlc packets which is stored in queue.
func (q *packetQueue) length() int {
q.Lock()
defer q.Unlock()
return q.Len()
}
// pendingAmount returns the amount of money which is stored in pending queue.
func (q *packetQueue) pendingAmount() btcutil.Amount {
q.Lock()
defer q.Unlock()
var amount btcutil.Amount
for e := q.Front(); e != nil; e = e.Next() {
packet := e.Value.(*htlcPacket)
htlc := packet.htlc.(*lnwire.UpdateAddHTLC)
amount += htlc.Amount
}
return amount
}

43
htlcswitch/queue_test.go Normal file

@ -0,0 +1,43 @@
package htlcswitch
import (
"reflect"
"testing"
"time"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcutil"
)
// TestWaitingQueueThreadSafety test the thread safety properties of the
// waiting queue, by executing methods in seprate goroutines which operates
// with the same data.
func TestWaitingQueueThreadSafety(t *testing.T) {
q := newWaitingQueue()
a := make([]btcutil.Amount, 1000)
for i := 0; i < len(a); i++ {
a[i] = btcutil.Amount(i)
q.consume(&htlcPacket{
amount: btcutil.Amount(i),
htlc: &lnwire.UpdateAddHTLC{},
})
}
var b []btcutil.Amount
for i := 0; i < len(a); i++ {
q.release()
select {
case packet := <-q.pending:
b = append(b, packet.amount)
case <-time.After(2 * time.Second):
t.Fatal("timeout")
}
}
if !reflect.DeepEqual(b, a) {
t.Fatal("wrong order of the objects")
}
}