diff --git a/peer.go b/peer.go index f052956a..b3e2cc58 100644 --- a/peer.go +++ b/peer.go @@ -222,12 +222,14 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error { // necessary to properly route multi-hop payments, and forward // new payments triggered by RPC clients. downstreamLink := make(chan lnwire.Message) - p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink) + plexChan := p.server.htlcSwitch.RegisterLink(p, + dbChan.Snapshot(), downstreamLink) + // TODO(roasbeef): buffer? upstreamLink := make(chan lnwire.Message) p.htlcManagers[chanPoint] = upstreamLink p.wg.Add(1) - go p.htlcManager(lnChan, downstreamLink, upstreamLink) + go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink) } return nil @@ -301,9 +303,6 @@ func (p *peer) readNextMessage() (lnwire.Message, []byte, error) { // // NOTE: This method MUST be run as a goroutine. func (p *peer) readHandler() { - // TODO(roasbeef): set timeout for initial channel request or version - // exchange. - out: for atomic.LoadInt32(&p.disconnect) == 0 { nextMsg, _, err := p.readNextMessage() @@ -312,6 +311,9 @@ out: break out } + var isChanUpate bool + var targetChan *wire.OutPoint + switch msg := nextMsg.(type) { // TODO(roasbeef): consolidate into predicate (single vs dual) case *lnwire.SingleFundingRequest: @@ -326,6 +328,51 @@ out: p.server.fundingMgr.processFundingOpenProof(msg, p) case *lnwire.CloseRequest: p.remoteCloseChanReqs <- msg + // TODO(roasbeef): interface for htlc update msgs + // * .(CommitmentUpdater) + case *lnwire.HTLCAddRequest: + isChanUpate = true + targetChan = msg.ChannelPoint + case *lnwire.HTLCSettleRequest: + isChanUpate = true + targetChan = msg.ChannelPoint + case *lnwire.CommitRevocation: + isChanUpate = true + targetChan = msg.ChannelPoint + case *lnwire.CommitSignature: + isChanUpate = true + targetChan = msg.ChannelPoint + } + + if isChanUpate { + // We might be receiving an update to a newly funded + // channel in which we were the responder. Therefore + // we need to possibly block until the new channel has + // propagated internally through the system. + p.barrierMtx.RLock() + barrier, ok := p.newChanBarriers[*targetChan] + p.barrierMtx.RUnlock() + if ok { + peerLog.Tracef("waiting for chan barrier "+ + "signal for ChannelPoint(%v)", targetChan) + select { + case <-barrier: + case <-p.quit: // TODO(roasbeef): add timer? + break out + } + peerLog.Tracef("barrier for ChannelPoint(%v) "+ + "closed", targetChan) + } + + // Dispatch the commitment update message to the proper + // active goroutine dedicated to this channel. + targetChan, ok := p.htlcManagers[*targetChan] + if !ok { + peerLog.Errorf("recv'd update for unknown channel %v", + targetChan) + continue + } + targetChan <- nextMsg } } @@ -499,7 +546,8 @@ out: upstreamLink := make(chan lnwire.Message) p.htlcManagers[chanPoint] = upstreamLink p.wg.Add(1) - go p.htlcManager(newChan, downstreamLink, upstreamLink) + go p.htlcManager(newChan, plexChan, downstreamLink, upstreamLink) + // Close the active channel barrier signalling the // readHandler that commitment related modifications to // this channel can now proceed. @@ -656,22 +704,72 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) { } } -// htlcManager... -// * communicates with the htlc switch over several channels -// * in handler sends to this goroutine after getting final revocation -// * has timeouts etc, to send back on queue handler in case of timeout -// TODO(roabseef): split downstream link into two chans (send vs recv) +// commitmentState is the volatile+persistent state of an active channel's +// commitment update state-machine. This struct is used by htlcManager's to +// save meta-state required for proper functioning. +type commitmentState struct { + pendingLogLen uint32 + + htlcsToSettle [][32]byte + sigPending bool + + channel *lnwallet.LightningChannel + chanPoint *wire.OutPoint +} + +// htlcManager is the primary goroutine which drives a channel's commitment +// update state-machine in response to messages received via several channels. +// The htlcManager reads messages from the upstream (remote) peer, and also +// from several possible downstream channels managed by the htlcSwitch. In the +// event that an htlc needs to be forwarded, then send-only htlcPlex chan is +// used which sends htlc packets to the switch for forwarding. Additionally, +// the htlcManager handles acting upon all timeouts for any active HTLC's, +// manages the channel's revocation window, and also the htlc trickle +// queue+timer for this active channels. func (p *peer) htlcManager(channel *lnwallet.LightningChannel, - downstreamLink chan lnwire.Message, upstreamLink chan lnwire.Message) { + htlcPlex chan<- *htlcPacket, downstreamLink <-chan lnwire.Message, + upstreamLink <-chan lnwire.Message) { - peerLog.Tracef("htlc manager for channel %v started", - channel.ChannelPoint()) + chanStats := channel.StateSnapshot() + peerLog.Tracef("HTLC manager for ChannelPoint(%v) started, "+ + "our_balance=%v, their_balance=%v, chain_height=%v", + channel.ChannelPoint(), chanStats.LocalBalance, + chanStats.RemoteBalance, chanStats.NumUpdates) + // A new session for this active channel has just started, therefore we + // need to send our initial revocation window to the remote peer. + for i := 0; i < lnwallet.InitialRevocationWindow; i++ { + rev, err := channel.ExtendRevocationWindow() + if err != nil { + peerLog.Errorf("unable to expand revocation window: %v", err) + continue + } + p.queueMsg(rev, nil) + } + + state := &commitmentState{ + channel: channel, + chanPoint: channel.ChannelPoint(), + } out: for { select { - case htlcPkt := <-downstreamLink: - fmt.Println(htlcPkt) + case msg := <-downstreamLink: + switch htlc := msg.(type) { + case *lnwire.HTLCAddRequest: + // A new payment has been initiated via the + // downstream channel, so we add the new HTLC + // to our local log, then update the commitment + // chains. + channel.AddHTLC(htlc, false) + p.queueMsg(htlc, nil) + + // TODO(roasbeef): batch trickle timer + cap + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + } + } case msg, ok := <-upstreamLink: // If the upstream message link is closed, this signals // that the channel itself is being closed, therefore @@ -680,7 +778,120 @@ out: break out } - fmt.Println(msg) + switch htlcPkt := msg.(type) { + // TODO(roasbeef): timeouts + case *lnwire.HTLCAddRequest: + // We just received an add request from an + // upstream peer, so we add it to our state + // machine, then add the HTLC to our "settle" + // list in the event that we know the pre-image + channel.AddHTLC(htlcPkt, true) + + rHash := htlcPkt.RedemptionHashes[0] + if invoice, found := p.server.invoices.lookupInvoice(rHash); found { + // TODO(roasbeef): check value + // * onion layer strip should also be before invoice lookup + pre := invoice.paymentPreimage + state.htlcsToSettle = append(state.htlcsToSettle, pre) + } + case *lnwire.HTLCSettleRequest: + // TODO(roasbeef): this assumes no "multi-sig" + pre := htlcPkt.RedemptionProofs[0] + if _, err := channel.SettleHTLC(pre, true); err != nil { + peerLog.Errorf("settle for outgoing HTLC rejected: %v", err) + continue + } + case *lnwire.CommitSignature: + // We just received a new update to our local + // commitment chain, validate this new + // commitment, closing the link if invalid. + // TODO(roasbeef): use uint64 for indexes? + logIndex := uint32(htlcPkt.LogIndex) + sig := htlcPkt.CommitSig.Serialize() + if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil { + peerLog.Errorf("unable to accept new commitment: %v", err) + continue + } + + // If we didn't initiate this state transition, + // then we'll update the remote commitment + // chain with a new commitment. Otherwise, we + // can reset the pending bit as we received the + // signature we were expecting. + if !state.sigPending { + // TODO(roasbeef): may not always want to *immediatly* + // sign next commitment. + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + continue + } + } else { + state.sigPending = false + } + + // Finally, since we just accepted a new state, + // send the remote peer a revocation for our + // prior state. + nextRevocation, err := channel.RevokeCurrentCommitment() + if err != nil { + peerLog.Errorf("unable to revoke current commitment: %v", err) + continue + } + p.queueMsg(nextRevocation, nil) + case *lnwire.CommitRevocation: + // We've received a revocation from the remote + // chain, if valid, this moves the remote chain + // forward, and expands our revocation window. + htlcsToForward, err := channel.ReceiveRevocation(htlcPkt) + if err != nil { + peerLog.Errorf("unable to accept revocation: %v", err) + continue + } + // TODO(roasbeef): send the locked-in HTLC's + // over the plex chan to the switch. + peerLog.Debugf("htlcs ready to forward: %v", + spew.Sdump(htlcsToForward)) + + // A full state transition has been completed, + // if we don't need to settle any HTLC's, then + // we're done. + if len(state.htlcsToSettle) == 0 { + continue + } + + // Otherwise, we have some pending HTLC's which + // we can pull funds from, thereby settling. + peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle)) + for _, pre := range state.htlcsToSettle { + // Add each HTLC settle update to the + // channel's state update log, also + // sending the log update to the remote + // party. + logIndex, err := channel.SettleHTLC(pre, false) + if err != nil { + peerLog.Errorf("unable to settle htlc: %v", err) + continue + } + settleMsg := &lnwire.HTLCSettleRequest{ + ChannelPoint: state.chanPoint, + HTLCKey: lnwire.HTLCKey(logIndex), + RedemptionProofs: [][32]byte{pre}, + } + p.queueMsg(settleMsg, nil) + } + + // With all the settle updates added to the + // local and remote HTLC logs, initiate a state + // transition by updating the remote commitment + // chain. + if err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + continue + } + state.htlcsToSettle = nil + } case <-p.quit: break out } @@ -689,4 +900,30 @@ out: p.wg.Done() } +// updateCommitTx signs, then sends an update to the remote peer adding a new +// commitment to their commitment chain which includes all the latest updates +// we've received+processed up to this point. +func (p *peer) updateCommitTx(state *commitmentState) error { + sigTheirs, logIndexTheirs, err := state.channel.SignNextCommitment() + if err != nil { + return fmt.Errorf("unable to sign next commitment: %v", err) + } + + parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256()) + if err != nil { + return fmt.Errorf("unable to parse sig: %v", err) + } + + commitSig := &lnwire.CommitSignature{ + ChannelPoint: state.chanPoint, + CommitSig: parsedSig, + LogIndex: uint64(logIndexTheirs), + } + p.queueMsg(commitSig, nil) + + state.sigPending = true + + return nil +} + // TODO(roasbeef): make all start/stop mutexes a CAS