multi: initial integration of routing module

This commit integrates BitFury's current routing functionality into lnd. The
primary ochestration point for the routing sub-system in the routingMgr. The
routingMgr manages all persistent and volatile state related to routing within
the network.

Newly opened channels, either when the initiator or responder are inserted into
the routing table once the channel is fully open. Once new links are inserted
the routingMgr can then perform path selection in order to locate an "optimal"
path to a target destination.
This commit is contained in:
BitfuryLightning 2016-07-15 07:02:59 -04:00 committed by Olaoluwa Osuntokun
parent fc16159a37
commit f8c851769f
16 changed files with 467 additions and 1 deletions

@ -503,3 +503,23 @@ func sendPaymentCommand(ctx *cli.Context) error {
return nil return nil
} }
var ShowRoutingTableCommand = cli.Command{
Name: "showroutingtable",
Description: "shows routing table for a node",
Action: showRoutingTable,
}
func showRoutingTable(ctx *cli.Context) error {
ctxb := context.Background()
client := getClient(ctx)
req := &lnrpc.ShowRoutingTableRequest{}
resp, err := client.ShowRoutingTable(ctxb, req)
if err != nil {
return err
}
printRespJson(resp)
return nil
}

@ -60,6 +60,7 @@ func main() {
GetInfoCommand, GetInfoCommand,
PendingChannelsCommand, PendingChannelsCommand,
SendPaymentCommand, SendPaymentCommand,
ShowRoutingTableCommand,
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {

@ -10,6 +10,9 @@ import (
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph"
) )
const ( const (
@ -614,6 +617,16 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) {
fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerID(%v) is now open", fndgLog.Infof("FundingOpen: ChannelPoint(%v) with peerID(%v) is now open",
resCtx.reservation.FundingOutpoint, fmsg.peer.id) resCtx.reservation.FundingOutpoint, fmsg.peer.id)
// ROUTING ADDED
capacity := float64(resCtx.reservation.OurContribution().FundingAmount + resCtx.reservation.TheirContribution().FundingAmount)
fmsg.peer.server.routingMgr.AddChannel(
graph.NewID(fmsg.peer.server.lightningID),
graph.NewID([32]byte(fmsg.peer.lightningID)),
graph.NewEdgeID(resCtx.reservation.FundingOutpoint().String()),
&rt.ChannelInfo{
Cpt:capacity,
},
)
fmsg.peer.newChannels <- openChan fmsg.peer.newChannels <- openChan
} }

@ -17,6 +17,7 @@ service Lightning {
rpc PendingChannels(PendingChannelRequest) returns (PendingChannelResponse); rpc PendingChannels(PendingChannelRequest) returns (PendingChannelResponse);
rpc SendPayment(stream SendRequest) returns (stream SendResponse); rpc SendPayment(stream SendRequest) returns (stream SendResponse);
rpc ShowRoutingTable(ShowRoutingTableRequest) returns (ShowRoutingTableResponse);
} }
message SendRequest { message SendRequest {
@ -214,3 +215,10 @@ message WalletBalanceRequest {
message WalletBalanceResponse { message WalletBalanceResponse {
double balance = 1; double balance = 1;
} }
message ShowRoutingTableRequest {
}
message ShowRoutingTableResponse {
string rt = 1;
}

@ -44,6 +44,14 @@ const (
CmdCommitSignature = uint32(2000) CmdCommitSignature = uint32(2000)
CmdCommitRevocation = uint32(2010) CmdCommitRevocation = uint32(2010)
// Commands for routing
CmdNeighborHelloMessage = uint32(3000)
CmdNeighborUpdMessage = uint32(3010)
CmdNeighborAckMessage = uint32(3020)
CmdNeighborRstMessage = uint32(3030)
CmdRoutingTableRequestMessage = uint32(3040)
CmdRoutingTableTransferMessage = uint32(3050)
// Commands for reporting protocol errors. // Commands for reporting protocol errors.
CmdErrorGeneric = uint32(4000) CmdErrorGeneric = uint32(4000)
) )
@ -94,6 +102,18 @@ func makeEmptyMessage(command uint32) (Message, error) {
msg = &CommitRevocation{} msg = &CommitRevocation{}
case CmdErrorGeneric: case CmdErrorGeneric:
msg = &ErrorGeneric{} msg = &ErrorGeneric{}
case CmdNeighborHelloMessage:
msg = &NeighborHelloMessage{}
case CmdNeighborUpdMessage:
msg = &NeighborUpdMessage{}
case CmdNeighborAckMessage:
msg = &NeighborAckMessage{}
case CmdNeighborRstMessage:
msg = &NeighborRstMessage{}
case CmdRoutingTableRequestMessage:
msg = &RoutingTableRequestMessage{}
case CmdRoutingTableTransferMessage:
msg = &RoutingTableTransferMessage{}
default: default:
return nil, fmt.Errorf("unhandled command [%d]", command) return nil, fmt.Errorf("unhandled command [%d]", command)
} }

@ -0,0 +1,39 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"fmt"
"io"
)
type NeighborAckMessage struct {
RoutingMessageBase
}
func (msg *NeighborAckMessage) String() string {
return fmt.Sprintf("NeighborAckMessage{%v %v}", msg.SenderID, msg.ReceiverID)
}
func (msg *NeighborAckMessage) Command() uint32{
return CmdNeighborAckMessage
}
func (msg *NeighborAckMessage) Encode(w io.Writer, pver uint32) error{
return nil
}
func (msg *NeighborAckMessage) Decode(r io.Reader, pver uint32) error{
return nil
}
func (msg *NeighborAckMessage) MaxPayloadLength(uint32) uint32{
return 0
}
func (msg *NeighborAckMessage) Validate() error{
return nil
}
var _ Message = (*NeighborAckMessage)(nil)

@ -0,0 +1,52 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"io"
"github.com/BitfuryLightning/tools/rt"
"encoding/gob"
"fmt"
)
type NeighborHelloMessage struct{
RoutingMessageBase
RT *rt.RoutingTable
}
func (msg *NeighborHelloMessage) Decode(r io.Reader, pver uint32) error{
decoder := gob.NewDecoder(r)
rt1 := rt.NewRoutingTable()
err := decoder.Decode(rt1.G)
msg.RT = rt1
return err
}
func (msg *NeighborHelloMessage) Encode(w io.Writer, pver uint32) error{
encoder := gob.NewEncoder(w)
err := encoder.Encode(msg.RT.G)
return err
}
func (msg *NeighborHelloMessage) Command() uint32{
return CmdNeighborHelloMessage
}
func (msg *NeighborHelloMessage) MaxPayloadLength(uint32) uint32{
// TODO: Insert some estimations
return 1000000
}
func (msg *NeighborHelloMessage) Validate() error{
// TODO: Add validation
return nil
}
func (msg *NeighborHelloMessage) String() string{
return fmt.Sprintf("NeighborHelloMessage{%v %v %v}", msg.SenderID, msg.ReceiverID, msg.RT)
}
var _ Message = (*NeighborHelloMessage)(nil)

@ -0,0 +1,41 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"testing"
"bytes"
"github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph"
)
func TestNeighborHelloMessageEncodeDecode(t *testing.T){
Id1 := graph.NewID(1)
Id2 := graph.NewID(2)
rt1 := rt.NewRoutingTable()
rt1.AddChannel(Id1, Id2, graph.NewEdgeID("1"), &rt.ChannelInfo{1, 1})
b := new(bytes.Buffer)
msg1 := NeighborHelloMessage{RT:rt1}
err := msg1.Encode(b, 0)
if err != nil{
t.Fatalf("Can't encode message ", err)
}
msg2 := new(NeighborHelloMessage)
err = msg2.Decode(b, 0)
if err != nil{
t.Fatalf("Can't decode message ", err)
}
if msg2.RT == nil{
t.Fatal("After decoding RT should not be nil")
}
if !msg2.RT.HasChannel(Id1, Id2, nil){
t.Errorf("msg2.RT.HasChannel(Id1, Id2) = false, want true")
}
if !msg2.RT.HasChannel(Id2, Id1, nil){
t.Errorf("msg2.RT.HasChannel(Id2, Id1) = false, want true")
}
}

@ -0,0 +1,39 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"fmt"
"io"
)
type NeighborRstMessage struct {
RoutingMessageBase
}
func (msg *NeighborRstMessage) String() string {
return fmt.Sprintf("NeighborRstMessage{%v %v}", msg.SenderID, msg.ReceiverID)
}
func (msg *NeighborRstMessage) Command() uint32{
return CmdNeighborRstMessage
}
func (msg *NeighborRstMessage) Encode(w io.Writer, pver uint32) error{
return nil
}
func (msg *NeighborRstMessage) Decode(r io.Reader, pver uint32) error{
return nil
}
func (msg *NeighborRstMessage) MaxPayloadLength(uint32) uint32{
return 0
}
func (msg *NeighborRstMessage) Validate() error{
return nil
}
var _ Message = (*NeighborRstMessage)(nil)

@ -0,0 +1,51 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"fmt"
"github.com/BitfuryLightning/tools/rt"
"encoding/gob"
"io"
)
type NeighborUpdMessage struct {
RoutingMessageBase
DiffBuff *rt.DifferenceBuffer
}
func (msg *NeighborUpdMessage) Decode(r io.Reader, pver uint32) error{
decoder := gob.NewDecoder(r)
diffBuff := new(rt.DifferenceBuffer)
err := decoder.Decode(diffBuff)
msg.DiffBuff = diffBuff
return err
}
func (msg *NeighborUpdMessage) Encode(w io.Writer, pver uint32) error{
encoder := gob.NewEncoder(w)
err := encoder.Encode(msg.DiffBuff)
return err
}
func (msg *NeighborUpdMessage) Command() uint32{
return CmdNeighborUpdMessage
}
func (msg *NeighborUpdMessage) MaxPayloadLength(uint32) uint32{
// TODO: Insert some estimations
return 1000000
}
func (msg *NeighborUpdMessage) Validate() error{
// TODO: Add validation
return nil
}
func (msg *NeighborUpdMessage) String() string {
return fmt.Sprintf("NeighborUpdMessage{%v %v %v}", msg.SenderID, msg.ReceiverID, *msg.DiffBuff)
}
var _ Message = (*NeighborUpdMessage)(nil)

@ -0,0 +1,28 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"github.com/BitfuryLightning/tools/rt/graph"
)
type RoutingMessageBase struct {
SenderID graph.ID
ReceiverID graph.ID
}
func (msg RoutingMessageBase) GetReceiverID() graph.ID{
return msg.ReceiverID
}
func (msg RoutingMessageBase) GetSenderID() graph.ID{
return msg.SenderID
}
// Interface for all routing messages. All messages have sender and receiver
type RoutingMessage interface {
GetSenderID() graph.ID
GetReceiverID() graph.ID
}

@ -0,0 +1,40 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"fmt"
"io"
)
type RoutingTableRequestMessage struct {
RoutingMessageBase
}
func (msg *RoutingTableRequestMessage) String() string {
return fmt.Sprintf("RoutingTableRequestMessage{%v %v}", msg.SenderID, msg.ReceiverID)
}
func (msg *RoutingTableRequestMessage) Command() uint32{
return CmdRoutingTableRequestMessage
}
func (msg *RoutingTableRequestMessage) Encode(w io.Writer, pver uint32) error{
return nil
}
func (msg *RoutingTableRequestMessage) Decode(r io.Reader, pver uint32) error{
return nil
}
func (msg *RoutingTableRequestMessage) MaxPayloadLength(uint32) uint32{
return 0
}
func (msg *RoutingTableRequestMessage) Validate() error{
return nil
}
var _ Message = (*RoutingTableRequestMessage)(nil)

@ -0,0 +1,51 @@
// Copyright (c) 2016 Bitfury Group Limited
// Distributed under the MIT software license, see the accompanying
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
package lnwire
import (
"io"
"github.com/BitfuryLightning/tools/rt"
"encoding/gob"
"fmt"
)
type RoutingTableTransferMessage struct {
RoutingMessageBase
RT *rt.RoutingTable
}
func (msg *RoutingTableTransferMessage) String() string {
return fmt.Sprintf("RoutingTableTransferMessage{%v %v %v}", msg.SenderID, msg.ReceiverID, msg.RT)
}
func (msg *RoutingTableTransferMessage) Decode(r io.Reader, pver uint32) error{
decoder := gob.NewDecoder(r)
rt1 := rt.NewRoutingTable()
err := decoder.Decode(rt1.G)
msg.RT = rt1
return err
}
func (msg *RoutingTableTransferMessage) Encode(w io.Writer, pver uint32) error{
encoder := gob.NewEncoder(w)
err := encoder.Encode(msg.RT.G)
return err
}
func (msg *RoutingTableTransferMessage) Command() uint32{
return CmdRoutingTableTransferMessage
}
func (msg *RoutingTableTransferMessage) MaxPayloadLength(uint32) uint32{
// TODO: Insert some estimations
return 1000000
}
func (msg *RoutingTableTransferMessage) Validate() error{
// TODO: Add validation
return nil
}
var _ Message = (*RoutingTableTransferMessage)(nil)

10
peer.go

@ -17,6 +17,7 @@ import (
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
) )
var ( var (
@ -377,6 +378,15 @@ out:
case *lnwire.CommitSignature: case *lnwire.CommitSignature:
isChanUpate = true isChanUpate = true
targetChan = msg.ChannelPoint targetChan = msg.ChannelPoint
// ROUTING ADDED
case *lnwire.NeighborAckMessage,
*lnwire.NeighborHelloMessage,
*lnwire.NeighborRstMessage,
*lnwire.NeighborUpdMessage,
*lnwire.RoutingTableRequestMessage,
*lnwire.RoutingTableTransferMessage:
p.server.routingMgr.ChIn <- msg
// TODO(mkl): determine sender and receiver of message
} }
if isChanUpate { if isChanUpate {

@ -482,3 +482,12 @@ func (r *rpcServer) SendPayment(paymentStream lnrpc.Lightning_SendPaymentServer)
return nil return nil
} }
func (r *rpcServer) ShowRoutingTable(ctx context.Context,
in *lnrpc.ShowRoutingTableRequest) (*lnrpc.ShowRoutingTableResponse, error) {
rpcsLog.Debugf("[ShowRoutingTable]")
rtCopy := r.server.routingMgr.GetRTCopy()
return &lnrpc.ShowRoutingTableResponse{
Rt: rtCopy.String(),
}, nil
}

@ -11,10 +11,14 @@ import (
"github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lndc"
"github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil" "github.com/roasbeef/btcutil"
"github.com/BitfuryLightning/tools/routing"
"github.com/BitfuryLightning/tools/rt"
"github.com/BitfuryLightning/tools/rt/graph"
"github.com/roasbeef/btcwallet/waddrmgr" "github.com/roasbeef/btcwallet/waddrmgr"
) )
@ -48,6 +52,9 @@ type server struct {
htlcSwitch *htlcSwitch htlcSwitch *htlcSwitch
invoices *invoiceRegistry invoices *invoiceRegistry
// ROUTING ADDED
routingMgr *routing.RoutingManager
newPeers chan *peer newPeers chan *peer
donePeers chan *peer donePeers chan *peer
queries chan interface{} queries chan interface{}
@ -94,6 +101,10 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet,
// TODO(roasbeef): remove // TODO(roasbeef): remove
s.invoices.addInvoice(1000*1e8, *debugPre) s.invoices.addInvoice(1000*1e8, *debugPre)
// Create a new routing manager with ourself as the sole node within
// the graph.
s.routingMgr = routing.NewRoutingManager(graph.NewID(s.lightningID), nil)
s.rpcServer = newRpcServer(s) s.rpcServer = newRpcServer(s)
return s, nil return s, nil
@ -115,6 +126,7 @@ func (s *server) Start() {
s.fundingMgr.Start() s.fundingMgr.Start()
s.htlcSwitch.Start() s.htlcSwitch.Start()
s.routingMgr.Start()
s.wg.Add(1) s.wg.Add(1)
go s.queryHandler() go s.queryHandler()
@ -141,6 +153,9 @@ func (s *server) Stop() error {
s.lnwallet.Shutdown() s.lnwallet.Shutdown()
s.fundingMgr.Stop() s.fundingMgr.Stop()
// ROUTING ADDED
s.routingMgr.Stop()
// Signal all the lingering goroutines to quit. // Signal all the lingering goroutines to quit.
close(s.quit) close(s.quit)
s.wg.Wait() s.wg.Wait()
@ -255,6 +270,24 @@ out:
case *openChanReq: case *openChanReq:
s.handleOpenChanReq(msg) s.handleOpenChanReq(msg)
} }
case msg := <-s.routingMgr.ChOut:
msg1 := msg.(lnwire.RoutingMessage)
receiverID := msg1.GetReceiverID().ToByte32()
var targetPeer *peer
for _, peer := range s.peers { // TODO: threadsafe api
// We found the the target
if peer.lightningID == receiverID {
targetPeer = peer
break
}
}
if targetPeer != nil {
fndgLog.Info("Peer found. Sending message")
done := make(chan struct{}, 1)
targetPeer.queueMsg(msg.(lnwire.Message), done)
} else {
srvrLog.Errorf("Can't find peer to send message %v", receiverID)
}
case <-s.quit: case <-s.quit:
break out break out
} }
@ -372,7 +405,18 @@ func (s *server) handleOpenChanReq(req *openChanReq) {
go func() { go func() {
// TODO(roasbeef): server semaphore to restrict num goroutines // TODO(roasbeef): server semaphore to restrict num goroutines
fundingID, err := s.fundingMgr.initFundingWorkflow(targetPeer, req) fundingID, err := s.fundingMgr.initFundingWorkflow(targetPeer, req)
if err == nil {
// ROUTING ADDED
capacity := float64(req.localFundingAmt + req.remoteFundingAmt)
s.routingMgr.AddChannel(
graph.NewID(s.lightningID),
graph.NewID([32]byte(targetPeer.lightningID)),
graph.NewEdgeID(fundingID.String()),
&rt.ChannelInfo{
Cpt: capacity,
},
)
}
req.resp <- &openChanResp{fundingID} req.resp <- &openChanResp{fundingID}
req.err <- err req.err <- err
}() }()