funding+peer: switch to using new channel ID's

This commit modifies the way the fundingManager tracks pending funding
workflows internally. Rather than using the old auto-incrementing
64-bit pending channel ID’s, we now use a 32-byte pending channel ID
which is generated using a CSPRG. Additionally, once the final funding
message has been sent, we now de-multiplex the FundingLocked message
according to the new Channel ID’s which replace the old ChannelPoint’s
and are exactly 32-bytes long.
This commit is contained in:
Olaoluwa Osuntokun 2017-04-16 15:34:23 -07:00
parent b3424149ef
commit 609cba95d7
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 151 additions and 144 deletions

@ -2,10 +2,13 @@ package main
import ( import (
"bytes" "bytes"
"encoding/binary"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"golang.org/x/crypto/salsa20"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/go-errors/errors" "github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
@ -14,7 +17,6 @@ import (
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
@ -92,17 +94,16 @@ type fundingLockedMsg struct {
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
// fundingErrorMsg couples an lnwire.ErrorGeneric message // fundingErrorMsg couples an lnwire.Error message with the peer who sent the
// with the peer who sent the message. This allows the funding // message. This allows the funding manager to properly process the error.
// manager to properly process the error.
type fundingErrorMsg struct { type fundingErrorMsg struct {
err *lnwire.ErrorGeneric err *lnwire.Error
peerAddress *lnwire.NetAddress peerAddress *lnwire.NetAddress
} }
// pendingChannels is a map instantiated per-peer which tracks all active // pendingChannels is a map instantiated per-peer which tracks all active
// pending single funded channels indexed by their pending channel identifier. // pending single funded channels indexed by their pending channel identifier.
type pendingChannels map[uint64]*reservationWithCtx type pendingChannels map[[32]byte]*reservationWithCtx
// serializedPubKey is used within the FundingManager's activeReservations list // serializedPubKey is used within the FundingManager's activeReservations list
// to identify the nodes with which the FundingManager is actively working to // to identify the nodes with which the FundingManager is actively working to
@ -161,8 +162,12 @@ type fundingConfig struct {
FindPeer func(peerKey *btcec.PublicKey) (*peer, error) FindPeer func(peerKey *btcec.PublicKey) (*peer, error)
// FindChannel queries the database for the channel with the given // FindChannel queries the database for the channel with the given
// funding transaction outpoint. // channel ID.
FindChannel func(chanPoint wire.OutPoint) (*lnwallet.LightningChannel, error) FindChannel func(chanID lnwire.ChannelID) (*lnwallet.LightningChannel, error)
// TempChanIDSeed is a cryptographically random string of bytes that's
// used as a seed to generate pending channel ID's.
TempChanIDSeed [32]byte
} }
// fundingManager acts as an orchestrator/bridge between the wallet's // fundingManager acts as an orchestrator/bridge between the wallet's
@ -183,6 +188,15 @@ type fundingManager struct {
// initialized with. // initialized with.
cfg *fundingConfig cfg *fundingConfig
// chanIDKey is a cryptographically random key that's used to generate
// temporary channel ID's.
chanIDKey [32]byte
// chanIDNonce is a nonce that's incremented for each new funding
// reservation created.
nonceMtx sync.RWMutex
chanIDNonce uint64
// channelReservations is a map which houses the state of all pending // channelReservations is a map which houses the state of all pending
// funding workflows. // funding workflows.
resMtx sync.RWMutex resMtx sync.RWMutex
@ -200,12 +214,12 @@ type fundingManager struct {
// requests from a local subsystem within the daemon. // requests from a local subsystem within the daemon.
fundingRequests chan *initFundingMsg fundingRequests chan *initFundingMsg
// newChanBarriers is a map from a channel point to a 'barrier' which // newChanBarriers is a map from a channel ID to a 'barrier' which will
// will be signalled once the channel is fully open. This barrier acts // be signalled once the channel is fully open. This barrier acts as a
// as a synchronization point for any incoming/outgoing HTLCs before // synchronization point for any incoming/outgoing HTLCs before the
// the channel has been fully opened. // channel has been fully opened.
barrierMtx sync.RWMutex barrierMtx sync.RWMutex
newChanBarriers map[wire.OutPoint]chan struct{} newChanBarriers map[lnwire.ChannelID]chan struct{}
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
@ -216,8 +230,9 @@ type fundingManager struct {
func newFundingManager(cfg fundingConfig) (*fundingManager, error) { func newFundingManager(cfg fundingConfig) (*fundingManager, error) {
return &fundingManager{ return &fundingManager{
cfg: &cfg, cfg: &cfg,
chanIDKey: cfg.TempChanIDSeed,
activeReservations: make(map[serializedPubKey]pendingChannels), activeReservations: make(map[serializedPubKey]pendingChannels),
newChanBarriers: make(map[wire.OutPoint]chan struct{}), newChanBarriers: make(map[lnwire.ChannelID]chan struct{}),
fundingMsgs: make(chan interface{}, msgBufferSize), fundingMsgs: make(chan interface{}, msgBufferSize),
fundingRequests: make(chan *initFundingMsg, msgBufferSize), fundingRequests: make(chan *initFundingMsg, msgBufferSize),
queries: make(chan interface{}, 1), queries: make(chan interface{}, 1),
@ -251,7 +266,8 @@ func (f *fundingManager) Start() error {
f.barrierMtx.Lock() f.barrierMtx.Lock()
fndgLog.Tracef("Loading pending ChannelPoint(%v), creating chan "+ fndgLog.Tracef("Loading pending ChannelPoint(%v), creating chan "+
"barrier", *channel.FundingOutpoint) "barrier", *channel.FundingOutpoint)
f.newChanBarriers[*channel.FundingOutpoint] = make(chan struct{}) chanID := lnwire.NewChanIDFromOutPoint(channel.FundingOutpoint)
f.newChanBarriers[chanID] = make(chan struct{})
f.barrierMtx.Unlock() f.barrierMtx.Unlock()
doneChan := make(chan struct{}) doneChan := make(chan struct{})
@ -299,6 +315,28 @@ func (f *fundingManager) NumPendingChannels() (uint32, error) {
return <-respChan, <-errChan return <-respChan, <-errChan
} }
// nextPendingChanID returns the next free pending channel ID to be used to
// identify a particular future channel funding workflow.
func (f *fundingManager) nextPendingChanID() [32]byte {
// Obtain a fresh nonce. We do this by encoding the current nonce
// counter, then incrementing it by one.
f.nonceMtx.Lock()
var nonce [8]byte
binary.LittleEndian.PutUint64(nonce[:], f.chanIDNonce)
f.chanIDNonce++
f.nonceMtx.Unlock()
// We'll generate the next pending channelID by "encrypting" 32-bytes
// of zeroes which'll extract 32 random bytes from our stream cipher.
var (
nextChanID [32]byte
zeroes [32]byte
)
salsa20.XORKeyStream(nextChanID[:], zeroes[:], nonce[:], &f.chanIDKey)
return nextChanID
}
type pendingChannel struct { type pendingChannel struct {
identityPub *btcec.PublicKey identityPub *btcec.PublicKey
channelPoint *wire.OutPoint channelPoint *wire.OutPoint
@ -349,7 +387,7 @@ func (f *fundingManager) reservationCoordinator() {
case *fundingLockedMsg: case *fundingLockedMsg:
f.handleFundingLocked(fmsg) f.handleFundingLocked(fmsg)
case *fundingErrorMsg: case *fundingErrorMsg:
f.handleErrorGenericMsg(fmsg) f.handleErrorMsg(fmsg)
} }
case req := <-f.fundingRequests: case req := <-f.fundingRequests:
f.handleInitFundingMsg(req) f.handleInitFundingMsg(req)
@ -433,14 +471,10 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey) peerIDKey := newSerializedKey(fmsg.peerAddress.IdentityKey)
if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels { if len(f.activeReservations[peerIDKey]) >= cfg.MaxPendingChannels {
errMsg := &lnwire.ErrorGeneric{ errMsg := &lnwire.Error{
ChannelPoint: wire.OutPoint{ ChanID: fmsg.msg.PendingChannelID,
Hash: chainhash.Hash{}, Code: lnwire.ErrMaxPendingChannels,
Index: 0, Data: []byte("Number of pending channels exceed maximum"),
},
Problem: "Number of pending channels exceed maximum",
Code: lnwire.ErrMaxPendingChannels,
PendingChannelID: fmsg.msg.ChannelID,
} }
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, errMsg); err != nil { if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, errMsg); err != nil {
fndgLog.Errorf("unable to send max pending channels "+ fndgLog.Errorf("unable to send max pending channels "+
@ -460,14 +494,10 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
return return
} }
if !isSynced { if !isSynced {
errMsg := &lnwire.ErrorGeneric{ errMsg := &lnwire.Error{
ChannelPoint: wire.OutPoint{ ChanID: fmsg.msg.PendingChannelID,
Hash: chainhash.Hash{}, Code: lnwire.ErrSynchronizingChain,
Index: 0, Data: []byte("Synchronizing blockchain"),
},
Problem: "Synchronizing blockchain",
Code: lnwire.ErrSynchronizingChain,
PendingChannelID: fmsg.msg.ChannelID,
} }
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, errMsg); err != nil { if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, errMsg); err != nil {
fndgLog.Errorf("unable to send error message to peer %v", err) fndgLog.Errorf("unable to send error message to peer %v", err)
@ -481,9 +511,9 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
delay := msg.CsvDelay delay := msg.CsvDelay
// TODO(roasbeef): error if funding flow already ongoing // TODO(roasbeef): error if funding flow already ongoing
fndgLog.Infof("Recv'd fundingRequest(amt=%v, push=%v, delay=%v, "+ fndgLog.Infof("Recv'd fundingRequest(amt=%v, delay=%v, pendingId=%x) "+
"pendingId=%v) from peer(%x)", amt, msg.PushSatoshis, delay, "from peer(%x)", amt, msg.PushSatoshis, delay, msg.PendingChannelID,
msg.ChannelID, fmsg.peerAddress.IdentityKey.SerializeCompressed()) fmsg.peerAddress.IdentityKey.SerializeCompressed())
ourDustLimit := lnwallet.DefaultDustLimit() ourDustLimit := lnwallet.DefaultDustLimit()
theirDustlimit := msg.DustLimit theirDustlimit := msg.DustLimit
@ -514,7 +544,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
if _, ok := f.activeReservations[peerIDKey]; !ok { if _, ok := f.activeReservations[peerIDKey]; !ok {
f.activeReservations[peerIDKey] = make(pendingChannels) f.activeReservations[peerIDKey] = make(pendingChannels)
} }
f.activeReservations[peerIDKey][msg.ChannelID] = &reservationWithCtx{ f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{
reservation: reservation, reservation: reservation,
err: make(chan error, 1), err: make(chan error, 1),
peerAddress: fmsg.peerAddress, peerAddress: fmsg.peerAddress,
@ -522,7 +552,8 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
f.resMtx.Unlock() f.resMtx.Unlock()
cancelReservation := func() { cancelReservation := func() {
_, err := f.cancelReservationCtx(fmsg.peerAddress.IdentityKey, msg.ChannelID) _, err := f.cancelReservationCtx(fmsg.peerAddress.IdentityKey,
msg.PendingChannelID)
if err != nil { if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err) fndgLog.Errorf("unable to cancel reservation: %v", err)
} }
@ -550,7 +581,8 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
return return
} }
fndgLog.Infof("Sending fundingResp for pendingID(%v)", msg.ChannelID) fndgLog.Infof("Sending fundingResp for pendingID(%x)",
msg.PendingChannelID)
// With the initiator's contribution recorded, respond with our // With the initiator's contribution recorded, respond with our
// contribution in the next message of the workflow. // contribution in the next message of the workflow.
@ -561,7 +593,7 @@ func (f *fundingManager) handleFundingRequest(fmsg *fundingRequestMsg) {
cancelReservation() cancelReservation()
return return
} }
fundingResp := lnwire.NewSingleFundingResponse(msg.ChannelID, fundingResp := lnwire.NewSingleFundingResponse(msg.PendingChannelID,
ourContribution.RevocationKey, ourContribution.CommitKey, ourContribution.RevocationKey, ourContribution.CommitKey,
ourContribution.MultiSigKey, ourContribution.CsvDelay, ourContribution.MultiSigKey, ourContribution.CsvDelay,
deliveryScript, ourDustLimit, msg.ConfirmationDepth) deliveryScript, ourDustLimit, msg.ConfirmationDepth)
@ -585,23 +617,24 @@ func (f *fundingManager) processFundingResponse(msg *lnwire.SingleFundingRespons
// outpoint, and a commitment signature to the remote peer. // outpoint, and a commitment signature to the remote peer.
func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) { func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
msg := fmsg.msg msg := fmsg.msg
chanID := fmsg.msg.ChannelID pendingChanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peerAddress.IdentityKey peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(peerKey, chanID) resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil { if err != nil {
fndgLog.Warnf("Can't find reservation (peerKey:%v, chanID:%v)", fndgLog.Warnf("Can't find reservation (peerKey:%v, chanID:%v)",
peerKey, chanID) peerKey, pendingChanID)
return return
} }
cancelReservation := func() { cancelReservation := func() {
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil { _, err := f.cancelReservationCtx(peerKey, pendingChanID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err) fndgLog.Errorf("unable to cancel reservation: %v", err)
} }
} }
fndgLog.Infof("Recv'd fundingResponse for pendingID(%v)", msg.ChannelID) fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID)
resCtx.reservation.SetTheirDustLimit(msg.DustLimit) resCtx.reservation.SetTheirDustLimit(msg.DustLimit)
@ -651,21 +684,22 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
// channel to the barriers map which will be closed once the channel is // channel to the barriers map which will be closed once the channel is
// fully open. // fully open.
f.barrierMtx.Lock() f.barrierMtx.Lock()
fndgLog.Debugf("Creating chan barrier for "+ channelID := lnwire.NewChanIDFromOutPoint(outPoint)
"ChannelPoint(%v)", outPoint) fndgLog.Debugf("Creating chan barrier for ChanID(%v)", channelID)
f.newChanBarriers[*outPoint] = make(chan struct{}) f.newChanBarriers[channelID] = make(chan struct{})
f.barrierMtx.Unlock() f.barrierMtx.Unlock()
fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", outPoint, fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%x)", outPoint,
chanID) pendingChanID)
revocationKey := resCtx.reservation.OurContribution().RevocationKey revocationKey := resCtx.reservation.OurContribution().RevocationKey
obsfucator := resCtx.reservation.StateNumObfuscator() obsfucator := resCtx.reservation.StateNumObfuscator()
fundingComplete := lnwire.NewSingleFundingComplete(chanID, *outPoint, fundingComplete := lnwire.NewSingleFundingComplete(pendingChanID, *outPoint,
commitSig, revocationKey, obsfucator) commitSig, revocationKey, obsfucator)
if err := f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingComplete); err != nil { err = f.cfg.SendToPeer(fmsg.peerAddress.IdentityKey, fundingComplete)
if err != nil {
fndgLog.Errorf("Unable to send funding complete message: %v", err) fndgLog.Errorf("Unable to send funding complete message: %v", err)
cancelReservation() cancelReservation()
resCtx.err <- err resCtx.err <- err
@ -686,17 +720,18 @@ func (f *fundingManager) processFundingComplete(msg *lnwire.SingleFundingComplet
// the funding transaction, progressing the workflow into the final stage. // the funding transaction, progressing the workflow into the final stage.
func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) { func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
peerKey := fmsg.peerAddress.IdentityKey peerKey := fmsg.peerAddress.IdentityKey
chanID := fmsg.msg.ChannelID pendingChanID := fmsg.msg.PendingChannelID
resCtx, err := f.getReservationCtx(peerKey, chanID) resCtx, err := f.getReservationCtx(peerKey, pendingChanID)
if err != nil { if err != nil {
fndgLog.Warnf("can't find reservation (peerID:%v, chanID:%v)", fndgLog.Warnf("can't find reservation (peerID:%v, chanID:%v)",
peerKey, chanID) peerKey, pendingChanID)
return return
} }
cancelReservation := func() { cancelReservation := func() {
if _, err := f.cancelReservationCtx(peerKey, chanID); err != nil { _, err := f.cancelReservationCtx(peerKey, pendingChanID)
if err != nil {
fndgLog.Errorf("unable to cancel reservation: %v", err) fndgLog.Errorf("unable to cancel reservation: %v", err)
} }
} }
@ -707,9 +742,8 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
// inititator's commitment transaction, then send our own if it's valid. // inititator's commitment transaction, then send our own if it's valid.
// TODO(roasbeef): make case (p vs P) consistent throughout // TODO(roasbeef): make case (p vs P) consistent throughout
fundingOut := fmsg.msg.FundingOutPoint fundingOut := fmsg.msg.FundingOutPoint
fndgLog.Infof("completing pendingID(%v) with ChannelPoint(%v)", fndgLog.Infof("completing pendingID(%x) with ChannelPoint(%v)",
chanID, fundingOut, pendingChanID, fundingOut)
)
revokeKey := copyPubKey(fmsg.msg.RevocationKey) revokeKey := copyPubKey(fmsg.msg.RevocationKey)
obsfucator := fmsg.msg.StateHintObsfucator obsfucator := fmsg.msg.StateHintObsfucator
@ -744,15 +778,15 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
// channel to the barriers map which will be closed once the channel is // channel to the barriers map which will be closed once the channel is
// fully open. // fully open.
f.barrierMtx.Lock() f.barrierMtx.Lock()
fndgLog.Debugf("Creating chan barrier for "+ channelID := lnwire.NewChanIDFromOutPoint(&fundingOut)
"ChannelPoint(%v)", fundingOut) fndgLog.Debugf("Creating chan barrier for ChanID(%v)", channelID)
f.newChanBarriers[fundingOut] = make(chan struct{}) f.newChanBarriers[channelID] = make(chan struct{})
f.barrierMtx.Unlock() f.barrierMtx.Unlock()
fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", fndgLog.Infof("sending signComplete for pendingID(%x) over ChannelPoint(%v)",
chanID, fundingOut) pendingChanID, fundingOut)
signComplete := lnwire.NewSingleFundingSignComplete(chanID, ourCommitSig) signComplete := lnwire.NewSingleFundingSignComplete(pendingChanID, ourCommitSig)
if err := f.cfg.SendToPeer(peerKey, signComplete); err != nil { if err := f.cfg.SendToPeer(peerKey, signComplete); err != nil {
fndgLog.Errorf("unable to send signComplete message: %v", err) fndgLog.Errorf("unable to send signComplete message: %v", err)
cancelReservation() cancelReservation()
@ -764,7 +798,7 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
go f.waitForFundingConfirmation(completeChan, doneChan) go f.waitForFundingConfirmation(completeChan, doneChan)
<-doneChan <-doneChan
f.deleteReservationCtx(peerKey, fmsg.msg.ChannelID) f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
}() }()
} }
@ -781,7 +815,7 @@ func (f *fundingManager) processFundingSignComplete(msg *lnwire.SingleFundingSig
// confirmations, a message is sent to the responding peer along with a compact // confirmations, a message is sent to the responding peer along with a compact
// encoding of the location of the channel within the blockchain. // encoding of the location of the channel within the blockchain.
func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) { func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg) {
chanID := fmsg.msg.ChannelID chanID := fmsg.msg.PendingChannelID
peerKey := fmsg.peerAddress.IdentityKey peerKey := fmsg.peerAddress.IdentityKey
resCtx, err := f.getReservationCtx(peerKey, chanID) resCtx, err := f.getReservationCtx(peerKey, chanID)
@ -807,7 +841,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
} }
fundingPoint := resCtx.reservation.FundingOutpoint() fundingPoint := resCtx.reservation.FundingOutpoint()
fndgLog.Infof("Finalizing pendingID(%v) over ChannelPoint(%v), "+ fndgLog.Infof("Finalizing pendingID(%x) over ChannelPoint(%v), "+
"waiting for channel open on-chain", chanID, fundingPoint) "waiting for channel open on-chain", chanID, fundingPoint)
// Send an update to the upstream client that the negotiation process // Send an update to the upstream client that the negotiation process
@ -835,7 +869,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
// Finally give the caller a final update notifying them that // Finally give the caller a final update notifying them that
// the channel is now open. // the channel is now open.
// TODO(roasbeef): helper funcs for proto construction // TODO(roasbeef): only notify after recv of funding locked?
resCtx.updates <- &lnrpc.OpenStatusUpdate{ resCtx.updates <- &lnrpc.OpenStatusUpdate{
Update: &lnrpc.OpenStatusUpdate_ChanOpen{ Update: &lnrpc.OpenStatusUpdate_ChanOpen{
ChanOpen: &lnrpc.ChannelOpenUpdate{ ChanOpen: &lnrpc.ChannelOpenUpdate{
@ -847,7 +881,7 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
}, },
} }
f.deleteReservationCtx(peerKey, fmsg.msg.ChannelID) f.deleteReservationCtx(peerKey, fmsg.msg.PendingChannelID)
}() }()
} }
@ -856,8 +890,8 @@ func (f *fundingManager) handleFundingSignComplete(fmsg *fundingSignCompleteMsg)
// function of waitForFundingConfirmation is to wait for blockchain // function of waitForFundingConfirmation is to wait for blockchain
// confirmation, and then to notify the other systems that must be notified // confirmation, and then to notify the other systems that must be notified
// when a channel has become active for lightning transactions. // when a channel has become active for lightning transactions.
func (f *fundingManager) waitForFundingConfirmation( func (f *fundingManager) waitForFundingConfirmation(completeChan *channeldb.OpenChannel,
completeChan *channeldb.OpenChannel, doneChan chan struct{}) { doneChan chan struct{}) {
defer close(doneChan) defer close(doneChan)
@ -865,7 +899,12 @@ func (f *fundingManager) waitForFundingConfirmation(
// transaction reaches `numConfs` confirmations. // transaction reaches `numConfs` confirmations.
txid := completeChan.FundingOutpoint.Hash txid := completeChan.FundingOutpoint.Hash
numConfs := uint32(completeChan.NumConfsRequired) numConfs := uint32(completeChan.NumConfsRequired)
confNtfn, _ := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, numConfs) confNtfn, err := f.cfg.Notifier.RegisterConfirmationsNtfn(&txid, numConfs)
if err != nil {
fndgLog.Errorf("Unable to register for confirmation of "+
"ChannelPoint(%v)", completeChan.FundingOutpoint)
return
}
fndgLog.Infof("Waiting for funding tx (%v) to reach %v confirmations", fndgLog.Infof("Waiting for funding tx (%v) to reach %v confirmations",
txid, numConfs) txid, numConfs)
@ -874,27 +913,31 @@ func (f *fundingManager) waitForFundingConfirmation(
// or the wallet signals a shutdown. // or the wallet signals a shutdown.
confDetails, ok := <-confNtfn.Confirmed confDetails, ok := <-confNtfn.Confirmed
if !ok { if !ok {
fndgLog.Infof("ChainNotifier shutting down, cannot complete "+ fndgLog.Warnf("ChainNotifier shutting down, cannot complete "+
"funding flow for ChannelPoint(%v)", "funding flow for ChannelPoint(%v)",
completeChan.FundingOutpoint) completeChan.FundingOutpoint)
return return
} }
fundingPoint := *completeChan.FundingOutpoint fundingPoint := *completeChan.FundingOutpoint
fndgLog.Infof("ChannelPoint(%v) is now active", fundingPoint) chanID := lnwire.NewChanIDFromOutPoint(&fundingPoint)
fndgLog.Infof("ChannelPoint(%v) is now active: ChannelID(%x)",
fundingPoint, chanID)
// Now that the channel has been fully confirmed, we'll mark it as open
// within the database.
completeChan.IsPending = false completeChan.IsPending = false
err := f.cfg.Wallet.ChannelDB.MarkChannelAsOpen(&fundingPoint) err = f.cfg.Wallet.ChannelDB.MarkChannelAsOpen(&fundingPoint)
if err != nil { if err != nil {
fndgLog.Errorf("error setting channel pending flag to false: "+ fndgLog.Errorf("error setting channel pending flag to false: "+
"%v", err) "%v", err)
return return
} }
// Finally, create and officially open the payment channel! // With the channel marked open, we'll create the state-machine object
// TODO(roasbeef): CreationTime once tx is 'open' // which wraps the database state.
channel, err := lnwallet.NewLightningChannel(f.cfg.Wallet.Signer, channel, err := lnwallet.NewLightningChannel(nil, nil, completeChan)
f.cfg.Notifier, completeChan)
if err != nil { if err != nil {
fndgLog.Errorf("error creating new lightning channel: %v", err) fndgLog.Errorf("error creating new lightning channel: %v", err)
return return
@ -1014,15 +1057,15 @@ type chanAnnouncement struct {
// authenticated only by us and contains our directional routing policy for the // authenticated only by us and contains our directional routing policy for the
// channel. // channel.
func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.PublicKey, func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.PublicKey,
localFundingKey, remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID, localFundingKey, remoteFundingKey *btcec.PublicKey,
chanPoint wire.OutPoint) (*chanAnnouncement, error) { shortChanID lnwire.ShortChannelID,
var err error chanID lnwire.ChannelID) (*chanAnnouncement, error) {
// The unconditional section of the announcement is the ShortChannelID // The unconditional section of the announcement is the ShortChannelID
// itself which compactly encodes the location of the funding output // itself which compactly encodes the location of the funding output
// within the blockchain. // within the blockchain.
chanAnn := &lnwire.ChannelAnnouncement{ chanAnn := &lnwire.ChannelAnnouncement{
ShortChannelID: chanID, ShortChannelID: shortChanID,
} }
// The chanFlags field indicates which directed edge of the channel is // The chanFlags field indicates which directed edge of the channel is
@ -1059,7 +1102,7 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
// TODO(roasbeef): populate proper FeeSchema // TODO(roasbeef): populate proper FeeSchema
chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{ chanUpdateAnn := &lnwire.ChannelUpdateAnnouncement{
ShortChannelID: chanID, ShortChannelID: shortChanID,
Timestamp: uint32(time.Now().Unix()), Timestamp: uint32(time.Now().Unix()),
Flags: chanFlags, Flags: chanFlags,
TimeLockDelta: 1, TimeLockDelta: 1,
@ -1108,8 +1151,8 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
// provide the other side with the necessary signatures required to // provide the other side with the necessary signatures required to
// allow them to reconstruct the full channel announcement. // allow them to reconstruct the full channel announcement.
proof := &lnwire.AnnounceSignatures{ proof := &lnwire.AnnounceSignatures{
ChannelID: chanPoint, ChannelID: chanID,
ShortChannelID: chanID, ShortChannelID: shortChanID,
NodeSignature: nodeSig, NodeSignature: nodeSig,
BitcoinSignature: bitcoinSig, BitcoinSignature: bitcoinSig,
} }
@ -1127,19 +1170,16 @@ func (f *fundingManager) newChanAnnouncement(localPubKey, remotePubKey *btcec.Pu
// announcements are then sent to the channel router to handle broadcasting to // announcements are then sent to the channel router to handle broadcasting to
// the network during its next trickle. // the network during its next trickle.
func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey, func (f *fundingManager) announceChannel(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey *btcec.PublicKey, chanID lnwire.ShortChannelID, remoteFundingKey *btcec.PublicKey, shortChanID lnwire.ShortChannelID,
chanPoint wire.OutPoint) { chanID lnwire.ChannelID) {
ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey, localFundingKey, ann, err := f.newChanAnnouncement(localIDKey, remoteIDKey, localFundingKey,
remoteFundingKey, chanID, chanPoint) remoteFundingKey, shortChanID, chanID)
if err != nil { if err != nil {
fndgLog.Errorf("can't generate channel announcement: %v", err) fndgLog.Errorf("can't generate channel announcement: %v", err)
return return
} }
fndgLog.Infof("Announcing ChannelPoint(%v), short_chan_id=%v", chanPoint,
spew.Sdump(chanID))
f.cfg.SendAnnouncement(ann.chanAnn) f.cfg.SendAnnouncement(ann.chanAnn)
f.cfg.SendAnnouncement(ann.chanUpdateAnn) f.cfg.SendAnnouncement(ann.chanUpdateAnn)
f.cfg.SendAnnouncement(ann.chanProof) f.cfg.SendAnnouncement(ann.chanProof)
@ -1188,13 +1228,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// Obtain a new pending channel ID which is used to track this // Obtain a new pending channel ID which is used to track this
// reservation throughout its lifetime. // reservation throughout its lifetime.
peer, err := f.cfg.FindPeer(peerKey) chanID := f.nextPendingChanID()
if err != nil {
msg.err <- err
return
}
chanID := peer.fetchNextPendingChanID()
// If a pending channel map for this peer isn't already created, then // If a pending channel map for this peer isn't already created, then
// we create one, ultimately allowing us to track this pending // we create one, ultimately allowing us to track this pending
@ -1223,7 +1257,7 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
return return
} }
fndgLog.Infof("Starting funding workflow with %v for pendingID(%v)", fndgLog.Infof("Starting funding workflow with %v for pendingID(%x)",
msg.peerAddress.Address, chanID) msg.peerAddress.Address, chanID)
// TODO(roasbeef): add FundingRequestFromContribution func // TODO(roasbeef): add FundingRequestFromContribution func
@ -1252,26 +1286,27 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
// waitUntilChannelOpen is designed to prevent other lnd subsystems from // waitUntilChannelOpen is designed to prevent other lnd subsystems from
// sending new update messages to a channel before the channel is fully // sending new update messages to a channel before the channel is fully
// opened. // opened.
func (f *fundingManager) waitUntilChannelOpen(targetChan wire.OutPoint) { func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
f.barrierMtx.RLock() f.barrierMtx.RLock()
barrier, ok := f.newChanBarriers[targetChan] barrier, ok := f.newChanBarriers[targetChan]
f.barrierMtx.RUnlock() f.barrierMtx.RUnlock()
if ok { if ok {
fndgLog.Tracef("waiting for chan barrier signal for "+ fndgLog.Tracef("waiting for chan barrier signal for ChanID(%v)",
"ChannelPoint(%v)", targetChan) targetChan)
select { select {
case <-barrier: case <-barrier:
case <-f.quit: // TODO(roasbeef): add timer? case <-f.quit: // TODO(roasbeef): add timer?
break break
} }
fndgLog.Tracef("barrier for ChannelPoint(%v) closed",
targetChan) fndgLog.Tracef("barrier for ChanID(%v) closed", targetChan)
} }
} }
// processErrorGeneric sends a message to the fundingManager allowing it to // processErrorGeneric sends a message to the fundingManager allowing it to
// process the occurred generic error. // process the occurred generic error.
func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric, func (f *fundingManager) processFundingError(err *lnwire.Error,
peerAddress *lnwire.NetAddress) { peerAddress *lnwire.NetAddress) {
f.fundingMsgs <- &fundingErrorMsg{err, peerAddress} f.fundingMsgs <- &fundingErrorMsg{err, peerAddress}
@ -1280,7 +1315,7 @@ func (f *fundingManager) processErrorGeneric(err *lnwire.ErrorGeneric,
// handleErrorGenericMsg process the error which was received from remote peer, // handleErrorGenericMsg process the error which was received from remote peer,
// depending on the type of error we should do different clean up steps and // depending on the type of error we should do different clean up steps and
// inform the user about it. // inform the user about it.
func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) { func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
e := fmsg.err e := fmsg.err
switch e.Code { switch e.Code {
@ -1288,8 +1323,7 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
fallthrough fallthrough
case lnwire.ErrSynchronizingChain: case lnwire.ErrSynchronizingChain:
peerKey := fmsg.peerAddress.IdentityKey peerKey := fmsg.peerAddress.IdentityKey
chanID := fmsg.err.PendingChannelID chanID := fmsg.err.ChanID
ctx, err := f.cancelReservationCtx(peerKey, chanID) ctx, err := f.cancelReservationCtx(peerKey, chanID)
if err != nil { if err != nil {
fndgLog.Warnf("unable to delete reservation: %v", err) fndgLog.Warnf("unable to delete reservation: %v", err)
@ -1303,21 +1337,21 @@ func (f *fundingManager) handleErrorGenericMsg(fmsg *fundingErrorMsg) {
}), }),
) )
ctx.err <- grpc.Errorf(e.Code.ToGrpcCode(), e.Problem) ctx.err <- grpc.Errorf(e.Code.ToGrpcCode(), string(e.Data))
return return
default: default:
fndgLog.Warnf("unknown funding error (%v:%v)", e.Code, e.Problem) fndgLog.Warnf("unknown funding error (%v:%v)", e.Code, e.Data)
} }
} }
// cancelReservationCtx do all needed work in order to securely cancel the // cancelReservationCtx do all needed work in order to securely cancel the
// reservation. // reservation.
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
chanID uint64) (*reservationWithCtx, error) { chanID [32]byte) (*reservationWithCtx, error) {
fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+ fndgLog.Infof("Cancelling funding reservation for node_key=%x, "+
"chan_id=%v", peerKey.SerializeCompressed(), chanID) "chan_id=%x", peerKey.SerializeCompressed(), chanID)
ctx, err := f.getReservationCtx(peerKey, chanID) ctx, err := f.getReservationCtx(peerKey, chanID)
if err != nil { if err != nil {
@ -1335,7 +1369,9 @@ func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
} }
// deleteReservationCtx is needed in order to securely delete the reservation. // deleteReservationCtx is needed in order to securely delete the reservation.
func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey, chanID uint64) { func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey,
chanID [32]byte) {
// TODO(roasbeef): possibly cancel funding barrier in peer's // TODO(roasbeef): possibly cancel funding barrier in peer's
// channelManager? // channelManager?
peerIDKey := newSerializedKey(peerKey) peerIDKey := newSerializedKey(peerKey)
@ -1346,7 +1382,7 @@ func (f *fundingManager) deleteReservationCtx(peerKey *btcec.PublicKey, chanID u
// getReservationCtx returns the reservation context by peer id and channel id. // getReservationCtx returns the reservation context by peer id and channel id.
func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey, func (f *fundingManager) getReservationCtx(peerKey *btcec.PublicKey,
chanID uint64) (*reservationWithCtx, error) { chanID [32]byte) (*reservationWithCtx, error) {
peerIDKey := newSerializedKey(peerKey) peerIDKey := newSerializedKey(peerKey)
f.resMtx.RLock() f.resMtx.RLock()

29
peer.go

@ -137,15 +137,6 @@ type peer struct {
// over. // over.
remoteCloseChanReqs chan *lnwire.CloseRequest remoteCloseChanReqs chan *lnwire.CloseRequest
// nextPendingChannelID is an integer which represents the id of the
// next pending channel. Pending channels are tracked by this id
// throughout their lifetime until they become active channels, or are
// cancelled. Channels id's initiated by an outbound node start from 0,
// while channels initiated by an inbound node start from 2^63. In
// either case, this value is always monotonically increasing.
nextPendingChannelID uint64
pendingChannelMtx sync.RWMutex
server *server server *server
// localSharedFeatures is a product of comparison of our and their // localSharedFeatures is a product of comparison of our and their
@ -200,15 +191,6 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
// Initiate the pending channel identifier properly depending on if this
// node is inbound or outbound. This value will be used in an increasing
// manner to track pending channels.
if p.inbound {
p.nextPendingChannelID = 1 << 63
} else {
p.nextPendingChannelID = 0
}
// Fetch and then load all the active channels we have with this // Fetch and then load all the active channels we have with this
// remote peer from the database. // remote peer from the database.
activeChans, err := server.chanDB.FetchOpenChannels(p.addr.IdentityKey) activeChans, err := server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
@ -1721,17 +1703,6 @@ func (p *peer) updateCommitTx(state *commitmentState) error {
return nil return nil
} }
// fetchNextPendingChanID provides unique IDs for each channel opened between
// two peers
func (p *peer) fetchNextPendingChanID() uint64 {
p.pendingChannelMtx.Lock()
defer p.pendingChannelMtx.Unlock()
chanID := p.nextPendingChannelID
p.nextPendingChannelID++
return chanID
}
// logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP) // logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP)
// log entry the corresponding htlcPacket with src/dest set along with the // log entry the corresponding htlcPacket with src/dest set along with the
// proper wire message. This helper method is provided in order to aid an // proper wire message. This helper method is provided in order to aid an