327768f4ad
Use [33]byte for graph vertex representation. Delete unneeded stuff: 1. DeepEqual for graph comparison 2. EdgePath 3. 2-thread BFS 4. Table transfer messages and neighborhood radius 5. Beacons Refactor: 1. Change ID to Vertex 2. Test use table driven approach 3. Add comments 4. Make graph internal representation private 5. Use wire.OutPoint as EdgeId 6. Decouple routing messages from routing implementation 7. Delete Async methods 8. Delete unneeded channels and priority buffer from manager 9. Delete unneeded interfaces in internal graph realisation 10. Renamed ID to Vertex
470 lines
12 KiB
Go
470 lines
12 KiB
Go
// Copyright (c) 2016 Bitfury Group Limited
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file LICENSE or http://www.opensource.org/licenses/mit-license.php
|
|
|
|
package routing
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/lightningnetwork/lnd/routing/rt"
|
|
"github.com/lightningnetwork/lnd/routing/rt/graph"
|
|
"github.com/lightningnetwork/lnd/lnwire"
|
|
"github.com/roasbeef/btcd/wire"
|
|
)
|
|
|
|
func channelOperationsFromRT(r *rt.RoutingTable) []lnwire.ChannelOperation {
|
|
channels := r.AllChannels()
|
|
chOps := make([]lnwire.ChannelOperation, len(channels))
|
|
for i:=0; i<len(channels); i++ {
|
|
var info *graph.ChannelInfo
|
|
if channels[i].Info != nil {
|
|
info = channels[i].Info
|
|
} else {
|
|
info = new(graph.ChannelInfo)
|
|
}
|
|
chOp := lnwire.ChannelOperation{
|
|
NodePubKey1: channels[i].Src.ToByte33(),
|
|
NodePubKey2: channels[i].Tgt.ToByte33(),
|
|
ChannelId: (*wire.OutPoint)(&channels[i].Id),
|
|
Capacity: info.Cpt,
|
|
Weight: info.Wgt,
|
|
Operation: byte(rt.AddChannelOP),
|
|
}
|
|
chOps[i] = chOp
|
|
}
|
|
return chOps
|
|
}
|
|
|
|
func channelOperationsFromDiffBuff(r rt.DifferenceBuffer) []lnwire.ChannelOperation {
|
|
chOps := make([]lnwire.ChannelOperation, len(r))
|
|
for i:=0; i<len(r); i++ {
|
|
var info *graph.ChannelInfo
|
|
if r[i].Info != nil {
|
|
info = r[i].Info
|
|
} else {
|
|
info = new(graph.ChannelInfo)
|
|
}
|
|
chOp := lnwire.ChannelOperation{
|
|
NodePubKey1: r[i].Src.ToByte33(),
|
|
NodePubKey2: r[i].Tgt.ToByte33(),
|
|
ChannelId: (*wire.OutPoint)(&r[i].Id),
|
|
Capacity: info.Cpt,
|
|
Weight: info.Wgt,
|
|
Operation: byte(r[i].Operation),
|
|
}
|
|
chOps[i] = chOp
|
|
}
|
|
return chOps
|
|
}
|
|
|
|
func rtFromChannelOperations(chOps []lnwire.ChannelOperation) *rt.RoutingTable {
|
|
r := rt.NewRoutingTable()
|
|
for i := 0; i<len(chOps); i++{
|
|
r.AddChannel(
|
|
graph.NewVertex(chOps[i].NodePubKey1[:]),
|
|
graph.NewVertex(chOps[i].NodePubKey2[:]),
|
|
graph.EdgeID(*chOps[i].ChannelId),
|
|
&graph.ChannelInfo{
|
|
Cpt: chOps[i].Capacity,
|
|
Wgt: chOps[i].Weight,
|
|
},
|
|
)
|
|
}
|
|
return r
|
|
}
|
|
|
|
func diffBuffFromChannelOperations(chOps []lnwire.ChannelOperation) *rt.DifferenceBuffer {
|
|
d := rt.NewDifferenceBuffer()
|
|
for i := 0; i<len(chOps); i++ {
|
|
op := rt.NewChannelOperation(
|
|
graph.NewVertex(chOps[i].NodePubKey1[:]),
|
|
graph.NewVertex(chOps[i].NodePubKey2[:]),
|
|
graph.EdgeID(*chOps[i].ChannelId),
|
|
&graph.ChannelInfo{
|
|
Cpt: chOps[i].Capacity,
|
|
Wgt: chOps[i].Weight,
|
|
},
|
|
rt.OperationType(chOps[i].Operation),
|
|
)
|
|
*d = append(*d, op)
|
|
}
|
|
return d
|
|
}
|
|
|
|
// RoutingMessage is a wrapper around lnwire.Message which
|
|
// includes sender and receiver.
|
|
type RoutingMessage struct {
|
|
SenderID graph.Vertex
|
|
ReceiverID graph.Vertex
|
|
Msg lnwire.Message
|
|
}
|
|
|
|
type addChannelCmd struct {
|
|
Id1, Id2 graph.Vertex
|
|
TxID graph.EdgeID
|
|
Info *graph.ChannelInfo
|
|
err chan error
|
|
}
|
|
|
|
type removeChannelCmd struct {
|
|
Id1, Id2 graph.Vertex
|
|
TxID graph.EdgeID
|
|
err chan error
|
|
}
|
|
|
|
type hasChannelCmd struct {
|
|
Id1, Id2 graph.Vertex
|
|
TxID graph.EdgeID
|
|
rez chan bool
|
|
err chan error
|
|
}
|
|
|
|
type openChannelCmd struct {
|
|
Id graph.Vertex
|
|
TxID graph.EdgeID
|
|
info *graph.ChannelInfo
|
|
err chan error
|
|
}
|
|
|
|
type findPathCmd struct {
|
|
Id graph.Vertex
|
|
rez chan []graph.Vertex
|
|
err chan error
|
|
}
|
|
|
|
type findKShortestPathsCmd struct {
|
|
Id graph.Vertex
|
|
k int
|
|
rez chan [][]graph.Vertex
|
|
err chan error
|
|
}
|
|
|
|
type getRTCopyCmd struct {
|
|
rez chan *rt.RoutingTable
|
|
}
|
|
|
|
type NeighborState int
|
|
|
|
const (
|
|
StateINIT NeighborState = 0
|
|
StateACK NeighborState = 1
|
|
StateWAIT NeighborState = 2
|
|
)
|
|
|
|
type neighborDescription struct {
|
|
Id graph.Vertex
|
|
DiffBuff *rt.DifferenceBuffer
|
|
State NeighborState
|
|
}
|
|
|
|
// RoutingConfig contains configuration information for RoutingManager.
|
|
type RoutingConfig struct {
|
|
// SendMessage is used by the routing manager to send a
|
|
// message to a direct neighbor.
|
|
SendMessage func([33]byte, lnwire.Message) error
|
|
}
|
|
|
|
// RoutingManager implements routing functionality.
|
|
type RoutingManager struct {
|
|
// Current node.
|
|
Id graph.Vertex
|
|
// Neighbors of the current node.
|
|
neighbors map[graph.Vertex]*neighborDescription
|
|
// Routing table.
|
|
rT *rt.RoutingTable
|
|
// Configuration parameters.
|
|
config *RoutingConfig
|
|
// Channel for input messages
|
|
chIn chan interface{}
|
|
// Closing this channel will stop RoutingManager.
|
|
chQuit chan struct{}
|
|
// When RoutingManager stops this channel is closed.
|
|
ChDone chan struct{}
|
|
}
|
|
|
|
// NewRoutingManager creates new RoutingManager
|
|
// with empyt routing table.
|
|
func NewRoutingManager(Id graph.Vertex, config *RoutingConfig) *RoutingManager {
|
|
return &RoutingManager{
|
|
Id: Id,
|
|
neighbors: make(map[graph.Vertex]*neighborDescription),
|
|
rT: rt.NewRoutingTable(),
|
|
config: config,
|
|
chIn: make(chan interface{}, 10),
|
|
chQuit: make(chan struct{}, 1),
|
|
ChDone: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start - start message loop.
|
|
func (r *RoutingManager) Start() {
|
|
go func() {
|
|
out:
|
|
for {
|
|
// Prioritise quit.
|
|
select {
|
|
case <-r.chQuit:
|
|
break out
|
|
default:
|
|
}
|
|
select {
|
|
case msg := <-r.chIn:
|
|
r.handleMessage(msg)
|
|
case <-r.chQuit:
|
|
break out
|
|
}
|
|
}
|
|
close(r.ChDone)
|
|
}()
|
|
}
|
|
|
|
// Stop stops RoutingManager.
|
|
// Note if some messages were not processed they will be skipped.
|
|
func (r *RoutingManager) Stop() {
|
|
close(r.chQuit)
|
|
}
|
|
|
|
func (r *RoutingManager) handleMessage(msg interface{}) {
|
|
switch msg := msg.(type) {
|
|
case *openChannelCmd:
|
|
r.handleOpenChannelCmdMessage(msg)
|
|
case *addChannelCmd:
|
|
r.handleAddChannelCmdMessage(msg)
|
|
case *hasChannelCmd:
|
|
r.handleHasChannelCmdMessage(msg)
|
|
case *removeChannelCmd:
|
|
r.handleRemoveChannelCmdMessage(msg)
|
|
case *findPathCmd:
|
|
r.handleFindPath(msg)
|
|
case *findKShortestPathsCmd:
|
|
r.handleFindKShortestPaths(msg)
|
|
case *getRTCopyCmd:
|
|
r.handleGetRTCopy(msg)
|
|
case *RoutingMessage:
|
|
r.handleRoutingMessage(msg)
|
|
default:
|
|
fmt.Println("Unknown message type ", msg)
|
|
}
|
|
}
|
|
|
|
// notifyNeighbors checks if there are
|
|
// pending changes for each neighbor and send them.
|
|
// Each neighbor has three states
|
|
// StateINIT - initial state. No messages has been send to this neighbor
|
|
// StateWAIT - node waits fo acknowledgement.
|
|
// StateACK - acknowledgement has been obtained. New updates can be send.
|
|
func (r *RoutingManager) notifyNeighbors() {
|
|
for _, neighbor := range r.neighbors {
|
|
if neighbor.State == StateINIT {
|
|
neighbor.DiffBuff.Clear()
|
|
msg := &lnwire.NeighborHelloMessage{
|
|
Channels: channelOperationsFromRT(r.rT),
|
|
}
|
|
r.sendRoutingMessage(msg, neighbor.Id)
|
|
neighbor.State = StateWAIT
|
|
continue
|
|
}
|
|
if neighbor.State == StateACK && !neighbor.DiffBuff.IsEmpty() {
|
|
msg := &lnwire.NeighborUpdMessage{
|
|
Updates: channelOperationsFromDiffBuff(*neighbor.DiffBuff),
|
|
}
|
|
r.sendRoutingMessage(msg, neighbor.Id)
|
|
neighbor.DiffBuff.Clear()
|
|
neighbor.State = StateWAIT
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddChannel add channel to routing tables.
|
|
func (r *RoutingManager) AddChannel(Id1, Id2 graph.Vertex, TxID graph.EdgeID, info *graph.ChannelInfo) error {
|
|
msg := &addChannelCmd{
|
|
Id1: Id1,
|
|
Id2: Id2,
|
|
TxID: TxID,
|
|
Info: info,
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.err
|
|
}
|
|
|
|
// HasChannel checks if there are channel in routing table
|
|
func (r *RoutingManager) HasChannel(Id1, Id2 graph.Vertex, TxID graph.EdgeID) bool {
|
|
msg := &hasChannelCmd{
|
|
Id1: Id1,
|
|
Id2: Id2,
|
|
TxID: TxID,
|
|
rez: make(chan bool, 1),
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.rez
|
|
}
|
|
|
|
// RemoveChannel removes channel from routing table
|
|
func (r *RoutingManager) RemoveChannel(Id1, Id2 graph.Vertex, TxID graph.EdgeID) error {
|
|
msg := &removeChannelCmd{
|
|
Id1: Id1,
|
|
Id2: Id2,
|
|
TxID: TxID,
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.err
|
|
}
|
|
|
|
// OpenChannel is used to open channel from this node to other node.
|
|
// It adds node to neighbors and starts routing tables exchange.
|
|
func (r *RoutingManager) OpenChannel(Id graph.Vertex, TxID graph.EdgeID, info *graph.ChannelInfo) error {
|
|
msg := &openChannelCmd{
|
|
Id: Id,
|
|
TxID: TxID,
|
|
info: info,
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.err
|
|
}
|
|
|
|
// FindPath finds path from this node to some other node
|
|
func (r *RoutingManager) FindPath(destId graph.Vertex) ([]graph.Vertex, error) {
|
|
msg := &findPathCmd{
|
|
Id: destId,
|
|
rez: make(chan []graph.Vertex, 1),
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.rez, <-msg.err
|
|
}
|
|
|
|
func (r *RoutingManager) handleFindPath(msg *findPathCmd) {
|
|
path, err := r.rT.ShortestPath(r.Id, msg.Id)
|
|
msg.rez <- path
|
|
msg.err <- err
|
|
}
|
|
|
|
// FindKShortesPaths tries to find k paths from this node to destination.
|
|
// If timeouts returns all found paths
|
|
func (r *RoutingManager) FindKShortestPaths(destId graph.Vertex, k int) ([][]graph.Vertex, error) {
|
|
msg := &findKShortestPathsCmd{
|
|
Id: destId,
|
|
k: k,
|
|
rez: make(chan [][]graph.Vertex, 1),
|
|
err: make(chan error, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.rez, <-msg.err
|
|
}
|
|
|
|
// Find k-shortest path.
|
|
func (r *RoutingManager) handleFindKShortestPaths(msg *findKShortestPathsCmd) {
|
|
paths, err := r.rT.KShortestPaths(r.Id, msg.Id, msg.k)
|
|
msg.rez <- paths
|
|
msg.err <- err
|
|
}
|
|
|
|
func (r *RoutingManager) handleGetRTCopy(msg *getRTCopyCmd) {
|
|
msg.rez <- r.rT.Copy()
|
|
}
|
|
|
|
// GetRTCopy - returns copy of current node routing table.
|
|
// Note: difference buffers are not copied.
|
|
func (r *RoutingManager) GetRTCopy() *rt.RoutingTable {
|
|
msg := &getRTCopyCmd{
|
|
rez: make(chan *rt.RoutingTable, 1),
|
|
}
|
|
r.chIn <- msg
|
|
return <-msg.rez
|
|
}
|
|
|
|
func (r *RoutingManager) handleOpenChannelCmdMessage(msg *openChannelCmd) {
|
|
// TODO: validate that channel do not exist
|
|
r.rT.AddChannel(r.Id, msg.Id, msg.TxID, msg.info)
|
|
// TODO(mkl): what to do if neighbot already exists.
|
|
r.neighbors[msg.Id] = &neighborDescription{
|
|
Id: msg.Id,
|
|
DiffBuff: r.rT.NewDiffBuff(),
|
|
State: StateINIT,
|
|
}
|
|
r.notifyNeighbors()
|
|
msg.err <- nil
|
|
}
|
|
|
|
func (r *RoutingManager) handleAddChannelCmdMessage(msg *addChannelCmd) {
|
|
r.rT.AddChannel(msg.Id1, msg.Id2, msg.TxID, msg.Info)
|
|
msg.err <- nil
|
|
}
|
|
|
|
func (r *RoutingManager) handleHasChannelCmdMessage(msg *hasChannelCmd) {
|
|
msg.rez <- r.rT.HasChannel(msg.Id1, msg.Id2, msg.TxID)
|
|
msg.err <- nil
|
|
}
|
|
|
|
func (r *RoutingManager) handleRemoveChannelCmdMessage(msg *removeChannelCmd) {
|
|
r.rT.RemoveChannel(msg.Id1, msg.Id2, msg.TxID)
|
|
r.notifyNeighbors()
|
|
msg.err <- nil
|
|
}
|
|
|
|
func (r *RoutingManager) handleNeighborHelloMessage(msg *lnwire.NeighborHelloMessage, senderID graph.Vertex) {
|
|
// Sometimes we can obtain NeighborHello message from node that is
|
|
// not our neighbor yet. Because channel creation workflow
|
|
// end in different times for nodes.
|
|
t := rtFromChannelOperations(msg.Channels)
|
|
r.rT.AddTable(t)
|
|
r.sendRoutingMessage(&lnwire.NeighborAckMessage{}, senderID)
|
|
r.notifyNeighbors()
|
|
}
|
|
|
|
func (r *RoutingManager) handleNeighborUpdMessage(msg *lnwire.NeighborUpdMessage, senderID graph.Vertex) {
|
|
if _, ok := r.neighbors[senderID]; ok {
|
|
diffBuff := diffBuffFromChannelOperations(msg.Updates)
|
|
r.rT.ApplyDiffBuff(diffBuff)
|
|
r.sendRoutingMessage(&lnwire.NeighborAckMessage{}, senderID)
|
|
r.notifyNeighbors()
|
|
}
|
|
}
|
|
|
|
func (r *RoutingManager) handleNeighborRstMessage(msg *lnwire.NeighborRstMessage, senderID graph.Vertex) {
|
|
if _, ok := r.neighbors[senderID]; ok {
|
|
r.neighbors[senderID].State = StateINIT
|
|
r.notifyNeighbors()
|
|
}
|
|
}
|
|
|
|
func (r *RoutingManager) handleNeighborAckMessage(msg *lnwire.NeighborAckMessage, senderID graph.Vertex) {
|
|
if _, ok := r.neighbors[senderID]; ok && r.neighbors[senderID].State == StateWAIT {
|
|
r.neighbors[senderID].State = StateACK
|
|
// In case there are new updates for node which
|
|
// appears between sending NeighborUpd and NeighborAck
|
|
r.notifyNeighbors()
|
|
}
|
|
}
|
|
|
|
func (r *RoutingManager) handleRoutingMessage(rmsg *RoutingMessage) {
|
|
msg := rmsg.Msg
|
|
switch msg := msg.(type) {
|
|
case *lnwire.NeighborHelloMessage:
|
|
r.handleNeighborHelloMessage(msg, rmsg.SenderID)
|
|
case *lnwire.NeighborUpdMessage:
|
|
r.handleNeighborUpdMessage(msg, rmsg.SenderID)
|
|
case *lnwire.NeighborRstMessage:
|
|
r.handleNeighborRstMessage(msg, rmsg.SenderID)
|
|
case *lnwire.NeighborAckMessage:
|
|
r.handleNeighborAckMessage(msg, rmsg.SenderID)
|
|
default:
|
|
fmt.Printf("Unknown message type %T\n inside RoutingMessage", msg)
|
|
}
|
|
}
|
|
|
|
func (r *RoutingManager) sendRoutingMessage(msg lnwire.Message, receiverId graph.Vertex) {
|
|
r.config.SendMessage(receiverId.ToByte33(), msg)
|
|
}
|
|
|
|
func (r *RoutingManager) ReceiveRoutingMessage(msg lnwire.Message, senderID graph.Vertex) {
|
|
r.chIn <- &RoutingMessage{
|
|
SenderID: senderID,
|
|
ReceiverID: r.Id,
|
|
Msg: msg,
|
|
}
|
|
} |