package btcdnotify import ( "container/heap" "fmt" "sync" "sync/atomic" "time" "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcrpcclient" "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntfs" ) // BtcdNotifier... type BtcdNotifier struct { started int32 // To be used atomically stopped int32 // To be used atomically chainConn *btcrpcclient.Client notificationRegistry chan interface{} // TODO(roasbeef): make map point to slices? Would allow for multiple // clients to listen for same spend. Would we ever need this? spendNotifications map[wire.OutPoint]*spendNotification confNotifications map[wire.ShaHash]*confirmationsNotification confHeap *confirmationHeap connectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn relevantTxs chan *btcutil.Tx wg sync.WaitGroup quit chan struct{} } var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // NewBtcdNotifier... // TODO(roasbeef): // * when asked for spent, request via client //func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer, // chainConn *chain.RPCClient) (*BtcdNotifier, error) { func NewBtcdNotifier(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { notifier := &BtcdNotifier{ notificationRegistry: make(chan interface{}), spendNotifications: make(map[wire.OutPoint]*spendNotification), confNotifications: make(map[wire.ShaHash]*confirmationsNotification), confHeap: newConfirmationHeap(), connectedBlockHashes: make(chan *blockNtfn, 20), disconnectedBlockHashes: make(chan *blockNtfn, 20), relevantTxs: make(chan *btcutil.Tx, 100), quit: make(chan struct{}), } ntfnCallbacks := &btcrpcclient.NotificationHandlers{ OnBlockConnected: notifier.onBlockConnected, OnBlockDisconnected: notifier.onBlockDisconnected, OnRedeemingTx: notifier.onRedeemingTx, } config.DisableConnectOnNew = true config.DisableAutoReconnect = false chainConn, err := btcrpcclient.New(config, ntfnCallbacks) if err != nil { return nil, err } notifier.chainConn = chainConn return notifier, nil } // Start... func (b *BtcdNotifier) Start() error { // Already started? if atomic.AddInt32(&b.started, 1) != 1 { return nil } if err := b.chainConn.Connect(20); err != nil { return err } if err := b.chainConn.NotifyBlocks(); err != nil { return err } b.wg.Add(1) go b.notificationDispatcher() return nil } // Stop... func (b *BtcdNotifier) Stop() error { // Already shutting down? if atomic.AddInt32(&b.stopped, 1) != 1 { return nil } b.chainConn.Shutdown() close(b.quit) b.wg.Wait() return nil } // connectedBlock... type blockNtfn struct { sha *wire.ShaHash height int32 } // onBlockConnected... func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) { select { case b.connectedBlockHashes <- &blockNtfn{hash, height}: case <-b.quit: } } // onBlockDisconnected... func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t time.Time) { b.onBlockDisconnected(hash, height, t) } // onRedeemingTx... func (b *BtcdNotifier) onRedeemingTx(transaction *btcutil.Tx, details *btcjson.BlockDetails) { select { case b.relevantTxs <- transaction: case <-b.quit: } } // notificationDispatcher... func (b *BtcdNotifier) notificationDispatcher() { out: for { select { case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: b.spendNotifications[*msg.targetOutpoint] = msg case *confirmationsNotification: b.confNotifications[*msg.txid] = msg } case staleBlockHash := <-b.disconnectedBlockHashes: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing // re-org? // * notify of negative confirmations fmt.Println(staleBlockHash) case connectedBlock := <-b.connectedBlockHashes: newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) if err != nil { continue } newHeight := connectedBlock.height for _, tx := range newBlock.Transactions() { // Check if the inclusion of this transaction // within a block by itself triggers a block // confirmation threshold, if so send a // notification. Otherwise, place the notification // on a heap to be triggered in the future once // additional confirmations are attained. txSha := tx.Sha() b.checkConfirmationTrigger(txSha, newHeight) } // A new block has been connected to the main // chain. Send out any N confirmation notifications // which may have been triggered by this new block. b.notifyConfs(newHeight) case newSpend := <-b.relevantTxs: // First, check if this transaction spends an output // that has an existing spend notification for it. for i, txIn := range newSpend.MsgTx().TxIn { prevOut := txIn.PreviousOutPoint // If this transaction indeed does spend an // output which we have a registered notification // for, then create a spend summary, finally // sending off the details to the notification // subscriber. if ntfn, ok := b.spendNotifications[prevOut]; ok { spenderSha := newSpend.Sha() spendDetails := &chainntnfs.SpendDetail{ SpentOutPoint: ntfn.targetOutpoint, SpenderTxHash: spenderSha, // TODO(roasbeef): copy tx? SpendingTx: newSpend.MsgTx(), SpenderInputIndex: uint32(i), } ntfn.spendChan <- spendDetails delete(b.spendNotifications, prevOut) } } case <-b.quit: break out } } b.wg.Done() } // notifyConfs... func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { // If the heap is empty, we have nothing to do. if b.confHeap.Len() == 0 { return } // Traverse our confirmation heap. The heap is a // min-heap, so the confirmation notification which requires // the smallest block-height will always be at the top // of the heap. If a confirmation notification is eligible // for triggering, then fire it off, and check if another // is eligible until there are no more eligible entries. nextConf := heap.Pop(b.confHeap).(*confEntry) for nextConf.triggerHeight <= uint32(newBlockHeight) { nextConf.finConf <- struct{}{} if b.confHeap.Len() == 0 { return } nextConf = heap.Pop(b.confHeap).(*confEntry) } heap.Push(b.confHeap, nextConf) } // checkConfirmationTrigger... // TODO(roasbeef): perheps lookup, then track by inputs instead? func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) { // If a confirmation notification has been registered // for this txid, then either trigger a notification // event if only a single confirmation notification was // requested, or place the notification on the // confirmation heap for future usage. if confNtfn, ok := b.confNotifications[*txSha]; ok { delete(b.confNotifications, *txSha) if confNtfn.numConfirmations == 1 { confNtfn.finConf <- struct{}{} return } // The registered notification requires more // than one confirmation before triggering. So // we create a heapConf entry for this notification. // The heapConf allows us to easily keep track of // which notification(s) we should fire off with // each incoming block. confNtfn.initialConfirmHeight = uint32(blockHeight) finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1) heapEntry := &confEntry{ confNtfn, finalConfHeight, } heap.Push(b.confHeap, heapEntry) } } // spendNotification.... type spendNotification struct { targetOutpoint *wire.OutPoint spendChan chan *chainntnfs.SpendDetail } // RegisterSpendNotification... // NOTE: eventChan MUST be buffered func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) { if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { return nil, err } ntfn := &spendNotification{ targetOutpoint: outpoint, spendChan: make(chan *chainntnfs.SpendDetail, 1), } b.notificationRegistry <- ntfn return &chainntnfs.SpendEvent{ntfn.spendChan}, nil } // confirmationNotification... // TODO(roasbeef): re-org funny business type confirmationsNotification struct { txid *wire.ShaHash initialConfirmHeight uint32 numConfirmations uint32 finConf chan struct{} negativeConf chan int32 } // RegisterConfirmationsNotification... func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash, numConfs uint32) (*chainntnfs.ConfirmationEvent, error) { ntfn := &confirmationsNotification{ txid: txid, numConfirmations: numConfs, finConf: make(chan struct{}, 1), negativeConf: make(chan int32, 1), } b.notificationRegistry <- ntfn return &chainntnfs.ConfirmationEvent{ Confirmed: ntfn.finConf, NegativeConf: ntfn.negativeConf, }, nil }