htlcswitch/mailbox: adds mailOrchestrator
This commit is contained in:
parent
d3403306b6
commit
c290398b4b
@ -402,3 +402,174 @@ func (m *memoryMailBox) MessageOutBox() chan lnwire.Message {
|
||||
func (m *memoryMailBox) PacketOutBox() chan *htlcPacket {
|
||||
return m.pktOutbox
|
||||
}
|
||||
|
||||
// mailOrchestrator is responsible for coordinating the creation and lifecycle
|
||||
// of mailboxes used within the switch. It supports the ability to create
|
||||
// mailboxes, reassign their short channel id's, deliver htlc packets, and
|
||||
// queue packets for mailboxes that have not been created due to a link's late
|
||||
// registration.
|
||||
type mailOrchestrator struct {
|
||||
mu sync.RWMutex
|
||||
|
||||
// mailboxes caches exactly one mailbox for all known channels.
|
||||
mailboxes map[lnwire.ChannelID]MailBox
|
||||
|
||||
// liveIndex maps a live short chan id to the primary mailbox key.
|
||||
// An index in liveIndex map is only entered under two conditions:
|
||||
// 1. A link has a non-zero short channel id at time of AddLink.
|
||||
// 2. A link receives a non-zero short channel via UpdateShortChanID.
|
||||
liveIndex map[lnwire.ShortChannelID]lnwire.ChannelID
|
||||
|
||||
// TODO(conner): add another pair of indexes:
|
||||
// chan_id -> short_chan_id
|
||||
// short_chan_id -> mailbox
|
||||
// so that Deliver can lookup mailbox directly once live,
|
||||
// but still queriable by channel_id.
|
||||
|
||||
// unclaimedPackets maps a live short chan id to queue of packets if no
|
||||
// mailbox has been created.
|
||||
unclaimedPackets map[lnwire.ShortChannelID][]*htlcPacket
|
||||
}
|
||||
|
||||
// newMailOrchestrator initializes a fresh mailOrchestrator.
|
||||
func newMailOrchestrator() *mailOrchestrator {
|
||||
return &mailOrchestrator{
|
||||
mailboxes: make(map[lnwire.ChannelID]MailBox),
|
||||
liveIndex: make(map[lnwire.ShortChannelID]lnwire.ChannelID),
|
||||
unclaimedPackets: make(map[lnwire.ShortChannelID][]*htlcPacket),
|
||||
}
|
||||
}
|
||||
|
||||
// Stop instructs the orchestrator to stop all active mailboxes.
|
||||
func (mo *mailOrchestrator) Stop() {
|
||||
for _, mailbox := range mo.mailboxes {
|
||||
mailbox.Stop()
|
||||
}
|
||||
}
|
||||
|
||||
// GetOrCreateMailBox returns an existing mailbox belonging to `chanID`, or
|
||||
// creates and returns a new mailbox if none is found.
|
||||
func (mo *mailOrchestrator) GetOrCreateMailBox(chanID lnwire.ChannelID) MailBox {
|
||||
// First, try lookup the mailbox directly using only the shared mutex.
|
||||
mo.mu.RLock()
|
||||
mailbox, ok := mo.mailboxes[chanID]
|
||||
if ok {
|
||||
mo.mu.RUnlock()
|
||||
return mailbox
|
||||
}
|
||||
mo.mu.RUnlock()
|
||||
|
||||
// Otherwise, we will try again with exclusive lock, creating a mailbox
|
||||
// if one still has not been created.
|
||||
mo.mu.Lock()
|
||||
mailbox = mo.exclusiveGetOrCreateMailBox(chanID)
|
||||
mo.mu.Unlock()
|
||||
|
||||
return mailbox
|
||||
}
|
||||
|
||||
// exclusiveGetOrCreateMailBox checks for the existence of a mailbox for the
|
||||
// given channel id. If none is found, a new one is creates, started, and
|
||||
// recorded.
|
||||
//
|
||||
// NOTE: This method MUST be invoked with the mailOrchestrator's exclusive lock.
|
||||
func (mo *mailOrchestrator) exclusiveGetOrCreateMailBox(
|
||||
chanID lnwire.ChannelID) MailBox {
|
||||
|
||||
mailbox, ok := mo.mailboxes[chanID]
|
||||
if !ok {
|
||||
mailbox = newMemoryMailBox()
|
||||
mailbox.Start()
|
||||
mo.mailboxes[chanID] = mailbox
|
||||
}
|
||||
|
||||
return mailbox
|
||||
}
|
||||
|
||||
// BindLiveShortChanID registers that messages bound for a particular short
|
||||
// channel id should be forwarded to the mailbox corresponding to the given
|
||||
// channel id. This method also checks to see if there are any unclaimed
|
||||
// packets for this short_chan_id. If any are found, they are delivered to the
|
||||
// mailbox and removed (marked as claimed).
|
||||
func (mo *mailOrchestrator) BindLiveShortChanID(mailbox MailBox,
|
||||
cid lnwire.ChannelID, sid lnwire.ShortChannelID) {
|
||||
|
||||
mo.mu.Lock()
|
||||
// Update the mapping from short channel id to mailbox's channel id.
|
||||
mo.liveIndex[sid] = cid
|
||||
|
||||
// Retrieve any unclaimed packets destined for this mailbox.
|
||||
pkts := mo.unclaimedPackets[sid]
|
||||
delete(mo.unclaimedPackets, sid)
|
||||
mo.mu.Unlock()
|
||||
|
||||
// Deliver the unclaimed packets.
|
||||
for _, pkt := range pkts {
|
||||
mailbox.AddPacket(pkt)
|
||||
}
|
||||
}
|
||||
|
||||
// Deliver lookups the target mailbox using the live index from short_chan_id
|
||||
// to channel_id. If the mailbox is found, the message is delivered directly.
|
||||
// Otherwise the packet is recorded as unclaimed, and will be delivered to the
|
||||
// mailbox upon the subsequent call to BindLiveShortChanID.
|
||||
func (mo *mailOrchestrator) Deliver(
|
||||
sid lnwire.ShortChannelID, pkt *htlcPacket) error {
|
||||
|
||||
var (
|
||||
mailbox MailBox
|
||||
found bool
|
||||
)
|
||||
|
||||
// First, try to find the channel id for the target short_chan_id. If
|
||||
// the link is live, we will also look up the created mailbox.
|
||||
mo.mu.RLock()
|
||||
chanID, isLive := mo.liveIndex[sid]
|
||||
if isLive {
|
||||
mailbox, found = mo.mailboxes[chanID]
|
||||
}
|
||||
mo.mu.RUnlock()
|
||||
|
||||
// The link is live and target mailbox was found, deliver immediately.
|
||||
if isLive && found {
|
||||
return mailbox.AddPacket(pkt)
|
||||
}
|
||||
|
||||
// If we detected that the link has not been made live, we will acquire
|
||||
// the exclusive lock preemptively in order to queue this packet in the
|
||||
// list of unclaimed packets.
|
||||
mo.mu.Lock()
|
||||
|
||||
// Double check to see if the mailbox has been not made live since the
|
||||
// release of the shared lock.
|
||||
//
|
||||
// NOTE: Checking again with the exclusive lock held prevents a race
|
||||
// condition where BindLiveShortChanID is interleaved between the
|
||||
// release of the shared lock, and acquiring the exclusive lock. The
|
||||
// result would be stuck packets, as they wouldn't be redelivered until
|
||||
// the next call to BindLiveShortChanID, which is expected to occur
|
||||
// infrequently.
|
||||
chanID, isLive = mo.liveIndex[sid]
|
||||
if isLive {
|
||||
// Reaching this point indicates the mailbox is actually live.
|
||||
// We'll try to load the mailbox using the fresh channel id.
|
||||
//
|
||||
// NOTE: This should never create a new mailbox, as the live
|
||||
// index should only be set if the mailbox had been initialized
|
||||
// beforehand. However, this does ensure that this case is
|
||||
// handled properly in the event that it could happen.
|
||||
mailbox = mo.exclusiveGetOrCreateMailBox(chanID)
|
||||
mo.mu.Unlock()
|
||||
|
||||
// Deliver the packet to the mailbox if it was found or created.
|
||||
return mailbox.AddPacket(pkt)
|
||||
}
|
||||
|
||||
// Finally, if the channel id is still not found in the live index,
|
||||
// we'll add this to the list of unclaimed packets. These will be
|
||||
// delivered upon the next call to BindLiveShortChanID.
|
||||
mo.unclaimedPackets[sid] = append(mo.unclaimedPackets[sid], pkt)
|
||||
mo.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user