diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go new file mode 100644 index 00000000..08d848a4 --- /dev/null +++ b/htlcswitch/queue.go @@ -0,0 +1,117 @@ +package htlcswitch + +import ( + "container/list" + "sync" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// packetQueue represent the wrapper around the original queue plus the +// functionality for releasing the queue objects in object channel. Such +// structures allows storing of all pending object in queue before the moment of +// actual releasing. +// TODO(andrew.shvv) structure not preserve the order if object failed second +// time. +type packetQueue struct { + *list.List + sync.Mutex + + // pending channel is needed in order to send the object which should + // be re-proceed. + pending chan *htlcPacket + + // grab channel represents the channel-lock which is needed in order + // to make "release" goroutines block during other release goroutine + // processing. + grab chan struct{} +} + +func newWaitingQueue() *packetQueue { + // Initialize grab channel and release one slot, otherwise release + // function will block. + done := make(chan struct{}, 1) + done <- struct{}{} + + return &packetQueue{ + pending: make(chan *htlcPacket), + grab: done, + List: list.New(), + } +} + +// consume function take the given packet and store it in queue till release +// function will be executed. +func (q *packetQueue) consume(packet *htlcPacket) { + q.Lock() + defer q.Unlock() + + q.PushBack(packet) +} + +// release function spawn the goroutine which grab the object from pending +// queue and pass it in processing channel. +func (q *packetQueue) release() { + q.Lock() + defer q.Unlock() + + if q.Len() == 0 { + return + } + + go func() { + // Grab the pending mutex so that other goroutines waits + // before grabbing the object, otherwise the objects will be + // send in the pending channel in random sequence. + <-q.grab + + defer func() { + // Release the channel-lock and give other goroutines the + // ability to + q.grab <- struct{}{} + }() + + // Fetch object after releasing the pending mutex in order to + // preserver the order of the stored objects. + q.Lock() + e := q.Front() + q.Unlock() + + if e != nil { + // Send the object in object queue and wait it to + // be processed by other side. + q.pending <- e.Value.(*htlcPacket) + + // After object have been preprocessed remove it from + // the queue. + q.Lock() + q.Remove(e) + q.Unlock() + return + } + }() +} + +// length returns the number of pending htlc packets which is stored in queue. +func (q *packetQueue) length() int { + q.Lock() + defer q.Unlock() + + return q.Len() +} + +// pendingAmount returns the amount of money which is stored in pending queue. +func (q *packetQueue) pendingAmount() btcutil.Amount { + q.Lock() + defer q.Unlock() + + var amount btcutil.Amount + for e := q.Front(); e != nil; e = e.Next() { + packet := e.Value.(*htlcPacket) + htlc := packet.htlc.(*lnwire.UpdateAddHTLC) + amount += htlc.Amount + } + + return amount +} diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go new file mode 100644 index 00000000..91c18f4c --- /dev/null +++ b/htlcswitch/queue_test.go @@ -0,0 +1,43 @@ +package htlcswitch + +import ( + "reflect" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// TestWaitingQueueThreadSafety test the thread safety properties of the +// waiting queue, by executing methods in seprate goroutines which operates +// with the same data. +func TestWaitingQueueThreadSafety(t *testing.T) { + q := newWaitingQueue() + + a := make([]btcutil.Amount, 1000) + for i := 0; i < len(a); i++ { + a[i] = btcutil.Amount(i) + q.consume(&htlcPacket{ + amount: btcutil.Amount(i), + htlc: &lnwire.UpdateAddHTLC{}, + }) + } + + var b []btcutil.Amount + for i := 0; i < len(a); i++ { + q.release() + + select { + case packet := <-q.pending: + b = append(b, packet.amount) + + case <-time.After(2 * time.Second): + t.Fatal("timeout") + } + } + + if !reflect.DeepEqual(b, a) { + t.Fatal("wrong order of the objects") + } +}