peer: we now load active channel during startup, not creation

This commit is contained in:
Olaoluwa Osuntokun 2017-04-23 19:23:15 -07:00
parent fe3c90362e
commit adce64e21c
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2

116
peer.go

@ -190,67 +190,9 @@ func newPeer(conn net.Conn, connReq *connmgr.ConnReq, server *server,
quit: make(chan struct{}), quit: make(chan struct{}),
} }
// Fetch and then load all the active channels we have with this
// remote peer from the database.
activeChans, err := server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
if err != nil {
peerLog.Errorf("unable to fetch active chans "+
"for peer %v: %v", p, err)
return nil, err
}
peerLog.Debugf("Loaded %v active channels from database with peerID(%v)",
len(activeChans), p.id)
if err := p.loadActiveChannels(activeChans); err != nil {
return nil, err
}
return p, nil return p, nil
} }
// loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
for _, dbChan := range chans {
if dbChan.IsPending {
continue
}
lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet.Signer,
p.server.chainNotifier, dbChan)
if err != nil {
return err
}
chanPoint := *dbChan.ChanID
chanID := lnwire.NewChanIDFromOutPoint(&chanPoint)
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint)
p.server.breachArbiter.newContracts <- lnChan
// Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward
// new payments triggered by RPC clients.
downstreamLink := make(chan *htlcPacket, 10)
plexChan := p.server.htlcSwitch.RegisterLink(p,
dbChan.Snapshot(), downstreamLink)
upstreamLink := make(chan lnwire.Message, 10)
p.htlcManMtx.Lock()
p.htlcManagers[chanID] = upstreamLink
p.htlcManMtx.Unlock()
p.wg.Add(1)
go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink)
}
return nil
}
// Start starts all helper goroutines the peer needs for normal operations. In // Start starts all helper goroutines the peer needs for normal operations. In
// the case this peer has already been started, then this function is a loop. // the case this peer has already been started, then this function is a loop.
func (p *peer) Start() error { func (p *peer) Start() error {
@ -310,6 +252,24 @@ func (p *peer) Start() error {
go p.channelManager() go p.channelManager()
go p.pingHandler() go p.pingHandler()
// Fetch and then load all the active channels we have with this remote
// peer from the database.
activeChans, err := p.server.chanDB.FetchOpenChannels(p.addr.IdentityKey)
if err != nil {
peerLog.Errorf("unable to fetch active chans "+
"for peer %v: %v", p, err)
return err
}
// Next, load all the active channels we have with this peer,
// registering them with the switch and launching the necessary
// goroutines required to operate them.
peerLog.Debugf("Loaded %v active channels from database with "+
"peerID(%v)", len(activeChans), p.id)
if err := p.loadActiveChannels(activeChans); err != nil {
return fmt.Errorf("unable to load channels: %v", err)
}
return nil return nil
} }
@ -321,13 +281,53 @@ func (p *peer) Stop() error {
if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) { if !atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
return nil return nil
} }
// loadActiveChannels creates indexes within the peer for tracking all active
// channels returned by the database.
func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
for _, dbChan := range chans {
// If the channel isn't yet open, then we don't need to process
// it any further.
if dbChan.IsPending {
continue
}
// Ensure that the TCP connection is properly closed before continuing. // Ensure that the TCP connection is properly closed before continuing.
p.conn.Close() p.conn.Close()
lnChan, err := lnwallet.NewLightningChannel(p.server.lnwallet.Signer,
p.server.chainNotifier, dbChan)
if err != nil {
return err
}
// Signal all worker goroutines to gracefully exit. // Signal all worker goroutines to gracefully exit.
close(p.quit) close(p.quit)
p.wg.Wait() p.wg.Wait()
chanPoint := *dbChan.ChanID
chanID := lnwire.NewChanIDFromOutPoint(&chanPoint)
p.activeChanMtx.Lock()
p.activeChannels[chanID] = lnChan
p.activeChanMtx.Unlock()
peerLog.Infof("peerID(%v) loaded ChannelPoint(%v)", p.id, chanPoint)
p.server.breachArbiter.newContracts <- lnChan
// Register this new channel link with the HTLC Switch. This is
// necessary to properly route multi-hop payments, and forward
// new payments triggered by RPC clients.
downstreamLink := make(chan *htlcPacket, 10)
plexChan := p.server.htlcSwitch.RegisterLink(p,
dbChan.Snapshot(), downstreamLink)
upstreamLink := make(chan lnwire.Message, 10)
p.htlcManMtx.Lock()
p.htlcManagers[chanID] = upstreamLink
p.htlcManMtx.Unlock()
p.wg.Add(1)
go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink)
}
return nil return nil
} }