htlcswitch/mailbox: rename Settle/Fail queue to indicate replies
This commit renames the variables associated with processing the Settle/Fail packets to indicate that they are replies.
This commit is contained in:
parent
ec1b8d874d
commit
55f90be2a5
@ -112,10 +112,12 @@ type memoryMailBox struct {
|
|||||||
messageOutbox chan lnwire.Message
|
messageOutbox chan lnwire.Message
|
||||||
msgReset chan chan struct{}
|
msgReset chan chan struct{}
|
||||||
|
|
||||||
htlcPkts *list.List
|
// repPkts is a queue for reply packets, e.g. Settles and Fails.
|
||||||
pktIndex map[CircuitKey]*list.Element
|
repPkts *list.List
|
||||||
pktHead *list.Element
|
repIndex map[CircuitKey]*list.Element
|
||||||
|
repHead *list.Element
|
||||||
|
|
||||||
|
// addPkts is a dedicated queue for Adds.
|
||||||
addPkts *list.List
|
addPkts *list.List
|
||||||
addIndex map[CircuitKey]*list.Element
|
addIndex map[CircuitKey]*list.Element
|
||||||
addHead *list.Element
|
addHead *list.Element
|
||||||
@ -136,13 +138,13 @@ func newMemoryMailBox(cfg *mailBoxConfig) *memoryMailBox {
|
|||||||
box := &memoryMailBox{
|
box := &memoryMailBox{
|
||||||
cfg: cfg,
|
cfg: cfg,
|
||||||
wireMessages: list.New(),
|
wireMessages: list.New(),
|
||||||
htlcPkts: list.New(),
|
repPkts: list.New(),
|
||||||
addPkts: list.New(),
|
addPkts: list.New(),
|
||||||
messageOutbox: make(chan lnwire.Message),
|
messageOutbox: make(chan lnwire.Message),
|
||||||
pktOutbox: make(chan *htlcPacket),
|
pktOutbox: make(chan *htlcPacket),
|
||||||
msgReset: make(chan chan struct{}, 1),
|
msgReset: make(chan chan struct{}, 1),
|
||||||
pktReset: make(chan chan struct{}, 1),
|
pktReset: make(chan chan struct{}, 1),
|
||||||
pktIndex: make(map[CircuitKey]*list.Element),
|
repIndex: make(map[CircuitKey]*list.Element),
|
||||||
addIndex: make(map[CircuitKey]*list.Element),
|
addIndex: make(map[CircuitKey]*list.Element),
|
||||||
wireShutdown: make(chan struct{}),
|
wireShutdown: make(chan struct{}),
|
||||||
pktShutdown: make(chan struct{}),
|
pktShutdown: make(chan struct{}),
|
||||||
@ -238,17 +240,17 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
|
|||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
defer m.pktCond.L.Unlock()
|
defer m.pktCond.L.Unlock()
|
||||||
|
|
||||||
if entry, ok := m.pktIndex[inKey]; ok {
|
if entry, ok := m.repIndex[inKey]; ok {
|
||||||
// Check whether we are removing the head of the queue. If so,
|
// Check whether we are removing the head of the queue. If so,
|
||||||
// we must advance the head to the next packet before removing.
|
// we must advance the head to the next packet before removing.
|
||||||
// It's possible that the courier has already advanced the
|
// It's possible that the courier has already advanced the
|
||||||
// pktHead, so this check prevents the pktHead from getting
|
// repHead, so this check prevents the repHead from getting
|
||||||
// desynchronized.
|
// desynchronized.
|
||||||
if entry == m.pktHead {
|
if entry == m.repHead {
|
||||||
m.pktHead = entry.Next()
|
m.repHead = entry.Next()
|
||||||
}
|
}
|
||||||
m.htlcPkts.Remove(entry)
|
m.repPkts.Remove(entry)
|
||||||
delete(m.pktIndex, inKey)
|
delete(m.repIndex, inKey)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@ -282,7 +284,7 @@ func (m *memoryMailBox) AckPacket(inKey CircuitKey) bool {
|
|||||||
// bound for the switch that already have a queued response.
|
// bound for the switch that already have a queued response.
|
||||||
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
|
func (m *memoryMailBox) HasPacket(inKey CircuitKey) bool {
|
||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
_, ok := m.pktIndex[inKey]
|
_, ok := m.repIndex[inKey]
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
@ -376,7 +378,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
|
|
||||||
case pktCourier:
|
case pktCourier:
|
||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
for m.pktHead == nil && m.addHead == nil {
|
for m.repHead == nil && m.addHead == nil {
|
||||||
m.pktCond.Wait()
|
m.pktCond.Wait()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
@ -385,7 +387,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
// any un-ACK'd messages are re-delivered upon
|
// any un-ACK'd messages are re-delivered upon
|
||||||
// reconnect.
|
// reconnect.
|
||||||
case pktDone := <-m.pktReset:
|
case pktDone := <-m.pktReset:
|
||||||
m.pktHead = m.htlcPkts.Front()
|
m.repHead = m.repPkts.Front()
|
||||||
m.addHead = m.addPkts.Front()
|
m.addHead = m.addPkts.Front()
|
||||||
|
|
||||||
close(pktDone)
|
close(pktDone)
|
||||||
@ -399,8 +401,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
nextPkt *htlcPacket
|
nextRep *htlcPacket
|
||||||
nextPktEl *list.Element
|
nextRepEl *list.Element
|
||||||
nextAdd *pktWithExpiry
|
nextAdd *pktWithExpiry
|
||||||
nextAddEl *list.Element
|
nextAddEl *list.Element
|
||||||
nextMsg lnwire.Message
|
nextMsg lnwire.Message
|
||||||
@ -424,9 +426,9 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
// pending Add if it's present. Due to clock
|
// pending Add if it's present. Due to clock
|
||||||
// monotonicity, we know that the head of the Adds is
|
// monotonicity, we know that the head of the Adds is
|
||||||
// the next to expire.
|
// the next to expire.
|
||||||
if m.pktHead != nil {
|
if m.repHead != nil {
|
||||||
nextPkt = m.pktHead.Value.(*htlcPacket)
|
nextRep = m.repHead.Value.(*htlcPacket)
|
||||||
nextPktEl = m.pktHead
|
nextRepEl = m.repHead
|
||||||
}
|
}
|
||||||
if m.addHead != nil {
|
if m.addHead != nil {
|
||||||
nextAdd = m.addHead.Value.(*pktWithExpiry)
|
nextAdd = m.addHead.Value.(*pktWithExpiry)
|
||||||
@ -479,8 +481,8 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
// same channel, but we can control which is delivered
|
// same channel, but we can control which is delivered
|
||||||
// by exclusively making one nil and the other non-nil.
|
// by exclusively making one nil and the other non-nil.
|
||||||
// We know from our loop condition that at least one
|
// We know from our loop condition that at least one
|
||||||
// nextPkt and nextAdd are non-nil.
|
// nextRep and nextAdd are non-nil.
|
||||||
if nextPkt != nil {
|
if nextRep != nil {
|
||||||
pktOutbox = m.pktOutbox
|
pktOutbox = m.pktOutbox
|
||||||
} else {
|
} else {
|
||||||
addOutbox = m.pktOutbox
|
addOutbox = m.pktOutbox
|
||||||
@ -503,12 +505,12 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case pktOutbox <- nextPkt:
|
case pktOutbox <- nextRep:
|
||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
// Only advance the pktHead if this Settle or
|
// Only advance the repHead if this Settle or
|
||||||
// Fail is still at the head of the queue.
|
// Fail is still at the head of the queue.
|
||||||
if m.pktHead != nil && m.pktHead == nextPktEl {
|
if m.repHead != nil && m.repHead == nextRepEl {
|
||||||
m.pktHead = m.pktHead.Next()
|
m.repHead = m.repHead.Next()
|
||||||
}
|
}
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
|
|
||||||
@ -526,7 +528,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
|||||||
|
|
||||||
case pktDone := <-m.pktReset:
|
case pktDone := <-m.pktReset:
|
||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
m.pktHead = m.htlcPkts.Front()
|
m.repHead = m.repPkts.Front()
|
||||||
m.addHead = m.addPkts.Front()
|
m.addHead = m.addPkts.Front()
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
|
|
||||||
@ -566,17 +568,17 @@ func (m *memoryMailBox) AddPacket(pkt *htlcPacket) error {
|
|||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
switch htlc := pkt.htlc.(type) {
|
switch htlc := pkt.htlc.(type) {
|
||||||
|
|
||||||
// Split off Settle/Fail packets into the htlcPkts queue.
|
// Split off Settle/Fail packets into the repPkts queue.
|
||||||
case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
|
case *lnwire.UpdateFulfillHTLC, *lnwire.UpdateFailHTLC:
|
||||||
if _, ok := m.pktIndex[pkt.inKey()]; ok {
|
if _, ok := m.repIndex[pkt.inKey()]; ok {
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
return ErrPacketAlreadyExists
|
return ErrPacketAlreadyExists
|
||||||
}
|
}
|
||||||
|
|
||||||
entry := m.htlcPkts.PushBack(pkt)
|
entry := m.repPkts.PushBack(pkt)
|
||||||
m.pktIndex[pkt.inKey()] = entry
|
m.repIndex[pkt.inKey()] = entry
|
||||||
if m.pktHead == nil {
|
if m.repHead == nil {
|
||||||
m.pktHead = entry
|
m.repHead = entry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Split off Add packets into the addPkts queue.
|
// Split off Add packets into the addPkts queue.
|
||||||
|
Loading…
Reference in New Issue
Block a user