diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 01474bee..8a596db1 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -402,3 +402,174 @@ func (m *memoryMailBox) MessageOutBox() chan lnwire.Message { func (m *memoryMailBox) PacketOutBox() chan *htlcPacket { return m.pktOutbox } + +// mailOrchestrator is responsible for coordinating the creation and lifecycle +// of mailboxes used within the switch. It supports the ability to create +// mailboxes, reassign their short channel id's, deliver htlc packets, and +// queue packets for mailboxes that have not been created due to a link's late +// registration. +type mailOrchestrator struct { + mu sync.RWMutex + + // mailboxes caches exactly one mailbox for all known channels. + mailboxes map[lnwire.ChannelID]MailBox + + // liveIndex maps a live short chan id to the primary mailbox key. + // An index in liveIndex map is only entered under two conditions: + // 1. A link has a non-zero short channel id at time of AddLink. + // 2. A link receives a non-zero short channel via UpdateShortChanID. + liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID + + // TODO(conner): add another pair of indexes: + // chan_id -> short_chan_id + // short_chan_id -> mailbox + // so that Deliver can lookup mailbox directly once live, + // but still queriable by channel_id. + + // unclaimedPackets maps a live short chan id to queue of packets if no + // mailbox has been created. + unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket +} + +// newMailOrchestrator initializes a fresh mailOrchestrator. +func newMailOrchestrator() *mailOrchestrator { + return &mailOrchestrator{ + mailboxes: make(map[lnwire.ChannelID]MailBox), + liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID), + unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket), + } +} + +// Stop instructs the orchestrator to stop all active mailboxes. +func (mo *mailOrchestrator) Stop() { + for _, mailbox := range mo.mailboxes { + mailbox.Stop() + } +} + +// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or +// creates and returns a new mailbox if none is found. +func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox { + // First, try lookup the mailbox directly using only the shared mutex. + mo.mu.RLock() + mailbox, ok := mo.mailboxes[chanID] + if ok { + mo.mu.RUnlock() + return mailbox + } + mo.mu.RUnlock() + + // Otherwise, we will try again with exclusive lock, creating a mailbox + // if one still has not been created. + mo.mu.Lock() + mailbox = mo.exclusiveGetOrCreateMailBox(chanID) + mo.mu.Unlock() + + return mailbox +} + +// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the +// given channel id. If none is found, a new one is creates, started, and +// recorded. +// +// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock. +func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox( + chanID lnwire.ChannelID) MailBox { + + mailbox, ok := mo.mailboxes[chanID] + if !ok { + mailbox = newMemoryMailBox() + mailbox.Start() + mo.mailboxes[chanID] = mailbox + } + + return mailbox +} + +// BindLiveShortChanID registers that messages bound for a particular short +// channel id should be forwarded to the mailbox corresponding to the given +// channel id. This method also checks to see if there are any unclaimed +// packets for this short_chan_id. If any are found, they are delivered to the +// mailbox and removed (marked as claimed). +func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox, + cid lnwire.ChannelID, sid lnwire.ShortChannelID) { + + mo.mu.Lock() + // Update the mapping from short channel id to mailbox's channel id. + mo.liveIndex[sid] = cid + + // Retrieve any unclaimed packets destined for this mailbox. + pkts := mo.unclaimedPackets[sid] + delete(mo.unclaimedPackets, sid) + mo.mu.Unlock() + + // Deliver the unclaimed packets. + for _, pkt := range pkts { + mailbox.AddPacket(pkt) + } +} + +// Deliver lookups the target mailbox using the live index from short_chan_id +// to channel_id. If the mailbox is found, the message is delivered directly. +// Otherwise the packet is recorded as unclaimed, and will be delivered to the +// mailbox upon the subsequent call to BindLiveShortChanID. +func (mo *mailOrchestrator) Deliver( + sid lnwire.ShortChannelID, pkt *htlcPacket) error { + + var ( + mailbox MailBox + found bool + ) + + // First, try to find the channel id for the target short_chan_id. If + // the link is live, we will also look up the created mailbox. + mo.mu.RLock() + chanID, isLive := mo.liveIndex[sid] + if isLive { + mailbox, found = mo.mailboxes[chanID] + } + mo.mu.RUnlock() + + // The link is live and target mailbox was found, deliver immediately. + if isLive && found { + return mailbox.AddPacket(pkt) + } + + // If we detected that the link has not been made live, we will acquire + // the exclusive lock preemptively in order to queue this packet in the + // list of unclaimed packets. + mo.mu.Lock() + + // Double check to see if the mailbox has been not made live since the + // release of the shared lock. + // + // NOTE: Checking again with the exclusive lock held prevents a race + // condition where BindLiveShortChanID is interleaved between the + // release of the shared lock, and acquiring the exclusive lock. The + // result would be stuck packets, as they wouldn't be redelivered until + // the next call to BindLiveShortChanID, which is expected to occur + // infrequently. + chanID, isLive = mo.liveIndex[sid] + if isLive { + // Reaching this point indicates the mailbox is actually live. + // We'll try to load the mailbox using the fresh channel id. + // + // NOTE: This should never create a new mailbox, as the live + // index should only be set if the mailbox had been initialized + // beforehand. However, this does ensure that this case is + // handled properly in the event that it could happen. + mailbox = mo.exclusiveGetOrCreateMailBox(chanID) + mo.mu.Unlock() + + // Deliver the packet to the mailbox if it was found or created. + return mailbox.AddPacket(pkt) + } + + // Finally, if the channel id is still not found in the live index, + // we'll add this to the list of unclaimed packets. These will be + // delivered upon the next call to BindLiveShortChanID. + mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt) + mo.mu.Unlock() + + return nil +}