peer: introduce chanMsgStream to provide a concurrent safe, in-order stream of msgs
This commit is contained in:
parent
5425eff09c
commit
5e4b368348
110
peer.go
110
peer.go
@ -391,6 +391,116 @@ func (p *peer) readNextMessage() (lnwire.Message, error) {
|
|||||||
return nextMsg, nil
|
return nextMsg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// chanMsgStream implements a goroutine-safe, in-order stream of messages to be
|
||||||
|
// delivered to an active channel. These messages MUST be in order due to the
|
||||||
|
// nature of the lightning channel commitment state machine. We utilize
|
||||||
|
// additional synchronization with the fundingManager to ensure we don't
|
||||||
|
// attempt to dispatch a message to a channel before it is fully active.
|
||||||
|
type chanMsgStream struct {
|
||||||
|
fundingMgr *fundingManager
|
||||||
|
htlcSwitch *htlcswitch.Switch
|
||||||
|
|
||||||
|
cid lnwire.ChannelID
|
||||||
|
|
||||||
|
peer *peer
|
||||||
|
|
||||||
|
msgCond *sync.Cond
|
||||||
|
msgs []lnwire.Message
|
||||||
|
|
||||||
|
chanLink htlcswitch.ChannelLink
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
|
|
||||||
|
quit chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newChanMsgStream creates a new instance of a chanMsgStream for a particular
|
||||||
|
// channel identified by its channel ID.
|
||||||
|
func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer,
|
||||||
|
c lnwire.ChannelID) *chanMsgStream {
|
||||||
|
|
||||||
|
stream := &chanMsgStream{
|
||||||
|
fundingMgr: f,
|
||||||
|
htlcSwitch: h,
|
||||||
|
peer: p,
|
||||||
|
cid: c,
|
||||||
|
quit: make(chan struct{}),
|
||||||
|
}
|
||||||
|
stream.msgCond = sync.NewCond(&stream.mtx)
|
||||||
|
|
||||||
|
return stream
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the chanMsgStream.
|
||||||
|
func (c *chanMsgStream) Start() {
|
||||||
|
go c.msgConsumer()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the chanMsgStream.
|
||||||
|
func (c *chanMsgStream) Stop() {
|
||||||
|
// TODO(roasbeef): signal too?
|
||||||
|
|
||||||
|
close(c.quit)
|
||||||
|
}
|
||||||
|
|
||||||
|
// msgConsumer is the main goroutine that streams messages from the peer's
|
||||||
|
// readHandler directly to the target channel.
|
||||||
|
func (c *chanMsgStream) msgConsumer() {
|
||||||
|
peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:])
|
||||||
|
|
||||||
|
for {
|
||||||
|
// First, we'll check our condition. If the queue of messages
|
||||||
|
// is empty, then we'll wait until a new item is added.
|
||||||
|
c.msgCond.L.Lock()
|
||||||
|
for len(c.msgs) == 0 {
|
||||||
|
c.msgCond.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Grab the message off the front of the queue, shifting the
|
||||||
|
// slice's reference down one in order to remove the message
|
||||||
|
// from the queue.
|
||||||
|
msg := c.msgs[0]
|
||||||
|
c.msgs = c.msgs[1:]
|
||||||
|
|
||||||
|
// We'll send a message to the funding manager and wait iff an
|
||||||
|
// active funding process for this channel hasn't yet
|
||||||
|
// completed. We do this in order to account for the following
|
||||||
|
// scenario: we send the funding locked message to the other
|
||||||
|
// side, they immediately send a channel update message, but we
|
||||||
|
// haven't yet sent the channel to the channelManager.
|
||||||
|
c.fundingMgr.waitUntilChannelOpen(c.cid)
|
||||||
|
|
||||||
|
// Dispatch the commitment update message to the proper active
|
||||||
|
// goroutine dedicated to this channel.
|
||||||
|
if c.chanLink == nil {
|
||||||
|
link, err := c.htlcSwitch.GetLink(c.cid)
|
||||||
|
if err != nil {
|
||||||
|
peerLog.Errorf("recv'd update for unknown "+
|
||||||
|
"channel %v from %v", c.cid, c.peer)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
c.chanLink = link
|
||||||
|
}
|
||||||
|
|
||||||
|
c.chanLink.HandleChannelUpdate(msg)
|
||||||
|
c.msgCond.L.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddMsg adds a new message to the chanMsgStream. This function is safe for
|
||||||
|
// concurrent access.
|
||||||
|
func (c *chanMsgStream) AddMsg(msg lnwire.Message) {
|
||||||
|
// First, we'll lock the condition, and add the message to the end of
|
||||||
|
// the message queue.
|
||||||
|
c.msgCond.L.Lock()
|
||||||
|
c.msgs = append(c.msgs, msg)
|
||||||
|
c.msgCond.L.Unlock()
|
||||||
|
|
||||||
|
// With the message added, we signal to the msgConsumer that there are
|
||||||
|
// additional messages to consume.
|
||||||
|
c.msgCond.Signal()
|
||||||
|
}
|
||||||
|
|
||||||
// readHandler is responsible for reading messages off the wire in series, then
|
// readHandler is responsible for reading messages off the wire in series, then
|
||||||
// properly dispatching the handling of the message to the proper subsystem.
|
// properly dispatching the handling of the message to the proper subsystem.
|
||||||
//
|
//
|
||||||
|
Loading…
Reference in New Issue
Block a user