diff --git a/peer.go b/peer.go index 27355988..66170fa8 100644 --- a/peer.go +++ b/peer.go @@ -391,6 +391,116 @@ func (p *peer) readNextMessage() (lnwire.Message, error) { return nextMsg, nil } +// chanMsgStream implements a goroutine-safe, in-order stream of messages to be +// delivered to an active channel. These messages MUST be in order due to the +// nature of the lightning channel commitment state machine. We utilize +// additional synchronization with the fundingManager to ensure we don't +// attempt to dispatch a message to a channel before it is fully active. +type chanMsgStream struct { + fundingMgr *fundingManager + htlcSwitch *htlcswitch.Switch + + cid lnwire.ChannelID + + peer *peer + + msgCond *sync.Cond + msgs []lnwire.Message + + chanLink htlcswitch.ChannelLink + + mtx sync.Mutex + + quit chan struct{} +} + +// newChanMsgStream creates a new instance of a chanMsgStream for a particular +// channel identified by its channel ID. +func newChanMsgStream(f *fundingManager, h *htlcswitch.Switch, p *peer, + c lnwire.ChannelID) *chanMsgStream { + + stream := &chanMsgStream{ + fundingMgr: f, + htlcSwitch: h, + peer: p, + cid: c, + quit: make(chan struct{}), + } + stream.msgCond = sync.NewCond(&stream.mtx) + + return stream +} + +// Start starts the chanMsgStream. +func (c *chanMsgStream) Start() { + go c.msgConsumer() +} + +// Stop stops the chanMsgStream. +func (c *chanMsgStream) Stop() { + // TODO(roasbeef): signal too? + + close(c.quit) +} + +// msgConsumer is the main goroutine that streams messages from the peer's +// readHandler directly to the target channel. +func (c *chanMsgStream) msgConsumer() { + peerLog.Tracef("Update stream for ChannelID(%x) created", c.cid[:]) + + for { + // First, we'll check our condition. If the queue of messages + // is empty, then we'll wait until a new item is added. + c.msgCond.L.Lock() + for len(c.msgs) == 0 { + c.msgCond.Wait() + } + + // Grab the message off the front of the queue, shifting the + // slice's reference down one in order to remove the message + // from the queue. + msg := c.msgs[0] + c.msgs = c.msgs[1:] + + // We'll send a message to the funding manager and wait iff an + // active funding process for this channel hasn't yet + // completed. We do this in order to account for the following + // scenario: we send the funding locked message to the other + // side, they immediately send a channel update message, but we + // haven't yet sent the channel to the channelManager. + c.fundingMgr.waitUntilChannelOpen(c.cid) + + // Dispatch the commitment update message to the proper active + // goroutine dedicated to this channel. + if c.chanLink == nil { + link, err := c.htlcSwitch.GetLink(c.cid) + if err != nil { + peerLog.Errorf("recv'd update for unknown "+ + "channel %v from %v", c.cid, c.peer) + continue + } + c.chanLink = link + } + + c.chanLink.HandleChannelUpdate(msg) + c.msgCond.L.Unlock() + } +} + +// AddMsg adds a new message to the chanMsgStream. This function is safe for +// concurrent access. +func (c *chanMsgStream) AddMsg(msg lnwire.Message) { + // First, we'll lock the condition, and add the message to the end of + // the message queue. + c.msgCond.L.Lock() + c.msgs = append(c.msgs, msg) + c.msgCond.L.Unlock() + + // With the message added, we signal to the msgConsumer that there are + // additional messages to consume. + c.msgCond.Signal() +} + // readHandler is responsible for reading messages off the wire in series, then // properly dispatching the handling of the message to the proper subsystem. //