diff --git a/htlcswitch/interfaces.go b/htlcswitch/interfaces.go index 33a866fa..b3f88003 100644 --- a/htlcswitch/interfaces.go +++ b/htlcswitch/interfaces.go @@ -38,14 +38,22 @@ type InvoiceDatabase interface { // | // type ChannelLink interface { + // TODO(roasbeef): modify interface to embed mail boxes? + // HandleSwitchPacket handles the switch packets. This packets might be // forwarded to us from another channel link in case the htlc update // came from another peer or if the update was created by user // initially. + // + // NOTE: This function MUST be non-blocking (or block as little as + // possible). HandleSwitchPacket(*htlcPacket) // HandleChannelUpdate handles the htlc requests as settle/add/fail // which sent to us from remote peer we have a channel with. + // + // NOTE: This function MUST be non-blocking (or block as little as + // possible). HandleChannelUpdate(lnwire.Message) // ChanID returns the channel ID for the channel link. The channel ID diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 64c1273a..13dfe8ed 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -182,11 +182,11 @@ type channelLink struct { // been processed because of the commitment transaction overflow. overflowQueue *packetQueue - // availableBandwidth is an integer with units of millisatoshi which - // indicates the total available bandwidth of a link, taking into - // account any pending (uncommitted) HLTC's, and any HTLC's that are - // within the overflow queue. - availableBandwidth uint64 + // mailBox is the main interface between the outside world and the + // link. All incoming messages will be sent over this mailBox. Messages + // include new updates from our connected peer, and new packets to be + // forwarded sent by the switch. + mailBox *memoryMailBox // upstream is a channel that new messages sent from the remote peer to // the local peer will be sent across. @@ -219,19 +219,23 @@ type channelLink struct { func NewChannelLink(cfg ChannelLinkConfig, channel *lnwallet.LightningChannel, currentHeight uint32) ChannelLink { - return &channelLink{ + link := &channelLink{ cfg: cfg, channel: channel, - upstream: make(chan lnwire.Message), - downstream: make(chan *htlcPacket), + mailBox: newMemoryMailBox(), linkControl: make(chan interface{}), // TODO(roasbeef): just do reserve here? availableBandwidth: uint64(channel.StateSnapshot().LocalBalance), - logCommitTimer: time.NewTimer(300 * time.Millisecond), - overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), - bestHeight: currentHeight, - quit: make(chan struct{}), + logCommitTimer: time.NewTimer(300 * time.Millisecond), + overflowQueue: newPacketQueue(lnwallet.MaxHTLCNumber / 2), + bestHeight: currentHeight, + quit: make(chan struct{}), } + + link.upstream = link.mailBox.MessageOutBox() + link.downstream = link.mailBox.PacketOutBox() + + return link } // A compile time check to ensure channelLink implements the ChannelLink @@ -1020,10 +1024,7 @@ func (l *channelLink) String() string { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) { - select { - case l.downstream <- packet: - case <-l.quit: - } + l.mailBox.AddPacket(packet) } // HandleChannelUpdate handles the htlc requests as settle/add/fail which sent @@ -1031,10 +1032,7 @@ func (l *channelLink) HandleSwitchPacket(packet *htlcPacket) { // // NOTE: Part of the ChannelLink interface. func (l *channelLink) HandleChannelUpdate(message lnwire.Message) { - select { - case l.upstream <- message: - case <-l.quit: - } + l.mailBox.AddMessage(message) } // updateChannelFee updates the commitment fee-per-kw on this channel by diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index d2cf6418..419849be 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -60,7 +60,7 @@ const ( // should be attempted. CloseRegular ChannelCloseType = iota - // CloseBreach indicates that a channel breach has been dtected, and + // CloseBreach indicates that a channel breach has been detected, and // the link should immediately be marked as unavailable. CloseBreach ) @@ -464,7 +464,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return err } - go source.HandleSwitchPacket(newFailPacket( + source.HandleSwitchPacket(newFailPacket( packet.src, &lnwire.UpdateFailHTLC{ Reason: reason, @@ -504,7 +504,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return err } - go source.HandleSwitchPacket(newFailPacket( + source.HandleSwitchPacket(newFailPacket( packet.src, &lnwire.UpdateFailHTLC{ Reason: reason, @@ -538,7 +538,7 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return err } - go source.HandleSwitchPacket(newFailPacket( + source.HandleSwitchPacket(newFailPacket( packet.src, &lnwire.UpdateFailHTLC{ Reason: reason,