From 22d90d6b35bd65bdcfd9bd7e9dd328653c551ba1 Mon Sep 17 00:00:00 2001 From: Andrey Samokhvalov Date: Wed, 24 May 2017 17:34:35 +0300 Subject: [PATCH] htlcswitch: add packet queue In this commit pending packet queue have been added. This queue consumes the htlc packet, store it inside the golang list and send it to the pending channel upon release notification. --- htlcswitch/queue.go | 117 +++++++++++++++++++++++++++++++++++++++ htlcswitch/queue_test.go | 43 ++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 htlcswitch/queue.go create mode 100644 htlcswitch/queue_test.go diff --git a/htlcswitch/queue.go b/htlcswitch/queue.go new file mode 100644 index 00000000..08d848a4 --- /dev/null +++ b/htlcswitch/queue.go @@ -0,0 +1,117 @@ +package htlcswitch + +import ( + "container/list" + "sync" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// packetQueue represent the wrapper around the original queue plus the +// functionality for releasing the queue objects in object channel. Such +// structures allows storing of all pending object in queue before the moment of +// actual releasing. +// TODO(andrew.shvv) structure not preserve the order if object failed second +// time. +type packetQueue struct { + *list.List + sync.Mutex + + // pending channel is needed in order to send the object which should + // be re-proceed. + pending chan *htlcPacket + + // grab channel represents the channel-lock which is needed in order + // to make "release" goroutines block during other release goroutine + // processing. + grab chan struct{} +} + +func newWaitingQueue() *packetQueue { + // Initialize grab channel and release one slot, otherwise release + // function will block. + done := make(chan struct{}, 1) + done <- struct{}{} + + return &packetQueue{ + pending: make(chan *htlcPacket), + grab: done, + List: list.New(), + } +} + +// consume function take the given packet and store it in queue till release +// function will be executed. +func (q *packetQueue) consume(packet *htlcPacket) { + q.Lock() + defer q.Unlock() + + q.PushBack(packet) +} + +// release function spawn the goroutine which grab the object from pending +// queue and pass it in processing channel. +func (q *packetQueue) release() { + q.Lock() + defer q.Unlock() + + if q.Len() == 0 { + return + } + + go func() { + // Grab the pending mutex so that other goroutines waits + // before grabbing the object, otherwise the objects will be + // send in the pending channel in random sequence. + <-q.grab + + defer func() { + // Release the channel-lock and give other goroutines the + // ability to + q.grab <- struct{}{} + }() + + // Fetch object after releasing the pending mutex in order to + // preserver the order of the stored objects. + q.Lock() + e := q.Front() + q.Unlock() + + if e != nil { + // Send the object in object queue and wait it to + // be processed by other side. + q.pending <- e.Value.(*htlcPacket) + + // After object have been preprocessed remove it from + // the queue. + q.Lock() + q.Remove(e) + q.Unlock() + return + } + }() +} + +// length returns the number of pending htlc packets which is stored in queue. +func (q *packetQueue) length() int { + q.Lock() + defer q.Unlock() + + return q.Len() +} + +// pendingAmount returns the amount of money which is stored in pending queue. +func (q *packetQueue) pendingAmount() btcutil.Amount { + q.Lock() + defer q.Unlock() + + var amount btcutil.Amount + for e := q.Front(); e != nil; e = e.Next() { + packet := e.Value.(*htlcPacket) + htlc := packet.htlc.(*lnwire.UpdateAddHTLC) + amount += htlc.Amount + } + + return amount +} diff --git a/htlcswitch/queue_test.go b/htlcswitch/queue_test.go new file mode 100644 index 00000000..91c18f4c --- /dev/null +++ b/htlcswitch/queue_test.go @@ -0,0 +1,43 @@ +package htlcswitch + +import ( + "reflect" + "testing" + "time" + + "github.com/lightningnetwork/lnd/lnwire" + "github.com/roasbeef/btcutil" +) + +// TestWaitingQueueThreadSafety test the thread safety properties of the +// waiting queue, by executing methods in seprate goroutines which operates +// with the same data. +func TestWaitingQueueThreadSafety(t *testing.T) { + q := newWaitingQueue() + + a := make([]btcutil.Amount, 1000) + for i := 0; i < len(a); i++ { + a[i] = btcutil.Amount(i) + q.consume(&htlcPacket{ + amount: btcutil.Amount(i), + htlc: &lnwire.UpdateAddHTLC{}, + }) + } + + var b []btcutil.Amount + for i := 0; i < len(a); i++ { + q.release() + + select { + case packet := <-q.pending: + b = append(b, packet.amount) + + case <-time.After(2 * time.Second): + t.Fatal("timeout") + } + } + + if !reflect.DeepEqual(b, a) { + t.Fatal("wrong order of the objects") + } +}