From 890559edfa37a7a9578e92e54bb3e7b365d74cfe Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 10 Nov 2017 14:38:38 -0800 Subject: [PATCH] htlcswitch: add new mailBox abstraction to the package In this commit, we add a new abstraction to the package: the mailBox. The mailBox is a non-blocking, concurrent safe, in-order queue for delivering messages to a given channelLink instance. With this abstraction in place, we can now allow the switch to no longer launch a new goroutine for each forwarded HTLC, or instantiated user payment. --- htlcswitch/mailbox.go | 248 +++++++++++++++++++++++++++++++++++++ htlcswitch/mailbox_test.go | 100 +++++++++++++++ 2 files changed, 348 insertions(+) create mode 100644 htlcswitch/mailbox.go create mode 100644 htlcswitch/mailbox_test.go diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go new file mode 100644 index 00000000..309e79d7 --- /dev/null +++ b/htlcswitch/mailbox.go @@ -0,0 +1,248 @@ +package htlcswitch + +import ( + "sync" + + "github.com/lightningnetwork/lnd/lnwire" +) + +// mailBox is an interface which represents a concurrent-safe, in-order +// delivery queue for messages from the network and also from the main switch. +// This struct servers as a buffer between incoming messages, and messages to +// the handled by the link. Each of the mutating methods within this interface +// should be implemented in a non-blocking manner. +type mailBox interface { + // AddMessage appends a new message to the end of the message queue. + AddMessage(msg lnwire.Message) error + + // AddPacket appends a new message to the end of the packet queue. + AddPacket(pkt *htlcPacket) error + + // MessageOutBox returns a channel that any new messages ready for + // delivery will be sent on. + MessageOutBox() chan lnwire.Message + + // PacketOutBox returns a channel that any new packets ready for + // delivery will be sent on. + PacketOutBox() chan *htlcPacket + + // Start starts the mailbox and any goroutines it needs to operate + // properly. + Start() error + + // Stop signals the mailbox and its goroutines for a graceful shutdown. + Stop() error +} + +// memoryMailBox is an implementation of the mailBox struct backed by purely +// in-memory queues. +type memoryMailBox struct { + wireMessages []lnwire.Message + wireMtx sync.Mutex + wireCond *sync.Cond + + messageOutbox chan lnwire.Message + + htlcPkts []*htlcPacket + pktMtx sync.Mutex + pktCond *sync.Cond + + pktOutbox chan *htlcPacket + + wg sync.WaitGroup + quit chan struct{} +} + +// newMemoryMailBox creates a new instance of the memoryMailBox. +func newMemoryMailBox() *memoryMailBox { + box := &memoryMailBox{ + quit: make(chan struct{}), + messageOutbox: make(chan lnwire.Message), + pktOutbox: make(chan *htlcPacket), + } + box.wireCond = sync.NewCond(&box.wireMtx) + box.pktCond = sync.NewCond(&box.pktMtx) + + return box +} + +// A compile time assertion to ensure that memoryMailBox meets the mailBox +// interface. +var _ mailBox = (*memoryMailBox)(nil) + +// courierType is an enum that reflects the distinct types of messages a +// mailBox can handle. Each type will be placed in an isolated mail box and +// will have a dedicated goroutine for delivering the messages. +type courierType uint8 + +const ( + // wireCourier is a type of courier that handles wire messages. + wireCourier courierType = iota + + // pktCourier is a type of courier that handles hltc packets. + pktCourier +) + +// Start starts the mailbox and any goroutines it needs to operate properly. +// +// NOTE: This method is part of the mailBox interface. +func (m *memoryMailBox) Start() error { + m.wg.Add(2) + go m.mailCourier(wireCourier) + go m.mailCourier(pktCourier) + + return nil +} + +// Stop signals the mailbox and its goroutines for a graceful shutdown. +// +// NOTE: This method is part of the mailBox interface. +func (m *memoryMailBox) Stop() error { + close(m.quit) + + m.wireCond.Signal() + m.pktCond.Signal() + + return nil +} + +// mailCourier is a dedicated goroutine whose job is to reliably deliver +// messages of a particular type. There are two types of couriers: wire +// couriers, and mail couriers. Depending on the passed courierType, this +// goroutine will assume one of two roles. +func (m *memoryMailBox) mailCourier(cType courierType) { + defer m.wg.Done() + + // TODO(roasbeef): refactor... + + for { + // First, we'll check our condition. If our target mailbox is + // empty, then we'll wait until a new item is added. + switch cType { + case wireCourier: + m.wireCond.L.Lock() + for len(m.wireMessages) == 0 { + m.wireCond.Wait() + + select { + case <-m.quit: + m.wireCond.L.Unlock() + return + default: + } + } + + case pktCourier: + m.pktCond.L.Lock() + for len(m.htlcPkts) == 0 { + m.pktCond.Wait() + + select { + case <-m.quit: + m.pktCond.L.Unlock() + return + default: + } + } + } + + // Grab the datum off the front of the queue, shifting the + // slice's reference down one in order to remove the datum from + // the queue. + var ( + nextPkt *htlcPacket + nextMsg lnwire.Message + ) + switch cType { + case wireCourier: + nextMsg = m.wireMessages[0] + m.wireMessages[0] = nil // Set to nil to prevent GC leak. + m.wireMessages = m.wireMessages[1:] + case pktCourier: + nextPkt = m.htlcPkts[0] + m.htlcPkts[0] = nil // Set to nil to prevent GC leak. + m.htlcPkts = m.htlcPkts[1:] + } + + // Now that we're done with the condition, we can unlock it to + // allow any callers to append to the end of our target queue. + switch cType { + case wireCourier: + m.wireCond.L.Unlock() + case pktCourier: + m.pktCond.L.Unlock() + } + + // With the next message obtained, we'll now select to attempt + // to deliver the message. If we receive a kill signal, then + // we'll bail out. + switch cType { + case wireCourier: + select { + case m.messageOutbox <- nextMsg: + case <-m.quit: + return + } + + case pktCourier: + select { + case m.pktOutbox <- nextPkt: + case <-m.quit: + return + } + } + + } +} + +// AddMessage appends a new message to the end of the message queue. +// +// NOTE: This method is safe for concrete use and part of the mailBox +// interface. +func (m *memoryMailBox) AddMessage(msg lnwire.Message) error { + // First, we'll lock the condition, and add the message to the end of + // the wire message inbox. + m.wireCond.L.Lock() + m.wireMessages = append(m.wireMessages, msg) + m.wireCond.L.Unlock() + + // With the message added, we signal to the mailCourier that there are + // additional messages to deliver. + m.wireCond.Signal() + + return nil +} + +// AddPacket appends a new message to the end of the packet queue. +// +// NOTE: This method is safe for concrete use and part of the mailBox +// interface. +func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error { + // First, we'll lock the condition, and add the packet to the end of + // the htlc packet inbox. + m.pktCond.L.Lock() + m.htlcPkts = append(m.htlcPkts, pkt) + m.pktCond.L.Unlock() + + // With the packet added, we signal to the mailCourier that there are + // additional packets to consume. + m.pktCond.Signal() + + return nil +} + +// MessageOutBox returns a channel that any new messages ready for delivery +// will be sent on. +// +// NOTE: This method is part of the mailBox interface. +func (m *memoryMailBox) MessageOutBox() chan lnwire.Message { + return m.messageOutbox +} + +// PacketOutBox returns a channel that any new packets ready for delivery will +// be sent on. +// +// NOTE: This method is part of the mailBox interface. +func (m *memoryMailBox) PacketOutBox() chan *htlcPacket { + return m.pktOutbox +} diff --git a/htlcswitch/mailbox_test.go b/htlcswitch/mailbox_test.go new file mode 100644 index 00000000..9c3b1044 --- /dev/null +++ b/htlcswitch/mailbox_test.go @@ -0,0 +1,100 @@ +package htlcswitch + +import ( + prand "math/rand" + "reflect" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/lnwire" +) + +// TestMailBoxCouriers tests that both aspects of the mailBox struct works +// properly. Both packets and messages should be able to added to each +// respective mailbox concurrently, and also messages/packets should also be +// able to be received concurrently. +func TestMailBoxCouriers(t *testing.T) { + t.Parallel() + + // First, we'll create new instance of the current default mailbox + // type. + mailBox := newMemoryMailBox() + mailBox.Start() + defer mailBox.Stop() + + // We'll be adding 10 message of both types to the mailbox. + const numPackets = 10 + + // We'll add a set of random packets to the mailbox. + sentPackets := make([]*htlcPacket, numPackets) + for i := 0; i < numPackets; i++ { + pkt := &htlcPacket{ + dest: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())), + src: lnwire.NewShortChanIDFromInt(uint64(prand.Int63())), + amount: lnwire.MilliSatoshi(prand.Int63()), + isObfuscated: i%2 == 0, + } + sentPackets[i] = pkt + + mailBox.AddPacket(pkt) + } + + // Next, we'll do the same, but this time adding wire messages. + sentMessages := make([]lnwire.Message, numPackets) + for i := 0; i < numPackets; i++ { + msg := &lnwire.UpdateAddHTLC{ + ID: uint64(prand.Int63()), + Amount: lnwire.MilliSatoshi(prand.Int63()), + } + sentMessages[i] = msg + + mailBox.AddMessage(msg) + } + + // Now we'll attempt to read back the packets/messages we added to the + // mailbox. We'll alternative reading from the message outbox vs the + // packet outbox to ensure that they work concurrently properly. + recvdPackets := make([]*htlcPacket, 0, numPackets) + recvdMessages := make([]lnwire.Message, 0, numPackets) + for i := 0; i < numPackets*2; i++ { + timeout := time.After(time.Second * 5) + if i%2 == 0 { + select { + case <-timeout: + t.Fatalf("didn't recv pkt after timeout") + case pkt := <-mailBox.PacketOutBox(): + recvdPackets = append(recvdPackets, pkt) + } + } else { + select { + case <-timeout: + t.Fatalf("didn't recv message after timeout") + case msg := <-mailBox.MessageOutBox(): + recvdMessages = append(recvdMessages, msg) + } + } + } + + // The number of messages/packets we sent, and the number we received + // should match exactly. + if len(sentPackets) != len(recvdPackets) { + t.Fatalf("expected %v packets instead got %v", len(sentPackets), + len(recvdPackets)) + } + if len(sentMessages) != len(recvdMessages) { + t.Fatalf("expected %v messages instead got %v", len(sentMessages), + len(recvdMessages)) + } + + // Additionally, the set of packets should match exactly, as we should + // have received the packets int he exact same ordering that we added. + if !reflect.DeepEqual(sentPackets, recvdPackets) { + t.Fatalf("recvd packets mismatched: expected %v, got %v", + spew.Sdump(sentPackets), spew.Sdump(recvdPackets)) + } + if !reflect.DeepEqual(recvdMessages, recvdMessages) { + t.Fatalf("recvd messages mismatched: expected %v, got %v", + spew.Sdump(sentMessages), spew.Sdump(recvdMessages)) + } +}