From 4548e4497d3812cd0e3d1f63397218389c810e8d Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Sat, 9 Jul 2016 16:41:06 -0700 Subject: [PATCH] lnd: set up messaging chans between peer and htlcSwitch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Each active channel now gains its a dedicated htlcManager goroutine which currently accepts to two golang channels, and a lightning channel. The “downstream” channel will be used for dispatched multi-hop payments sent from the htlcSwitch, while the “upstream” channel will be used once the readHandler de-multiplexes hltc add/timeout/settle messages. Each time a new channel is fully created after N confirmations, the peer’s channelManager registers the new link with the htlcSwitch. Once the channel is closed either cooperatively or non-cooperatively, then the link is unregistered. --- log.go | 5 ++++ peer.go | 78 ++++++++++++++++++++++++++++++++++++++++----------------- 2 files changed, 60 insertions(+), 23 deletions(-) diff --git a/log.go b/log.go index a2da9ffd..cdb52210 100644 --- a/log.go +++ b/log.go @@ -25,6 +25,7 @@ var ( srvrLog = btclog.Disabled ntfnLog = btclog.Disabled chdbLog = btclog.Disabled + hswcLog = btclog.Disabled ) // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -37,6 +38,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "NTFN": ntfnLog, "CHDB": chdbLog, "FNDG": fndgLog, + "HSWC": hswcLog, } // useLogger updates the logger references for subsystemID to logger. Invalid @@ -74,6 +76,9 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "FNDG": fndgLog = logger + + case "HSWC": + hswcLog = logger } } diff --git a/peer.go b/peer.go index 2c76ad84..58b8f3a1 100644 --- a/peer.go +++ b/peer.go @@ -2,6 +2,7 @@ package main import ( "container/list" + "fmt" "net" "sync" "sync/atomic" @@ -100,6 +101,8 @@ type peer struct { activeChannels map[wire.OutPoint]*lnwallet.LightningChannel chanSnapshotReqs chan *chanSnapshotReq + htlcManagers map[wire.OutPoint]chan lnwire.Message + // newChanBarriers is a map from a channel point to a 'barrier' which // will be signalled once the channel is fully open. This barrier acts // as a synchronization point for any incoming/outgoing HTLCs before @@ -115,7 +118,7 @@ type peer struct { // localCloseChanReqs is a channel in which any local requests to // close a particular channel are sent over. - localCloseChanReqs chan *closeChanReq + localCloseChanReqs chan *closeLinkReq // remoteCloseChanReqs is a channel in which any remote requests // (initiated by the remote peer) close a particular channel are sent @@ -160,10 +163,11 @@ func newPeer(conn net.Conn, server *server, net wire.BitcoinNet, inbound bool) ( newChanBarriers: make(map[wire.OutPoint]chan struct{}), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), + htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), chanSnapshotReqs: make(chan *chanSnapshotReq), newChannels: make(chan *lnwallet.LightningChannel, 1), - localCloseChanReqs: make(chan *closeChanReq), + localCloseChanReqs: make(chan *closeLinkReq), remoteCloseChanReqs: make(chan *lnwire.CloseRequest), queueQuit: make(chan struct{}), @@ -214,10 +218,16 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { p.activeChannels[chanPoint] = lnChan peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) - // Update the server's global channel index. - p.server.chanIndexMtx.Lock() - p.server.chanIndex[chanPoint] = p - p.server.chanIndexMtx.Unlock() + // Register this new channel link with the HTLC Switch. This is + // necessary to properly route multi-hop payments, and forward + // new payments triggered by RPC clients. + downstreamLink := make(chan lnwire.Message) + p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink) + + upstreamLink := make(chan lnwire.Message) + p.htlcManagers[chanPoint] = upstreamLink + p.wg.Add(1) + go p.htlcManager(lnChan, downstreamLink, upstreamLink) } return nil @@ -233,12 +243,11 @@ func (p *peer) Start() error { peerLog.Tracef("peer %v starting", p) - p.wg.Add(5) + p.wg.Add(4) go p.readHandler() go p.queueHandler() go p.writeHandler() go p.channelManager() - go p.htlcManager() return nil } @@ -472,13 +481,16 @@ out: peerLog.Infof("New channel active ChannelPoint(%v) "+ "with peerId(%v)", chanPoint, p.id) - // Now that the channel is open, update the server's - // map of channels to the peers we have a particular - // channel open to. - // TODO(roasbeef): should server have this knowledge? - p.server.chanIndexMtx.Lock() - p.server.chanIndex[chanPoint] = p - p.server.chanIndexMtx.Unlock() + // Now that the channel is open, notify the Htlc + // Switch of a new active link. + chanSnapShot := newChan.StateSnapshot() + downstreamLink := make(chan lnwire.Message) + p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) + + upstreamLink := make(chan lnwire.Message) + p.htlcManagers[chanPoint] = upstreamLink + p.wg.Add(1) + go p.htlcManager(newChan, downstreamLink, upstreamLink) case req := <-p.localCloseChanReqs: p.handleLocalClose(req) case req := <-p.remoteCloseChanReqs: @@ -493,7 +505,7 @@ out: // handleLocalClose kicks-off the workflow to execute a cooperative closure of // the channel initiated by a local sub-system. -func (p *peer) handleLocalClose(req *closeChanReq) { +func (p *peer) handleLocalClose(req *closeLinkReq) { chanPoint := req.chanPoint key := wire.OutPoint{ Hash: chanPoint.Hash, @@ -560,7 +572,7 @@ func (p *peer) handleLocalClose(req *closeChanReq) { // Respond to the local sub-system which requested the channel // closure. - req.resp <- &closeChanResp{txid, success} + req.resp <- &closeLinkResp{txid, success} req.err <- nil }() } @@ -610,13 +622,16 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) { // wipeChannel removes the passed channel from all indexes associated with the // peer, and deletes the channel from the database. func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { - chanID := *channel.ChannelPoint() + chanID := channel.ChannelPoint() - delete(p.activeChannels, chanID) + delete(p.activeChannels, *chanID) - p.server.chanIndexMtx.Lock() - delete(p.server.chanIndex, chanID) - p.server.chanIndexMtx.Unlock() + // Instruct the Htlc Switch to close this link as the channel is no + // longer active. + p.server.htlcSwitch.UnregisterLink(p.lightningID, chanID) + htlcWireLink := p.htlcManagers[*chanID] + delete(p.htlcManagers, *chanID) + close(htlcWireLink) if err := channel.DeleteState(); err != nil { peerLog.Errorf("Unable to delete ChannelPoint(%v) "+ @@ -628,10 +643,27 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { // * communicates with the htlc switch over several channels // * in handler sends to this goroutine after getting final revocation // * has timeouts etc, to send back on queue handler in case of timeout -func (p *peer) htlcManager() { +// TODO(roabseef): split downstream link into two chans (send vs recv) +func (p *peer) htlcManager(channel *lnwallet.LightningChannel, + downstreamLink chan lnwire.Message, upstreamLink chan lnwire.Message) { + + peerLog.Tracef("htlc manager for channel %v started", + channel.ChannelPoint()) + out: for { select { + case htlcPkt := <-downstreamLink: + fmt.Println(htlcPkt) + case msg, ok := <-upstreamLink: + // If the upstream message link is closed, this signals + // that the channel itself is being closed, therefore + // we exit. + if !ok { + break out + } + + fmt.Println(msg) case <-p.quit: break out }