diff --git a/peer.go b/peer.go index 2a5c193b..10ba5f45 100644 --- a/peer.go +++ b/peer.go @@ -766,22 +766,33 @@ type pendingPayment struct { // commitment update state-machine. This struct is used by htlcManager's to // save meta-state required for proper functioning. type commitmentState struct { - // TODO(roasbeef): use once trickle+batch logic is in - pendingLogLen uint32 - // htlcsToSettle is a list of preimages which allow us to settle one or // many of the pending HTLC's we've received from the upstream peer. // TODO(roasbeef): should send sig to settle once preimage is known. - htlcsToSettle [][32]byte + htlcsToSettle map[uint32][32]byte - // sigPending is a bool which indicates if we're currently awaiting a - // signature response to a commitment update we've initiated. - sigPending bool + // TODO(roasbeef): use once trickle+batch logic is in + pendingBatch []*pendingPayment // clearedHTCLs is a map of outgoing HTLC's we've committed to in our // chain which have not yet been settled by the upstream peer. clearedHTCLs map[uint32]*pendingPayment + // numUnAcked is a counter tracking the number of unacked changes we've + // sent. A change is acked once we receive a new update to our local + // chain from the remote peer. + numUnAcked uint32 + + // logCommitTimer is a timer which is sent upon if we go an interval + // without receiving/sending a commitment update. It's role is to + // ensure both chains converge to identical state in a timely manner. + // TODO(roasbeef): timer should be >> then RTT + logCommitTimer <-chan time.Time + + // switchChan is a channel used to send packets to the htlc switch for + // fowarding. + switchChan chan<- *htlcPacket + channel *lnwallet.LightningChannel chanPoint *wire.OutPoint } @@ -800,7 +811,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, upstreamLink <-chan lnwire.Message) { chanStats := channel.StateSnapshot() - peerLog.Tracef("HTLC manager for ChannelPoint(%v) started, "+ + peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+ "our_balance=%v, their_balance=%v, chain_height=%v", channel.ChannelPoint(), chanStats.LocalBalance, chanStats.RemoteBalance, chanStats.NumUpdates) @@ -817,13 +828,58 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, } state := &commitmentState{ - channel: channel, - chanPoint: channel.ChannelPoint(), - clearedHTCLs: make(map[uint32]*pendingPayment), + channel: channel, + chanPoint: channel.ChannelPoint(), + clearedHTCLs: make(map[uint32]*pendingPayment), + htlcsToSettle: make(map[uint32][32]byte), + switchChan: htlcPlex, } + + batchTimer := time.Tick(10 * time.Millisecond) out: for { select { + // TODO(roasbeef): prevent leaking ticker? + case <-state.logCommitTimer: + // If we haven't sent or received a new commitment + // update in some time, check to see if we have any + // pending updates we need to commit. If so, then send + // an update incrementing the unacked coutner is + // succesful. + if !state.channel.PendingUpdates() { + continue + } + + if sent, err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + p.Disconnect() + break out + } else if sent { + state.numUnAcked += 1 + } + case <-batchTimer: + // If the current batch is empty, then we have no work + // here. + if len(state.pendingBatch) == 0 { + continue + } + + // Otherwise, attempt to extend the remote commitment + // chain including all the currently pending entries. + // If the send was unsuccesful, then abaondon the + // update, waiting for the revocation window to open + // up. + if sent, err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + p.Disconnect() + break out + } else if !sent { + continue + } + + state.numUnAcked += 1 case pkt := <-downstreamLink: p.handleDownStreamPkt(state, pkt) case msg, ok := <-upstreamLink: @@ -855,20 +911,29 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { // downstream channel, so we add the new HTLC // to our local log, then update the commitment // chains. - index := state.channel.AddHTLC(htlc, false) + index := state.channel.AddHTLC(htlc) p.queueMsg(htlc, nil) - // TODO(roasbeef): batch trickle timer + cap - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - } - - state.sigPending = true - state.clearedHTCLs[index] = &pendingPayment{ + state.pendingBatch = append(state.pendingBatch, &pendingPayment{ htlc: htlc, index: index, err: pkt.err, + }) + + // If this newly added update exceeds the max batch size, the + // initiate an update. + // TODO(roasbeef): enforce max HTLC's in flight limit + if len(state.pendingBatch) >= 10 { + if sent, err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + p.Disconnect() + return + } else if !sent { + return + } + + state.numUnAcked += 1 } } } @@ -879,11 +944,12 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { switch htlcPkt := msg.(type) { // TODO(roasbeef): timeouts + // * fail if can't parse sphinx mix-header case *lnwire.HTLCAddRequest: // We just received an add request from an upstream peer, so we // add it to our state machine, then add the HTLC to our // "settle" list in the event that we know the pre-image - state.channel.AddHTLC(htlcPkt, true) + index := state.channel.ReceiveHTLC(htlcPkt) rHash := htlcPkt.RedemptionHashes[0] if invoice, found := p.server.invoices.lookupInvoice(rHash); found { @@ -891,12 +957,13 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // * onion layer strip should also be before invoice lookup // * also can immediately send the settle msg pre := invoice.paymentPreimage - state.htlcsToSettle = append(state.htlcsToSettle, pre) + state.htlcsToSettle[index] = pre } case *lnwire.HTLCSettleRequest: // TODO(roasbeef): this assumes no "multi-sig" pre := htlcPkt.RedemptionProofs[0] - if _, err := state.channel.SettleHTLC(pre, true); err != nil { + idx := uint32(htlcPkt.HTLCKey) + if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil { // TODO(roasbeef): broadcast on-chain peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) p.Disconnect() @@ -914,20 +981,16 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { return } - // If we didn't initiate this state transition, then we'll - // update the remote commitment chain with a new commitment. - // Otherwise, we can reset the pending bit as we received the - // signature we were expecting. - // TODO(roasbeef): move sig updates to own trigger - // * can remove sigPending if so - if !state.sigPending { - if err := p.updateCommitTx(state); err != nil { + if state.numUnAcked > 0 { + state.numUnAcked -= 1 + state.logCommitTimer = time.Tick(300 * time.Millisecond) + } else { + if _, err := p.updateCommitTx(state); err != nil { peerLog.Errorf("unable to update "+ "commitment: %v", err) + p.Disconnect() return } - } else { - state.sigPending = false } // Finally, since we just accepted a new state, send the remote @@ -948,75 +1011,90 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { p.Disconnect() return } - // TODO(roasbeef): send the locked-in HTLC's over the plex chan - // to the switch. - peerLog.Debugf("htlcs ready to forward: %v", - spew.Sdump(htlcsToForward)) // If any of the htlc's eligible for forwarding are pending // settling or timeing out previous outgoing payments, then we // can them from the pending set, and signal the requster (if // existing) that the payment has been fully fulfilled. + numSettled := 0 for _, htlc := range htlcsToForward { + // Send this fully activated HTLC to the htlc switch to + // continue the chained clear/settle. + state.switchChan <- p.logEntryToHtlcPkt(htlc) + if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok { - peerLog.Debugf("local htlc %v cleared", - spew.Sdump(p.htlc)) p.err <- nil delete(state.clearedHTCLs, htlc.ParentIndex) } - } - // A full state transition has been completed, if we don't need - // to settle any HTLC's, then we're done. - if len(state.htlcsToSettle) == 0 { - return - } - - // Otherwise, we have some pending HTLC's which we can pull - // funds from, thereby settling. - peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) - for _, pre := range state.htlcsToSettle { - // Add each HTLC settle update to the channel's state - // update log, also sending the log update to the - // remote party. - logIndex, err := state.channel.SettleHTLC(pre, false) - if err != nil { - peerLog.Errorf("unable to settle htlc: %v", err) + // TODO(roasbeef): rework log entries to a shared + // interface. + if htlc.EntryType != lnwallet.Add { continue } + + // If we can't immediately settle this HTLC, then we + // can halt processing here. + preimage, ok := state.htlcsToSettle[htlc.Index] + if !ok { + continue + } + + // Otherwise, we settle this HTLC within our local + // state update log, then send the update entry to the + // remote party. + logIndex, err := state.channel.SettleHTLC(preimage) + if err != nil { + peerLog.Errorf("unable to settle htlc: %v", err) + p.Disconnect() + continue + } + settleMsg := &lnwire.HTLCSettleRequest{ ChannelPoint: state.chanPoint, HTLCKey: lnwire.HTLCKey(logIndex), - RedemptionProofs: [][32]byte{pre}, + RedemptionProofs: [][32]byte{preimage}, } p.queueMsg(settleMsg, nil) + delete(state.htlcsToSettle, htlc.Index) + + numSettled++ + } + + if numSettled == 0 { + return } // With all the settle updates added to the local and remote // HTLC logs, initiate a state transition by updating the // remote commitment chain. - if err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) + if sent, err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update commitment: %v", err) + p.Disconnect() return + } else if sent { + // TODO(roasbeef): wait to delete from htlcsToSettle? + state.numUnAcked += 1 } - state.sigPending = true - state.htlcsToSettle = nil } } // updateCommitTx signs, then sends an update to the remote peer adding a new // commitment to their commitment chain which includes all the latest updates // we've received+processed up to this point. -func (p *peer) updateCommitTx(state *commitmentState) error { +func (p *peer) updateCommitTx(state *commitmentState) (bool, error) { sigTheirs, logIndexTheirs, err := state.channel.SignNextCommitment() - if err != nil { - return fmt.Errorf("unable to sign next commitment: %v", err) + if err == lnwallet.ErrNoWindow { + peerLog.Tracef("revocation window exhausted, unable to send %v", + len(state.pendingBatch)) + return false, nil + } else if err != nil { + return false, err } parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256()) if err != nil { - return fmt.Errorf("unable to parse sig: %v", err) + return false, fmt.Errorf("unable to parse sig: %v", err) } commitSig := &lnwire.CommitSignature{ @@ -1026,7 +1104,48 @@ func (p *peer) updateCommitTx(state *commitmentState) error { } p.queueMsg(commitSig, nil) - return nil + // Move all pending updates to the map of cleared HTLC's, clearing out + // the set of pending updates. + for _, update := range state.pendingBatch { + // TODO(roasbeef): add parsed next-hop info to pending batch + // for multi-hop forwarding + state.clearedHTCLs[update.index] = update + } + state.logCommitTimer = nil + state.pendingBatch = nil + + return true, nil +} + +// logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP) +// log entry the corresponding htlcPacket with src/dest set along with the +// proper wire message. This helepr method is provided in order to aide an +// htlcManager in forwarding packets to the htlcSwitch. +func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket { + pkt := &htlcPacket{} + + // TODO(roasbeef): alter after switch to log entry interface + var msg lnwire.Message + switch pd.EntryType { + case lnwallet.Add: + // TODO(roasbeef): timeout, onion blob, etc + msg = &lnwire.HTLCAddRequest{ + Amount: lnwire.CreditsAmount(pd.Amount), + RedemptionHashes: [][32]byte{pd.RHash}, + } + case lnwallet.Settle: + // TODO(roasbeef): thread through preimage + msg = &lnwire.HTLCSettleRequest{ + HTLCKey: lnwire.HTLCKey(pd.ParentIndex), + } + } + + // TODO(roasbeef): set dest via onion blob or state + pkt.amt = pd.Amount + pkt.msg = msg + pkt.src = p.lightningID + + return pkt } // TODO(roasbeef): make all start/stop mutexes a CAS