rpc: implement new streaming SubscribeChannelGraph RPC call
This commit implements the new server-side streaming RPC call within the current default implementation of the RPC server. With this, the new functionality can now be used within the integration tests to achieve a greater degree of synchronization in the tests. As a result, we should be able to eliminate many of the sleeps lingering within the tests.
This commit is contained in:
parent
94fa55cca3
commit
24a69c1164
122
rpcserver.go
122
rpcserver.go
@ -262,6 +262,8 @@ func (r *rpcServer) OpenChannel(in *lnrpc.OpenChannelRequest,
|
||||
err error
|
||||
)
|
||||
|
||||
// TODO(roasbeef): also return channel ID?
|
||||
|
||||
// If the node key is set, the we'll parse the raw bytes into a pubkey
|
||||
// object so we can easily manipulate it. If this isn't set, then we
|
||||
// expected the TargetPeerId to be set accordingly.
|
||||
@ -1671,6 +1673,126 @@ func (r *rpcServer) GetNetworkInfo(context.Context, *lnrpc.NetworkInfoRequest) (
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SubscribeChannelGraph launches a streaming RPC that allows the caller to
|
||||
// receive notifications upon any changes the channel graph topology from the
|
||||
// review of the responding node. Events notified include: new nodes coming
|
||||
// online, nodes updating their authenticated attributes, new channels being
|
||||
// advertised, updates in the routing policy for a directional channel edge,
|
||||
// and finally when prior channels are closed on-chain.
|
||||
func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription,
|
||||
updateStream lnrpc.Lightning_SubscribeChannelGraphServer) error {
|
||||
|
||||
// First, we start by subscribing to a new intent to receive
|
||||
// notifications from the channel router.
|
||||
client, err := r.server.chanRouter.SubscribeTopology()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ensure that the resources for the topology update client is cleaned
|
||||
// up once either the server, or client exists.
|
||||
defer client.Cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
||||
// A new update has been sent by the channel router, we'll
|
||||
// marshall it into the form expected by the gRPC client, then
|
||||
// send it off.
|
||||
case topChange, ok := <-client.TopologyChanges:
|
||||
// If the second value from the channel read is nil,
|
||||
// then this means that the channel router is exiting
|
||||
// or the notification client was cancelled. So we'll
|
||||
// exit early.
|
||||
if !ok {
|
||||
return errors.New("sever shutting down")
|
||||
}
|
||||
|
||||
// Convert the struct from the channel router into the
|
||||
// form expected by the gRPC service then send it off
|
||||
// to the client.
|
||||
graphUpdate := marshallTopologyChange(topChange)
|
||||
if err := updateStream.Send(graphUpdate); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// The server is quitting, so we'll exit immediately. Returning
|
||||
// nil will close the clients read end of the stream.
|
||||
case <-r.quit:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// marshallTopologyChange performs a mapping from the topology change sturct
|
||||
// returned by the router to the form of notifications expected by the current
|
||||
// gRPC service.
|
||||
func marshallTopologyChange(topChange *routing.TopologyChange) *lnrpc.GraphTopologyUpdate {
|
||||
|
||||
// encodeKey is a simple helper function that converts a live public
|
||||
// key into a hex-encoded version of the compressed serialization for
|
||||
// the public key.
|
||||
encodeKey := func(k *btcec.PublicKey) string {
|
||||
return hex.EncodeToString(k.SerializeCompressed())
|
||||
}
|
||||
|
||||
nodeUpdates := make([]*lnrpc.NodeUpdate, len(topChange.NodeUpdates))
|
||||
for i, nodeUpdate := range topChange.NodeUpdates {
|
||||
addrs := make([]string, len(nodeUpdate.Addresses))
|
||||
for i, addr := range nodeUpdate.Addresses {
|
||||
addrs[i] = addr.String()
|
||||
}
|
||||
|
||||
nodeUpdates[i] = &lnrpc.NodeUpdate{
|
||||
Addresses: addrs,
|
||||
IdentityKey: encodeKey(nodeUpdate.IdentityKey),
|
||||
GlobalFeatures: nodeUpdate.GlobalFeatures,
|
||||
Alias: nodeUpdate.Alias,
|
||||
}
|
||||
}
|
||||
|
||||
channelUpdates := make([]*lnrpc.ChannelEdgeUpdate, len(topChange.ChannelEdgeUpdates))
|
||||
for i, channelUpdate := range topChange.ChannelEdgeUpdates {
|
||||
channelUpdates[i] = &lnrpc.ChannelEdgeUpdate{
|
||||
ChanId: channelUpdate.ChanID,
|
||||
ChanPoint: &lnrpc.ChannelPoint{
|
||||
FundingTxid: channelUpdate.ChanPoint.Hash[:],
|
||||
OutputIndex: channelUpdate.ChanPoint.Index,
|
||||
},
|
||||
Capacity: int64(channelUpdate.Capacity),
|
||||
RoutingPolicy: &lnrpc.RoutingPolicy{
|
||||
TimeLockDelta: uint32(channelUpdate.TimeLockDelta),
|
||||
MinHtlc: int64(channelUpdate.MinHTLC),
|
||||
FeeBaseMsat: int64(channelUpdate.BaseFee),
|
||||
FeeRateMilliMsat: int64(channelUpdate.FeeRate),
|
||||
},
|
||||
AdvertisingNode: encodeKey(channelUpdate.AdvertisingNode),
|
||||
ConnectingNode: encodeKey(channelUpdate.ConnectingNode),
|
||||
}
|
||||
}
|
||||
|
||||
closedChans := make([]*lnrpc.ClosedChannelUpdate, len(topChange.ClosedChannels))
|
||||
for i, closedChan := range topChange.ClosedChannels {
|
||||
closedChans[i] = &lnrpc.ClosedChannelUpdate{
|
||||
ChanId: closedChan.ChanID,
|
||||
Capacity: int64(closedChan.Capacity),
|
||||
ClosedHeight: closedChan.ClosedHeight,
|
||||
ChanPoint: &lnrpc.ChannelPoint{
|
||||
FundingTxid: closedChan.ChanPoint.Hash[:],
|
||||
OutputIndex: closedChan.ChanPoint.Index,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
return &lnrpc.GraphTopologyUpdate{
|
||||
NodeUpdates: nodeUpdates,
|
||||
ChannelUpdates: channelUpdates,
|
||||
ClosedChans: closedChans,
|
||||
}
|
||||
}
|
||||
|
||||
// ListPayments returns a list of all outgoing payments.
|
||||
func (r *rpcServer) ListPayments(context.Context,
|
||||
*lnrpc.ListPaymentsRequest) (*lnrpc.ListPaymentsResponse, error) {
|
||||
|
Loading…
Reference in New Issue
Block a user