Merge pull request #782 from paddyquinn/master
funding: implement reservation zombie sweeper
This commit is contained in:
commit
0befe41384
@ -70,10 +70,29 @@ type reservationWithCtx struct {
|
|||||||
|
|
||||||
chanAmt btcutil.Amount
|
chanAmt btcutil.Amount
|
||||||
|
|
||||||
|
lastUpdated time.Time
|
||||||
|
|
||||||
updates chan *lnrpc.OpenStatusUpdate
|
updates chan *lnrpc.OpenStatusUpdate
|
||||||
err chan error
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isLocked checks the reservation's timestamp to determine whether it is locked.
|
||||||
|
func (r *reservationWithCtx) isLocked() bool {
|
||||||
|
// The time zero value represents a locked reservation.
|
||||||
|
return r.lastUpdated.IsZero()
|
||||||
|
}
|
||||||
|
|
||||||
|
// lock locks the reservation from zombie pruning by setting its timestamp to the
|
||||||
|
// zero value.
|
||||||
|
func (r *reservationWithCtx) lock() {
|
||||||
|
r.lastUpdated = time.Time{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateTimestamp updates the reservation's timestamp with the current time.
|
||||||
|
func (r *reservationWithCtx) updateTimestamp() {
|
||||||
|
r.lastUpdated = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
// initFundingMsg is sent by an outside subsystem to the funding manager in
|
// initFundingMsg is sent by an outside subsystem to the funding manager in
|
||||||
// order to kick off a funding workflow with a specified target peer. The
|
// order to kick off a funding workflow with a specified target peer. The
|
||||||
// original request which defines the parameters of the funding workflow are
|
// original request which defines the parameters of the funding workflow are
|
||||||
@ -266,6 +285,14 @@ type fundingConfig struct {
|
|||||||
// discovered short channel ID of a formerly pending channel to outside
|
// discovered short channel ID of a formerly pending channel to outside
|
||||||
// sub-systems.
|
// sub-systems.
|
||||||
ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error
|
ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error
|
||||||
|
|
||||||
|
// ZombieSweeperInterval is the periodic time interval in which the zombie
|
||||||
|
// sweeper is run.
|
||||||
|
ZombieSweeperInterval time.Duration
|
||||||
|
|
||||||
|
// ReservationTimeout is the length of idle time that must pass before a
|
||||||
|
// reservation is considered a zombie.
|
||||||
|
ReservationTimeout time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// fundingManager acts as an orchestrator/bridge between the wallet's
|
// fundingManager acts as an orchestrator/bridge between the wallet's
|
||||||
@ -307,7 +334,7 @@ type fundingManager struct {
|
|||||||
signedReservations map[lnwire.ChannelID][32]byte
|
signedReservations map[lnwire.ChannelID][32]byte
|
||||||
|
|
||||||
// resMtx guards both of the maps above to ensure that all access is
|
// resMtx guards both of the maps above to ensure that all access is
|
||||||
// goroutine stafe.
|
// goroutine safe.
|
||||||
resMtx sync.RWMutex
|
resMtx sync.RWMutex
|
||||||
|
|
||||||
// fundingMsgs is a channel which receives wrapped wire messages
|
// fundingMsgs is a channel which receives wrapped wire messages
|
||||||
@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey,
|
|||||||
func (f *fundingManager) reservationCoordinator() {
|
func (f *fundingManager) reservationCoordinator() {
|
||||||
defer f.wg.Done()
|
defer f.wg.Done()
|
||||||
|
|
||||||
|
zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval)
|
||||||
|
defer zombieSweepTicker.Stop()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|
||||||
@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() {
|
|||||||
case req := <-f.fundingRequests:
|
case req := <-f.fundingRequests:
|
||||||
f.handleInitFundingMsg(req)
|
f.handleInitFundingMsg(req)
|
||||||
|
|
||||||
|
case <-zombieSweepTicker.C:
|
||||||
|
f.pruneZombieReservations()
|
||||||
|
|
||||||
case req := <-f.queries:
|
case req := <-f.queries:
|
||||||
switch msg := req.(type) {
|
switch msg := req.(type) {
|
||||||
case *pendingChansReq:
|
case *pendingChansReq:
|
||||||
@ -944,20 +977,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
|
|||||||
amt, msg.PushAmount)
|
amt, msg.PushAmount)
|
||||||
|
|
||||||
// Once the reservation has been created successfully, we add it to
|
// Once the reservation has been created successfully, we add it to
|
||||||
// this peers map of pending reservations to track this particular
|
// this peer's map of pending reservations to track this particular
|
||||||
// reservation until either abort or completion.
|
// reservation until either abort or completion.
|
||||||
f.resMtx.Lock()
|
f.resMtx.Lock()
|
||||||
if _, ok := f.activeReservations[peerIDKey]; !ok {
|
if _, ok := f.activeReservations[peerIDKey]; !ok {
|
||||||
f.activeReservations[peerIDKey] = make(pendingChannels)
|
f.activeReservations[peerIDKey] = make(pendingChannels)
|
||||||
}
|
}
|
||||||
f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{
|
resCtx := &reservationWithCtx{
|
||||||
reservation: reservation,
|
reservation: reservation,
|
||||||
chanAmt: amt,
|
chanAmt: amt,
|
||||||
err: make(chan error, 1),
|
err: make(chan error, 1),
|
||||||
peerAddress: fmsg.peerAddress,
|
peerAddress: fmsg.peerAddress,
|
||||||
}
|
}
|
||||||
|
f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx
|
||||||
f.resMtx.Unlock()
|
f.resMtx.Unlock()
|
||||||
|
|
||||||
|
// Update the timestamp once the fundingOpenMsg has been handled.
|
||||||
|
defer resCtx.updateTimestamp()
|
||||||
|
|
||||||
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
|
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
|
||||||
// delay we require given the total amount of funds within the channel.
|
// delay we require given the total amount of funds within the channel.
|
||||||
remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt)
|
remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt)
|
||||||
@ -1066,6 +1103,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Update the timestamp once the fundingAcceptMsg has been handled.
|
||||||
|
defer resCtx.updateTimestamp()
|
||||||
|
|
||||||
fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:])
|
fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:])
|
||||||
|
|
||||||
// We'll also specify the responder's preference for the number of
|
// We'll also specify the responder's preference for the number of
|
||||||
@ -1307,6 +1347,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) {
|
|||||||
f.localDiscoverySignals[channelID] = make(chan struct{})
|
f.localDiscoverySignals[channelID] = make(chan struct{})
|
||||||
f.localDiscoveryMtx.Unlock()
|
f.localDiscoveryMtx.Unlock()
|
||||||
|
|
||||||
|
// At this point we have sent our last funding message to the
|
||||||
|
// initiating peer before the funding transaction will be broadcast.
|
||||||
|
// The only thing left to do before we can delete this reservation
|
||||||
|
// is wait for the funding transaction. Lock the reservation so it
|
||||||
|
// is not pruned by the zombie sweeper.
|
||||||
|
resCtx.lock()
|
||||||
|
|
||||||
// With this last message, our job as the responder is now complete.
|
// With this last message, our job as the responder is now complete.
|
||||||
// We'll wait for the funding transaction to reach the specified number
|
// We'll wait for the funding transaction to reach the specified number
|
||||||
// of confirmations, then start normal operations.
|
// of confirmations, then start normal operations.
|
||||||
@ -1457,6 +1504,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// At this point we have broadcast the funding transaction and done all
|
||||||
|
// necessary processing. The only thing left to do before we can delete
|
||||||
|
// this reservation is wait for the funding transaction. Lock the
|
||||||
|
// reservation so it is not pruned by the zombie sweeper.
|
||||||
|
resCtx.lock()
|
||||||
|
|
||||||
f.wg.Add(1)
|
f.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer f.wg.Done()
|
defer f.wg.Done()
|
||||||
@ -2457,15 +2510,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) {
|
|||||||
f.activeReservations[peerIDKey] = make(pendingChannels)
|
f.activeReservations[peerIDKey] = make(pendingChannels)
|
||||||
}
|
}
|
||||||
|
|
||||||
f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{
|
resCtx := &reservationWithCtx{
|
||||||
chanAmt: capacity,
|
chanAmt: capacity,
|
||||||
reservation: reservation,
|
reservation: reservation,
|
||||||
peerAddress: msg.peerAddress,
|
peerAddress: msg.peerAddress,
|
||||||
updates: msg.updates,
|
updates: msg.updates,
|
||||||
err: msg.err,
|
err: msg.err,
|
||||||
}
|
}
|
||||||
|
f.activeReservations[peerIDKey][chanID] = resCtx
|
||||||
f.resMtx.Unlock()
|
f.resMtx.Unlock()
|
||||||
|
|
||||||
|
// Update the timestamp once the initFundingMsg has been handled.
|
||||||
|
defer resCtx.updateTimestamp()
|
||||||
|
|
||||||
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
|
// Using the RequiredRemoteDelay closure, we'll compute the remote CSV
|
||||||
// delay we require given the total amount of funds within the channel.
|
// delay we require given the total amount of funds within the channel.
|
||||||
remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity)
|
remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity)
|
||||||
@ -2547,7 +2604,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// processErrorGeneric sends a message to the fundingManager allowing it to
|
// processFundingError sends a message to the fundingManager allowing it to
|
||||||
// process the occurred generic error.
|
// process the occurred generic error.
|
||||||
func (f *fundingManager) processFundingError(err *lnwire.Error,
|
func (f *fundingManager) processFundingError(err *lnwire.Error,
|
||||||
peerAddress *lnwire.NetAddress) {
|
peerAddress *lnwire.NetAddress) {
|
||||||
@ -2559,7 +2616,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleErrorGenericMsg process the error which was received from remote peer,
|
// handleErrorMsg processes the error which was received from remote peer,
|
||||||
// depending on the type of error we should do different clean up steps and
|
// depending on the type of error we should do different clean up steps and
|
||||||
// inform the user about it.
|
// inform the user about it.
|
||||||
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
|
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
|
||||||
@ -2592,7 +2649,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
|
|||||||
lnErr.ToGrpcCode(), string(protocolErr.Data),
|
lnErr.ToGrpcCode(), string(protocolErr.Data),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, we'll attempt tto display just the error code
|
// Otherwise, we'll attempt to display just the error code
|
||||||
// itself.
|
// itself.
|
||||||
resCtx.err <- grpc.Errorf(
|
resCtx.err <- grpc.Errorf(
|
||||||
lnErr.ToGrpcCode(), lnErr.String(),
|
lnErr.ToGrpcCode(), lnErr.String(),
|
||||||
@ -2605,7 +2662,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// cancelReservationCtx do all needed work in order to securely cancel the
|
// pruneZombieReservations loops through all pending reservations and fails the
|
||||||
|
// funding flow for any reservations that have not been updated since the
|
||||||
|
// ReservationTimeout and are not locked waiting for the funding transaction.
|
||||||
|
func (f *fundingManager) pruneZombieReservations() {
|
||||||
|
zombieReservations := make(pendingChannels)
|
||||||
|
|
||||||
|
f.resMtx.RLock()
|
||||||
|
for _, pendingReservations := range f.activeReservations {
|
||||||
|
for pendingChanID, resCtx := range pendingReservations {
|
||||||
|
if resCtx.isLocked() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if time.Since(resCtx.lastUpdated) > f.cfg.ReservationTimeout {
|
||||||
|
zombieReservations[pendingChanID] = resCtx
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
f.resMtx.RUnlock()
|
||||||
|
|
||||||
|
for pendingChanID, resCtx := range zombieReservations {
|
||||||
|
err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+
|
||||||
|
"chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:])
|
||||||
|
fndgLog.Warnf(err.Error())
|
||||||
|
f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cancelReservationCtx does all needed work in order to securely cancel the
|
||||||
// reservation.
|
// reservation.
|
||||||
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
|
func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey,
|
||||||
pendingChanID [32]byte) (*reservationWithCtx, error) {
|
pendingChanID [32]byte) (*reservationWithCtx, error) {
|
||||||
|
@ -107,6 +107,7 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs,
|
|||||||
Confirmed: m.oneConfChannel,
|
Confirmed: m.oneConfChannel,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
|
func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
|
||||||
return &chainntnfs.BlockEpochEvent{
|
return &chainntnfs.BlockEpochEvent{
|
||||||
Epochs: m.epochChan,
|
Epochs: m.epochChan,
|
||||||
@ -121,6 +122,7 @@ func (m *mockNotifier) Start() error {
|
|||||||
func (m *mockNotifier) Stop() error {
|
func (m *mockNotifier) Stop() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
|
func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
|
||||||
heightHint uint32) (*chainntnfs.SpendEvent, error) {
|
heightHint uint32) (*chainntnfs.SpendEvent, error) {
|
||||||
return &chainntnfs.SpendEvent{
|
return &chainntnfs.SpendEvent{
|
||||||
@ -304,6 +306,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey,
|
|||||||
ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error {
|
ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error {
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
ZombieSweeperInterval: 1 * time.Hour,
|
||||||
|
ReservationTimeout: 1 * time.Nanosecond,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed creating fundingManager: %v", err)
|
t.Fatalf("failed creating fundingManager: %v", err)
|
||||||
@ -381,6 +385,8 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) {
|
|||||||
publishChan <- txn
|
publishChan <- txn
|
||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
|
ZombieSweeperInterval: oldCfg.ZombieSweeperInterval,
|
||||||
|
ReservationTimeout: oldCfg.ReservationTimeout,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed recreating aliceFundingManager: %v", err)
|
t.Fatalf("failed recreating aliceFundingManager: %v", err)
|
||||||
@ -485,68 +491,26 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
|
|||||||
// Let Bob handle the init message.
|
// Let Bob handle the init message.
|
||||||
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
|
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
|
||||||
|
|
||||||
// Bob should answer with an AcceptChannel.
|
// Bob should answer with an AcceptChannel message.
|
||||||
var bobMsg lnwire.Message
|
acceptChannelResponse := assertFundingMsgSent(
|
||||||
select {
|
t, bob.msgChan, "AcceptChannel",
|
||||||
case bobMsg = <-bob.msgChan:
|
).(*lnwire.AcceptChannel)
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
t.Fatalf("bob did not send AcceptChannel message")
|
|
||||||
}
|
|
||||||
|
|
||||||
acceptChannelResponse, ok := bobMsg.(*lnwire.AcceptChannel)
|
|
||||||
if !ok {
|
|
||||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
|
||||||
if gotError {
|
|
||||||
t.Fatalf("expected AcceptChannel to be sent "+
|
|
||||||
"from bob, instead got error: %v",
|
|
||||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
|
||||||
}
|
|
||||||
t.Fatalf("expected AcceptChannel to be sent from bob, "+
|
|
||||||
"instead got %T", bobMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forward the response to Alice.
|
// Forward the response to Alice.
|
||||||
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
|
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
|
||||||
|
|
||||||
// Alice responds with a FundingCreated messages.
|
// Alice responds with a FundingCreated message.
|
||||||
select {
|
fundingCreated := assertFundingMsgSent(
|
||||||
case aliceMsg = <-alice.msgChan:
|
t, alice.msgChan, "FundingCreated",
|
||||||
case <-time.After(time.Second * 5):
|
).(*lnwire.FundingCreated)
|
||||||
t.Fatalf("alice did not send FundingCreated message")
|
|
||||||
}
|
|
||||||
fundingCreated, ok := aliceMsg.(*lnwire.FundingCreated)
|
|
||||||
if !ok {
|
|
||||||
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
|
||||||
if gotError {
|
|
||||||
t.Fatalf("expected FundingCreated to be sent "+
|
|
||||||
"from bob, instead got error: %v",
|
|
||||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
|
||||||
}
|
|
||||||
t.Fatalf("expected FundingCreated to be sent from "+
|
|
||||||
"alice, instead got %T", aliceMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give the message to Bob.
|
// Give the message to Bob.
|
||||||
bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr)
|
bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr)
|
||||||
|
|
||||||
// Finally, Bob should send the FundingSigned message.
|
// Finally, Bob should send the FundingSigned message.
|
||||||
select {
|
fundingSigned := assertFundingMsgSent(
|
||||||
case bobMsg = <-bob.msgChan:
|
t, bob.msgChan, "FundingSigned",
|
||||||
case <-time.After(time.Second * 5):
|
).(*lnwire.FundingSigned)
|
||||||
t.Fatalf("bob did not send FundingSigned message")
|
|
||||||
}
|
|
||||||
|
|
||||||
fundingSigned, ok := bobMsg.(*lnwire.FundingSigned)
|
|
||||||
if !ok {
|
|
||||||
errorMsg, gotError := bobMsg.(*lnwire.Error)
|
|
||||||
if gotError {
|
|
||||||
t.Fatalf("expected FundingSigned to be "+
|
|
||||||
"sent from bob, instead got error: %v",
|
|
||||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
|
||||||
}
|
|
||||||
t.Fatalf("expected FundingSigned to be sent from "+
|
|
||||||
"bob, instead got %T", bobMsg)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Forward the signature to Alice.
|
// Forward the signature to Alice.
|
||||||
alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr)
|
alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr)
|
||||||
@ -581,6 +545,81 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt,
|
|||||||
return fundingOutPoint
|
return fundingOutPoint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) {
|
||||||
|
select {
|
||||||
|
case <-msgChan:
|
||||||
|
t.Fatalf("error sent unexpectedly")
|
||||||
|
case <- time.After(100 * time.Millisecond):
|
||||||
|
// Expected, return.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) {
|
||||||
|
var msg lnwire.Message
|
||||||
|
select {
|
||||||
|
case msg = <-msgChan:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Fatalf("node did not send Error message")
|
||||||
|
}
|
||||||
|
_, ok := msg.(*lnwire.Error)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expected Error to be sent from "+
|
||||||
|
"node, instead got %T", msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message,
|
||||||
|
msgType string) lnwire.Message {
|
||||||
|
var msg lnwire.Message
|
||||||
|
select {
|
||||||
|
case msg = <-msgChan:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Fatalf("peer did not send %s message", msgType)
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
sentMsg lnwire.Message
|
||||||
|
ok bool
|
||||||
|
)
|
||||||
|
switch msgType {
|
||||||
|
case "AcceptChannel":
|
||||||
|
sentMsg, ok = msg.(*lnwire.AcceptChannel)
|
||||||
|
case "FundingCreated":
|
||||||
|
sentMsg, ok = msg.(*lnwire.FundingCreated)
|
||||||
|
case "FundingSigned":
|
||||||
|
sentMsg, ok = msg.(*lnwire.FundingSigned)
|
||||||
|
case "FundingLocked":
|
||||||
|
sentMsg, ok = msg.(*lnwire.FundingLocked)
|
||||||
|
default:
|
||||||
|
t.Fatalf("unknown message type: %s", msgType)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
errorMsg, gotError := msg.(*lnwire.Error)
|
||||||
|
if gotError {
|
||||||
|
t.Fatalf("expected %s to be sent, instead got error: %v",
|
||||||
|
msgType, lnwire.ErrorCode(errorMsg.Data[0]))
|
||||||
|
}
|
||||||
|
t.Fatalf("expected %s to be sent, instead got %T",
|
||||||
|
msgType, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sentMsg
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertNumPendingReservations(t *testing.T, node *testNode,
|
||||||
|
peerPubKey *btcec.PublicKey, expectedNum int) {
|
||||||
|
serializedPubKey := newSerializedKey(peerPubKey)
|
||||||
|
actualNum := len(node.fundingMgr.activeReservations[serializedPubKey])
|
||||||
|
if actualNum == expectedNum {
|
||||||
|
// Success, return.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Fatalf("Expected node to have %d pending reservations, had %v",
|
||||||
|
expectedNum, actualNum)
|
||||||
|
}
|
||||||
|
|
||||||
func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) {
|
func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) {
|
||||||
var numPendingChans int
|
var numPendingChans int
|
||||||
for i := 0; i < testPollNumTries; i++ {
|
for i := 0; i < testPollNumTries; i++ {
|
||||||
@ -665,28 +704,6 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode,
|
|||||||
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
|
assertDatabaseState(t, bob, fundingOutPoint, markedOpen)
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkNodeSendingFundingLocked(t *testing.T, node *testNode) *lnwire.FundingLocked {
|
|
||||||
var msg lnwire.Message
|
|
||||||
select {
|
|
||||||
case msg = <-node.msgChan:
|
|
||||||
case <-time.After(time.Second * 5):
|
|
||||||
t.Fatalf("node did not send fundingLocked")
|
|
||||||
}
|
|
||||||
|
|
||||||
fundingLocked, ok := msg.(*lnwire.FundingLocked)
|
|
||||||
if !ok {
|
|
||||||
errorMsg, gotError := msg.(*lnwire.Error)
|
|
||||||
if gotError {
|
|
||||||
t.Fatalf("expected FundingLocked to be sent "+
|
|
||||||
"from node, instead got error: %v",
|
|
||||||
lnwire.ErrorCode(errorMsg.Data[0]))
|
|
||||||
}
|
|
||||||
t.Fatalf("expected FundingLocked to be sent from node, "+
|
|
||||||
"instead got %T", msg)
|
|
||||||
}
|
|
||||||
return fundingLocked
|
|
||||||
}
|
|
||||||
|
|
||||||
func assertFundingLockedSent(t *testing.T, alice, bob *testNode,
|
func assertFundingLockedSent(t *testing.T, alice, bob *testNode,
|
||||||
fundingOutPoint *wire.OutPoint) {
|
fundingOutPoint *wire.OutPoint) {
|
||||||
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
||||||
@ -874,7 +891,20 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
|
|||||||
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan,
|
fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan,
|
||||||
true)
|
true)
|
||||||
|
|
||||||
// Notify that transaction was mined
|
// Make sure both reservations time out and then run both zombie sweepers.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
go alice.fundingMgr.pruneZombieReservations()
|
||||||
|
go bob.fundingMgr.pruneZombieReservations()
|
||||||
|
|
||||||
|
// Check that neither Alice nor Bob sent an error message.
|
||||||
|
assertErrorNotSent(t, alice.msgChan)
|
||||||
|
assertErrorNotSent(t, bob.msgChan)
|
||||||
|
|
||||||
|
// Check that neither reservation has been pruned.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 1)
|
||||||
|
assertNumPendingReservations(t, bob, alicePubKey, 1)
|
||||||
|
|
||||||
|
// Notify that transaction was mined.
|
||||||
alice.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
|
alice.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
|
||||||
bob.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
|
bob.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{}
|
||||||
|
|
||||||
@ -885,10 +915,14 @@ func TestFundingManagerNormalWorkflow(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
@ -970,7 +1004,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Bob will send funding locked to Alice.
|
// Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Alice should still be markedOpen
|
// Alice should still be markedOpen
|
||||||
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
|
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
|
||||||
@ -987,7 +1023,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) {
|
|||||||
return fmt.Errorf("intentional error in SendAnnouncement")
|
return fmt.Errorf("intentional error in SendAnnouncement")
|
||||||
}
|
}
|
||||||
|
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// The state should now be fundingLockedSent
|
// The state should now be fundingLockedSent
|
||||||
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
||||||
@ -1088,7 +1126,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Bob will send funding locked to Alice
|
// Bob will send funding locked to Alice
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Alice should still be markedOpen
|
// Alice should still be markedOpen
|
||||||
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
|
assertDatabaseState(t, alice, fundingOutPoint, markedOpen)
|
||||||
@ -1131,7 +1171,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
|
|||||||
close(con)
|
close(con)
|
||||||
|
|
||||||
// This should make Alice send the fundingLocked.
|
// This should make Alice send the fundingLocked.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// The state should now be fundingLockedSent
|
// The state should now be fundingLockedSent
|
||||||
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent)
|
||||||
@ -1168,6 +1210,212 @@ func TestFundingManagerOfflinePeer(t *testing.T) {
|
|||||||
assertNoChannelState(t, alice, bob, fundingOutPoint)
|
assertNoChannelState(t, alice, bob, fundingOutPoint)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestFundingManagerPeerTimeoutAfterInitFunding checks that the zombie sweeper
|
||||||
|
// will properly clean up a zombie reservation that times out after the
|
||||||
|
// initFundingMsg has been handled.
|
||||||
|
func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) {
|
||||||
|
alice, bob := setupFundingManagers(t)
|
||||||
|
defer tearDownFundingManagers(t, alice, bob)
|
||||||
|
|
||||||
|
// We will consume the channel updates as we go, so no buffering is needed.
|
||||||
|
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||||
|
|
||||||
|
// Create a funding request and start the workflow.
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
initReq := &openChanReq{
|
||||||
|
targetPubkey: bob.privKey.PubKey(),
|
||||||
|
chainHash: *activeNetParams.GenesisHash,
|
||||||
|
localFundingAmt: 500000,
|
||||||
|
pushAmt: lnwire.NewMSatFromSatoshis(0),
|
||||||
|
private: false,
|
||||||
|
updates: updateChan,
|
||||||
|
err: errChan,
|
||||||
|
}
|
||||||
|
|
||||||
|
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
|
||||||
|
|
||||||
|
// Alice should have sent the OpenChannel message to Bob.
|
||||||
|
var aliceMsg lnwire.Message
|
||||||
|
select {
|
||||||
|
case aliceMsg = <-alice.msgChan:
|
||||||
|
case err := <-initReq.err:
|
||||||
|
t.Fatalf("error init funding workflow: %v", err)
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Fatalf("alice did not send OpenChannel message")
|
||||||
|
}
|
||||||
|
|
||||||
|
_, ok := aliceMsg.(*lnwire.OpenChannel)
|
||||||
|
if !ok {
|
||||||
|
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||||
|
if gotError {
|
||||||
|
t.Fatalf("expected OpenChannel to be sent "+
|
||||||
|
"from bob, instead got error: %v",
|
||||||
|
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||||
|
}
|
||||||
|
t.Fatalf("expected OpenChannel to be sent from "+
|
||||||
|
"alice, instead got %T", aliceMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alice should have a new pending reservation.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 1)
|
||||||
|
|
||||||
|
// Make sure Alice's reservation times out and then run her zombie sweeper.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
go alice.fundingMgr.pruneZombieReservations()
|
||||||
|
|
||||||
|
// Alice should have sent an Error message to Bob.
|
||||||
|
assertErrorSent(t, alice.msgChan)
|
||||||
|
|
||||||
|
// Alice's zombie reservation should have been pruned.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFundingManagerPeerTimeoutAfterFundingOpen checks that the zombie sweeper
|
||||||
|
// will properly clean up a zombie reservation that times out after the
|
||||||
|
// fundingOpenMsg has been handled.
|
||||||
|
func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) {
|
||||||
|
alice, bob := setupFundingManagers(t)
|
||||||
|
defer tearDownFundingManagers(t, alice, bob)
|
||||||
|
|
||||||
|
// We will consume the channel updates as we go, so no buffering is needed.
|
||||||
|
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||||
|
|
||||||
|
// Create a funding request and start the workflow.
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
initReq := &openChanReq{
|
||||||
|
targetPubkey: bob.privKey.PubKey(),
|
||||||
|
chainHash: *activeNetParams.GenesisHash,
|
||||||
|
localFundingAmt: 500000,
|
||||||
|
pushAmt: lnwire.NewMSatFromSatoshis(0),
|
||||||
|
private: false,
|
||||||
|
updates: updateChan,
|
||||||
|
err: errChan,
|
||||||
|
}
|
||||||
|
|
||||||
|
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
|
||||||
|
|
||||||
|
// Alice should have sent the OpenChannel message to Bob.
|
||||||
|
var aliceMsg lnwire.Message
|
||||||
|
select {
|
||||||
|
case aliceMsg = <-alice.msgChan:
|
||||||
|
case err := <-initReq.err:
|
||||||
|
t.Fatalf("error init funding workflow: %v", err)
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Fatalf("alice did not send OpenChannel message")
|
||||||
|
}
|
||||||
|
|
||||||
|
openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel)
|
||||||
|
if !ok {
|
||||||
|
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||||
|
if gotError {
|
||||||
|
t.Fatalf("expected OpenChannel to be sent "+
|
||||||
|
"from bob, instead got error: %v",
|
||||||
|
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||||
|
}
|
||||||
|
t.Fatalf("expected OpenChannel to be sent from "+
|
||||||
|
"alice, instead got %T", aliceMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alice should have a new pending reservation.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 1)
|
||||||
|
|
||||||
|
// Let Bob handle the init message.
|
||||||
|
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
|
||||||
|
|
||||||
|
// Bob should answer with an AcceptChannel.
|
||||||
|
assertFundingMsgSent(t, bob.msgChan, "AcceptChannel")
|
||||||
|
|
||||||
|
// Bob should have a new pending reservation.
|
||||||
|
assertNumPendingReservations(t, bob, alicePubKey, 1)
|
||||||
|
|
||||||
|
// Make sure Bob's reservation times out and then run his zombie sweeper.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
go bob.fundingMgr.pruneZombieReservations()
|
||||||
|
|
||||||
|
// Bob should have sent an Error message to Alice.
|
||||||
|
assertErrorSent(t, bob.msgChan)
|
||||||
|
|
||||||
|
// Bob's zombie reservation should have been pruned.
|
||||||
|
assertNumPendingReservations(t, bob, alicePubKey, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestFundingManagerPeerTimeoutAfterFundingAccept checks that the zombie sweeper
|
||||||
|
// will properly clean up a zombie reservation that times out after the
|
||||||
|
// fundingAcceptMsg has been handled.
|
||||||
|
func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) {
|
||||||
|
alice, bob := setupFundingManagers(t)
|
||||||
|
defer tearDownFundingManagers(t, alice, bob)
|
||||||
|
|
||||||
|
// We will consume the channel updates as we go, so no buffering is needed.
|
||||||
|
updateChan := make(chan *lnrpc.OpenStatusUpdate)
|
||||||
|
|
||||||
|
// Create a funding request and start the workflow.
|
||||||
|
errChan := make(chan error, 1)
|
||||||
|
initReq := &openChanReq{
|
||||||
|
targetPubkey: bob.privKey.PubKey(),
|
||||||
|
chainHash: *activeNetParams.GenesisHash,
|
||||||
|
localFundingAmt: 500000,
|
||||||
|
pushAmt: lnwire.NewMSatFromSatoshis(0),
|
||||||
|
private: false,
|
||||||
|
updates: updateChan,
|
||||||
|
err: errChan,
|
||||||
|
}
|
||||||
|
|
||||||
|
alice.fundingMgr.initFundingWorkflow(bobAddr, initReq)
|
||||||
|
|
||||||
|
// Alice should have sent the OpenChannel message to Bob.
|
||||||
|
var aliceMsg lnwire.Message
|
||||||
|
select {
|
||||||
|
case aliceMsg = <-alice.msgChan:
|
||||||
|
case err := <-initReq.err:
|
||||||
|
t.Fatalf("error init funding workflow: %v", err)
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
t.Fatalf("alice did not send OpenChannel message")
|
||||||
|
}
|
||||||
|
|
||||||
|
openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel)
|
||||||
|
if !ok {
|
||||||
|
errorMsg, gotError := aliceMsg.(*lnwire.Error)
|
||||||
|
if gotError {
|
||||||
|
t.Fatalf("expected OpenChannel to be sent "+
|
||||||
|
"from bob, instead got error: %v",
|
||||||
|
lnwire.ErrorCode(errorMsg.Data[0]))
|
||||||
|
}
|
||||||
|
t.Fatalf("expected OpenChannel to be sent from "+
|
||||||
|
"alice, instead got %T", aliceMsg)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Alice should have a new pending reservation.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 1)
|
||||||
|
|
||||||
|
// Let Bob handle the init message.
|
||||||
|
bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr)
|
||||||
|
|
||||||
|
// Bob should answer with an AcceptChannel.
|
||||||
|
acceptChannelResponse := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "AcceptChannel",
|
||||||
|
).(*lnwire.AcceptChannel)
|
||||||
|
|
||||||
|
// Bob should have a new pending reservation.
|
||||||
|
assertNumPendingReservations(t, bob, alicePubKey, 1)
|
||||||
|
|
||||||
|
// Forward the response to Alice.
|
||||||
|
alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr)
|
||||||
|
|
||||||
|
// Alice responds with a FundingCreated messages.
|
||||||
|
assertFundingMsgSent(t, alice.msgChan, "FundingCreated")
|
||||||
|
|
||||||
|
// Make sure Alice's reservation times out and then run her zombie sweeper.
|
||||||
|
time.Sleep(1 * time.Millisecond)
|
||||||
|
go alice.fundingMgr.pruneZombieReservations()
|
||||||
|
|
||||||
|
// Alice should have sent an Error message to Bob.
|
||||||
|
assertErrorSent(t, alice.msgChan)
|
||||||
|
|
||||||
|
// Alice's zombie reservation should have been pruned.
|
||||||
|
assertNumPendingReservations(t, alice, bobPubKey, 0)
|
||||||
|
}
|
||||||
|
|
||||||
func TestFundingManagerFundingTimeout(t *testing.T) {
|
func TestFundingManagerFundingTimeout(t *testing.T) {
|
||||||
alice, bob := setupFundingManagers(t)
|
alice, bob := setupFundingManagers(t)
|
||||||
defer tearDownFundingManagers(t, alice, bob)
|
defer tearDownFundingManagers(t, alice, bob)
|
||||||
@ -1208,7 +1456,7 @@ func TestFundingManagerFundingTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TestFundingManagerFundingNotTimeoutInitiator checks that if the user was
|
// TestFundingManagerFundingNotTimeoutInitiator checks that if the user was
|
||||||
// the channel initiator, that it does not timeout when the lnd restarts
|
// the channel initiator, that it does not timeout when the lnd restarts.
|
||||||
func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
|
func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) {
|
||||||
|
|
||||||
alice, bob := setupFundingManagers(t)
|
alice, bob := setupFundingManagers(t)
|
||||||
@ -1297,10 +1545,14 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
@ -1394,10 +1646,14 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
@ -1464,10 +1720,14 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
@ -1530,10 +1790,14 @@ func TestFundingManagerPrivateChannel(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
@ -1606,10 +1870,14 @@ func TestFundingManagerPrivateRestart(t *testing.T) {
|
|||||||
|
|
||||||
// After the funding transaction is mined, Alice will send
|
// After the funding transaction is mined, Alice will send
|
||||||
// fundingLocked to Bob.
|
// fundingLocked to Bob.
|
||||||
fundingLockedAlice := checkNodeSendingFundingLocked(t, alice)
|
fundingLockedAlice := assertFundingMsgSent(
|
||||||
|
t, alice.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// And similarly Bob will send funding locked to Alice.
|
// And similarly Bob will send funding locked to Alice.
|
||||||
fundingLockedBob := checkNodeSendingFundingLocked(t, bob)
|
fundingLockedBob := assertFundingMsgSent(
|
||||||
|
t, bob.msgChan, "FundingLocked",
|
||||||
|
).(*lnwire.FundingLocked)
|
||||||
|
|
||||||
// Check that the state machine is updated accordingly
|
// Check that the state machine is updated accordingly
|
||||||
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
assertFundingLockedSent(t, alice, bob, fundingOutPoint)
|
||||||
|
2
lnd.go
2
lnd.go
@ -414,6 +414,8 @@ func lndMain() error {
|
|||||||
// channel bandwidth.
|
// channel bandwidth.
|
||||||
return uint16(lnwallet.MaxHTLCNumber / 2)
|
return uint16(lnwallet.MaxHTLCNumber / 2)
|
||||||
},
|
},
|
||||||
|
ZombieSweeperInterval: 1 * time.Minute,
|
||||||
|
ReservationTimeout: 10 * time.Minute,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -58,8 +58,8 @@ type InputScript struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ChannelReservation represents an intent to open a lightning payment channel
|
// ChannelReservation represents an intent to open a lightning payment channel
|
||||||
// a counterparty. The funding processes from reservation to channel opening is
|
// with a counterparty. The funding processes from reservation to channel opening
|
||||||
// a 3-step process. In order to allow for full concurrency during the
|
// is a 3-step process. In order to allow for full concurrency during the
|
||||||
// reservation workflow, resources consumed by a contribution are "locked"
|
// reservation workflow, resources consumed by a contribution are "locked"
|
||||||
// themselves. This prevents a number of race conditions such as two funding
|
// themselves. This prevents a number of race conditions such as two funding
|
||||||
// transactions double-spending the same input. A reservation can also be
|
// transactions double-spending the same input. A reservation can also be
|
||||||
@ -69,12 +69,12 @@ type InputScript struct {
|
|||||||
// The reservation workflow consists of the following three steps:
|
// The reservation workflow consists of the following three steps:
|
||||||
// 1. lnwallet.InitChannelReservation
|
// 1. lnwallet.InitChannelReservation
|
||||||
// * One requests the wallet to allocate the necessary resources for a
|
// * One requests the wallet to allocate the necessary resources for a
|
||||||
// channel reservation. These resources a put in limbo for the lifetime
|
// channel reservation. These resources are put in limbo for the lifetime
|
||||||
// of a reservation.
|
// of a reservation.
|
||||||
// * Once completed the reservation will have the wallet's contribution
|
// * Once completed the reservation will have the wallet's contribution
|
||||||
// accessible via the .OurContribution() method. This contribution
|
// accessible via the .OurContribution() method. This contribution
|
||||||
// contains the necessary items to allow the remote party to build both
|
// contains the necessary items to allow the remote party to build both
|
||||||
// the funding, and commitment transactions.
|
// the funding, and commitment transactions.
|
||||||
// 2. ChannelReservation.ProcessContribution/ChannelReservation.ProcessSingleContribution
|
// 2. ChannelReservation.ProcessContribution/ChannelReservation.ProcessSingleContribution
|
||||||
// * The counterparty presents their contribution to the payment channel.
|
// * The counterparty presents their contribution to the payment channel.
|
||||||
// This allows us to build the funding, and commitment transactions
|
// This allows us to build the funding, and commitment transactions
|
||||||
|
@ -52,9 +52,8 @@ func (e *ErrInsufficientFunds) Error() string {
|
|||||||
// will be created in order to track the lifetime of this pending channel.
|
// will be created in order to track the lifetime of this pending channel.
|
||||||
// Outputs selected will be 'locked', making them unavailable, for any other
|
// Outputs selected will be 'locked', making them unavailable, for any other
|
||||||
// pending reservations. Therefore, all channels in reservation limbo will be
|
// pending reservations. Therefore, all channels in reservation limbo will be
|
||||||
// periodically after a timeout period in order to avoid "exhaustion" attacks.
|
// periodically timed out after an idle period in order to avoid "exhaustion"
|
||||||
//
|
// attacks.
|
||||||
// TODO(roasbeef): zombie reservation sweeper goroutine.
|
|
||||||
type initFundingReserveMsg struct {
|
type initFundingReserveMsg struct {
|
||||||
// chainHash denotes that chain to be used to ultimately open the
|
// chainHash denotes that chain to be used to ultimately open the
|
||||||
// target channel.
|
// target channel.
|
||||||
@ -261,8 +260,6 @@ type LightningWallet struct {
|
|||||||
fundingLimbo map[uint64]*ChannelReservation
|
fundingLimbo map[uint64]*ChannelReservation
|
||||||
nextFundingID uint64
|
nextFundingID uint64
|
||||||
limboMtx sync.RWMutex
|
limboMtx sync.RWMutex
|
||||||
// TODO(roasbeef): zombie garbage collection routine to solve
|
|
||||||
// lost-object/starvation problem/attack.
|
|
||||||
|
|
||||||
// lockedOutPoints is a set of the currently locked outpoint. This
|
// lockedOutPoints is a set of the currently locked outpoint. This
|
||||||
// information is kept in order to provide an easy way to unlock all
|
// information is kept in order to provide an easy way to unlock all
|
||||||
@ -366,7 +363,7 @@ func (l *LightningWallet) ActiveReservations() []*ChannelReservation {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// requestHandler is the primary goroutine(s) responsible for handling, and
|
// requestHandler is the primary goroutine(s) responsible for handling, and
|
||||||
// dispatching relies to all messages.
|
// dispatching replies to all messages.
|
||||||
func (l *LightningWallet) requestHandler() {
|
func (l *LightningWallet) requestHandler() {
|
||||||
out:
|
out:
|
||||||
for {
|
for {
|
||||||
@ -403,14 +400,14 @@ out:
|
|||||||
// successful, a ChannelReservation containing our completed contribution is
|
// successful, a ChannelReservation containing our completed contribution is
|
||||||
// returned. Our contribution contains all the items necessary to allow the
|
// returned. Our contribution contains all the items necessary to allow the
|
||||||
// counterparty to build the funding transaction, and both versions of the
|
// counterparty to build the funding transaction, and both versions of the
|
||||||
// commitment transaction. Otherwise, an error occurred a nil pointer along with
|
// commitment transaction. Otherwise, an error occurred and a nil pointer along
|
||||||
// an error are returned.
|
// with an error are returned.
|
||||||
//
|
//
|
||||||
// Once a ChannelReservation has been obtained, two additional steps must be
|
// Once a ChannelReservation has been obtained, two additional steps must be
|
||||||
// processed before a payment channel can be considered 'open'. The second step
|
// processed before a payment channel can be considered 'open'. The second step
|
||||||
// validates, and processes the counterparty's channel contribution. The third,
|
// validates, and processes the counterparty's channel contribution. The third,
|
||||||
// and final step verifies all signatures for the inputs of the funding
|
// and final step verifies all signatures for the inputs of the funding
|
||||||
// transaction, and that the signature we records for our version of the
|
// transaction, and that the signature we record for our version of the
|
||||||
// commitment transaction is valid.
|
// commitment transaction is valid.
|
||||||
func (l *LightningWallet) InitChannelReservation(
|
func (l *LightningWallet) InitChannelReservation(
|
||||||
capacity, ourFundAmt btcutil.Amount, pushMSat lnwire.MilliSatoshi,
|
capacity, ourFundAmt btcutil.Amount, pushMSat lnwire.MilliSatoshi,
|
||||||
@ -579,7 +576,7 @@ func (l *LightningWallet) handleFundingReserveRequest(req *initFundingReserveMsg
|
|||||||
reservation.partialState.RevocationProducer = producer
|
reservation.partialState.RevocationProducer = producer
|
||||||
reservation.ourContribution.ChannelConstraints = l.Cfg.DefaultConstraints
|
reservation.ourContribution.ChannelConstraints = l.Cfg.DefaultConstraints
|
||||||
|
|
||||||
// TODO(roasbeef): turn above into: initContributio()
|
// TODO(roasbeef): turn above into: initContribution()
|
||||||
|
|
||||||
// Create a limbo and record entry for this newly pending funding
|
// Create a limbo and record entry for this newly pending funding
|
||||||
// request.
|
// request.
|
||||||
|
Loading…
Reference in New Issue
Block a user