lnd: set up messaging chans between peer and htlcSwitch

Each active channel now gains its a dedicated htlcManager goroutine
which currently accepts to two golang channels, and a lightning
channel. The “downstream” channel will be used for dispatched multi-hop
payments sent from the htlcSwitch, while the “upstream” channel will be
used once the readHandler de-multiplexes hltc add/timeout/settle
messages.

Each time a new channel is fully created after N confirmations, the
peer’s channelManager registers the new link with the htlcSwitch. Once
the channel is closed either cooperatively or non-cooperatively, then
the link is unregistered.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-09 16:41:06 -07:00
parent 5fab6ea39e
commit 4548e4497d
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 60 additions and 23 deletions

5
log.go

@ -25,6 +25,7 @@ var (
srvrLog = btclog.Disabled srvrLog = btclog.Disabled
ntfnLog = btclog.Disabled ntfnLog = btclog.Disabled
chdbLog = btclog.Disabled chdbLog = btclog.Disabled
hswcLog = btclog.Disabled
) )
// subsystemLoggers maps each subsystem identifier to its associated logger. // subsystemLoggers maps each subsystem identifier to its associated logger.
@ -37,6 +38,7 @@ var subsystemLoggers = map[string]btclog.Logger{
"NTFN": ntfnLog, "NTFN": ntfnLog,
"CHDB": chdbLog, "CHDB": chdbLog,
"FNDG": fndgLog, "FNDG": fndgLog,
"HSWC": hswcLog,
} }
// useLogger updates the logger references for subsystemID to logger. Invalid // useLogger updates the logger references for subsystemID to logger. Invalid
@ -74,6 +76,9 @@ func useLogger(subsystemID string, logger btclog.Logger) {
case "FNDG": case "FNDG":
fndgLog = logger fndgLog = logger
case "HSWC":
hswcLog = logger
} }
} }

78
peer.go

@ -2,6 +2,7 @@ package main
import ( import (
"container/list" "container/list"
"fmt"
"net" "net"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -100,6 +101,8 @@ type peer struct {
activeChannels map[wire.OutPoint]*lnwallet.LightningChannel activeChannels map[wire.OutPoint]*lnwallet.LightningChannel
chanSnapshotReqs chan *chanSnapshotReq chanSnapshotReqs chan *chanSnapshotReq
htlcManagers map[wire.OutPoint]chan lnwire.Message
// newChanBarriers is a map from a channel point to a 'barrier' which // newChanBarriers is a map from a channel point to a 'barrier' which
// will be signalled once the channel is fully open. This barrier acts // will be signalled once the channel is fully open. This barrier acts
// as a synchronization point for any incoming/outgoing HTLCs before // as a synchronization point for any incoming/outgoing HTLCs before
@ -115,7 +118,7 @@ type peer struct {
// localCloseChanReqs is a channel in which any local requests to // localCloseChanReqs is a channel in which any local requests to
// close a particular channel are sent over. // close a particular channel are sent over.
localCloseChanReqs chan *closeChanReq localCloseChanReqs chan *closeLinkReq
// remoteCloseChanReqs is a channel in which any remote requests // remoteCloseChanReqs is a channel in which any remote requests
// (initiated by the remote peer) close a particular channel are sent // (initiated by the remote peer) close a particular channel are sent
@ -160,10 +163,11 @@ func newPeer(conn net.Conn, server *server, net wire.BitcoinNet, inbound bool) (
newChanBarriers: make(map[wire.OutPoint]chan struct{}), newChanBarriers: make(map[wire.OutPoint]chan struct{}),
activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel),
htlcManagers: make(map[wire.OutPoint]chan lnwire.Message),
chanSnapshotReqs: make(chan *chanSnapshotReq), chanSnapshotReqs: make(chan *chanSnapshotReq),
newChannels: make(chan *lnwallet.LightningChannel, 1), newChannels: make(chan *lnwallet.LightningChannel, 1),
localCloseChanReqs: make(chan *closeChanReq), localCloseChanReqs: make(chan *closeLinkReq),
remoteCloseChanReqs: make(chan *lnwire.CloseRequest), remoteCloseChanReqs: make(chan *lnwire.CloseRequest),
queueQuit: make(chan struct{}), queueQuit: make(chan struct{}),
@ -214,10 +218,16 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
p.activeChannels[chanPoint] = lnChan p.activeChannels[chanPoint] = lnChan
peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint) peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint)
// Update the server's global channel index. // Register this new channel link with the HTLC Switch. This is
p.server.chanIndexMtx.Lock() // necessary to properly route multi-hop payments, and forward
p.server.chanIndex[chanPoint] = p // new payments triggered by RPC clients.
p.server.chanIndexMtx.Unlock() downstreamLink := make(chan lnwire.Message)
p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink)
upstreamLink := make(chan lnwire.Message)
p.htlcManagers[chanPoint] = upstreamLink
p.wg.Add(1)
go p.htlcManager(lnChan, downstreamLink, upstreamLink)
} }
return nil return nil
@ -233,12 +243,11 @@ func (p *peer) Start() error {
peerLog.Tracef("peer %v starting", p) peerLog.Tracef("peer %v starting", p)
p.wg.Add(5) p.wg.Add(4)
go p.readHandler() go p.readHandler()
go p.queueHandler() go p.queueHandler()
go p.writeHandler() go p.writeHandler()
go p.channelManager() go p.channelManager()
go p.htlcManager()
return nil return nil
} }
@ -472,13 +481,16 @@ out:
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
"with peerId(%v)", chanPoint, p.id) "with peerId(%v)", chanPoint, p.id)
// Now that the channel is open, update the server's // Now that the channel is open, notify the Htlc
// map of channels to the peers we have a particular // Switch of a new active link.
// channel open to. chanSnapShot := newChan.StateSnapshot()
// TODO(roasbeef): should server have this knowledge? downstreamLink := make(chan lnwire.Message)
p.server.chanIndexMtx.Lock() p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink)
p.server.chanIndex[chanPoint] = p
p.server.chanIndexMtx.Unlock() upstreamLink := make(chan lnwire.Message)
p.htlcManagers[chanPoint] = upstreamLink
p.wg.Add(1)
go p.htlcManager(newChan, downstreamLink, upstreamLink)
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
p.handleLocalClose(req) p.handleLocalClose(req)
case req := <-p.remoteCloseChanReqs: case req := <-p.remoteCloseChanReqs:
@ -493,7 +505,7 @@ out:
// handleLocalClose kicks-off the workflow to execute a cooperative closure of // handleLocalClose kicks-off the workflow to execute a cooperative closure of
// the channel initiated by a local sub-system. // the channel initiated by a local sub-system.
func (p *peer) handleLocalClose(req *closeChanReq) { func (p *peer) handleLocalClose(req *closeLinkReq) {
chanPoint := req.chanPoint chanPoint := req.chanPoint
key := wire.OutPoint{ key := wire.OutPoint{
Hash: chanPoint.Hash, Hash: chanPoint.Hash,
@ -560,7 +572,7 @@ func (p *peer) handleLocalClose(req *closeChanReq) {
// Respond to the local sub-system which requested the channel // Respond to the local sub-system which requested the channel
// closure. // closure.
req.resp <- &closeChanResp{txid, success} req.resp <- &closeLinkResp{txid, success}
req.err <- nil req.err <- nil
}() }()
} }
@ -610,13 +622,16 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
// wipeChannel removes the passed channel from all indexes associated with the // wipeChannel removes the passed channel from all indexes associated with the
// peer, and deletes the channel from the database. // peer, and deletes the channel from the database.
func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
chanID := *channel.ChannelPoint() chanID := channel.ChannelPoint()
delete(p.activeChannels, chanID) delete(p.activeChannels, *chanID)
p.server.chanIndexMtx.Lock() // Instruct the Htlc Switch to close this link as the channel is no
delete(p.server.chanIndex, chanID) // longer active.
p.server.chanIndexMtx.Unlock() p.server.htlcSwitch.UnregisterLink(p.lightningID, chanID)
htlcWireLink := p.htlcManagers[*chanID]
delete(p.htlcManagers, *chanID)
close(htlcWireLink)
if err := channel.DeleteState(); err != nil { if err := channel.DeleteState(); err != nil {
peerLog.Errorf("Unable to delete ChannelPoint(%v) "+ peerLog.Errorf("Unable to delete ChannelPoint(%v) "+
@ -628,10 +643,27 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
// * communicates with the htlc switch over several channels // * communicates with the htlc switch over several channels
// * in handler sends to this goroutine after getting final revocation // * in handler sends to this goroutine after getting final revocation
// * has timeouts etc, to send back on queue handler in case of timeout // * has timeouts etc, to send back on queue handler in case of timeout
func (p *peer) htlcManager() { // TODO(roabseef): split downstream link into two chans (send vs recv)
func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
downstreamLink chan lnwire.Message, upstreamLink chan lnwire.Message) {
peerLog.Tracef("htlc manager for channel %v started",
channel.ChannelPoint())
out: out:
for { for {
select { select {
case htlcPkt := <-downstreamLink:
fmt.Println(htlcPkt)
case msg, ok := <-upstreamLink:
// If the upstream message link is closed, this signals
// that the channel itself is being closed, therefore
// we exit.
if !ok {
break out
}
fmt.Println(msg)
case <-p.quit: case <-p.quit:
break out break out
} }