rpcserver: implement the SubscribeChannelBackups RPC method

This commit is contained in:
Olaoluwa Osuntokun 2019-02-08 19:44:35 -08:00
parent 056decac2a
commit 019ec2df6b
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2

@ -4900,9 +4900,57 @@ func (r *rpcServer) RestoreChannelBackups(ctx context.Context,
func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription, func (r *rpcServer) SubscribeChannelBackups(req *lnrpc.ChannelBackupSubscription,
updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error { updateStream lnrpc.Lightning_SubscribeChannelBackupsServer) error {
// TODO(roasbeef): hook up to chan notifier // First, we'll subscribe to the primary channel notifier so we can
// obtain events for new opened/closed channels.
chanSubscription, err := r.server.channelNotifier.SubscribeChannelEvents()
if err != nil {
return err
}
// TODO(roasbeef): make chanbackup.SubSwapper defer chanSubscription.Cancel()
for {
select {
// A new event has been sent by the channel notifier, we'll
// assemble, then sling out a new event to the client.
case e := <-chanSubscription.Updates():
// TODO(roasbeef): batch dispatch ntnfs
switch e.(type) {
// We only care about new/closed channels, so we'll
// skip any events for active/inactive channels.
case channelnotifier.ActiveChannelEvent:
continue
case channelnotifier.InactiveChannelEvent:
continue
}
// Now that we know the channel state has changed,
// we'll obtains the current set of single channel
// backups from disk.
chanBackups, err := chanbackup.FetchStaticChanBackups(
r.server.chanDB,
)
if err != nil {
return fmt.Errorf("unable to fetch all "+
"static chan backups: %v", err)
}
// With our backups obtained, we'll pack them into a
// snapshot and send them back to the client.
backupSnapshot, err := r.createBackupSnapshot(
chanBackups,
)
if err != nil {
return err
}
err = updateStream.Send(backupSnapshot)
if err != nil {
return err
}
case <-r.quit:
return nil return nil
}
}
} }