cmd/chihaya: refactor root run command

This change refactors a bunch of the state of execution into its own
object. It also attempts to simplify stopping and adjusts some other
packages to integrate with the stopper interface.

Fixes #309.
This commit is contained in:
Jimmy Zelinskie
2017-04-29 22:29:27 -04:00
parent 20d1cbf537
commit ea0dba3a3d
5 changed files with 330 additions and 312 deletions

View File

@@ -64,18 +64,20 @@ func New(cfg Config) (storage.PeerStore, error) {
}
ps := &peerStore{
shards: make([]*peerShard, shardCount*2),
closed: make(chan struct{}),
shards: make([]*peerShard, shardCount*2),
closing: make(chan struct{}),
}
for i := 0; i < shardCount*2; i++ {
ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
for {
select {
case <-ps.closed:
case <-ps.closing:
return
case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime)
@@ -102,19 +104,20 @@ type swarm struct {
}
type peerStore struct {
shards []*peerShard
closed chan struct{}
shards []*peerShard
closing chan struct{}
wg sync.WaitGroup
}
var _ storage.PeerStore = &peerStore{}
func (s *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, af bittorrent.AddressFamily) uint32 {
// There are twice the amount of shards specified by the user, the first
// half is dedicated to IPv4 swarms and the second half is dedicated to
// IPv6 swarms.
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(s.shards)) / 2)
idx := binary.BigEndian.Uint32(infoHash[:4]) % (uint32(len(ps.shards)) / 2)
if af == bittorrent.IPv6 {
idx += uint32(len(s.shards) / 2)
idx += uint32(len(ps.shards) / 2)
}
return idx
}
@@ -146,16 +149,16 @@ func decodePeerKey(pk serializedPeer) bittorrent.Peer {
return peer
}
func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -172,16 +175,16 @@ func (s *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
return nil
}
func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -205,16 +208,16 @@ func (s *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) erro
return nil
}
func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -231,16 +234,16 @@ func (s *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
return nil
}
func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -264,16 +267,16 @@ func (s *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) err
return nil
}
func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
pk := newPeerKey(p)
shard := s.shards[s.shardIndex(ih, p.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, p.IP.AddressFamily)]
shard.Lock()
if _, ok := shard.swarms[ih]; !ok {
@@ -292,14 +295,14 @@ func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) e
return nil
}
func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
func (ps *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
shard := s.shards[s.shardIndex(ih, announcer.IP.AddressFamily)]
shard := ps.shards[ps.shardIndex(ih, announcer.IP.AddressFamily)]
shard.RLock()
if _, ok := shard.swarms[ih]; !ok {
@@ -354,15 +357,15 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i
return
}
func (s *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent.AddressFamily) (resp bittorrent.Scrape) {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
resp.InfoHash = ih
shard := s.shards[s.shardIndex(ih, addressFamily)]
shard := ps.shards[ps.shardIndex(ih, addressFamily)]
shard.RLock()
if _, ok := shard.swarms[ih]; !ok {
@@ -382,9 +385,9 @@ func (s *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, addressFamily bittorrent
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (s *peerStore) collectGarbage(cutoff time.Time) error {
func (ps *peerStore) collectGarbage(cutoff time.Time) error {
select {
case <-s.closed:
case <-ps.closing:
panic("attempted to interact with stopped memory store")
default:
}
@@ -392,7 +395,7 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
var ihDelta float64
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range s.shards {
for _, shard := range ps.shards {
shard.RLock()
var infohashes []bittorrent.InfoHash
for ih := range shard.swarms {
@@ -440,16 +443,21 @@ func (s *peerStore) collectGarbage(cutoff time.Time) error {
return nil
}
func (s *peerStore) Stop() <-chan error {
toReturn := make(chan error)
func (ps *peerStore) Stop() <-chan error {
c := make(chan error)
go func() {
shards := make([]*peerShard, len(s.shards))
for i := 0; i < len(s.shards); i++ {
close(ps.closing)
ps.wg.Wait()
// Explicitly deallocate our storage.
shards := make([]*peerShard, len(ps.shards))
for i := 0; i < len(ps.shards); i++ {
shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)}
}
s.shards = shards
close(s.closed)
close(toReturn)
ps.shards = shards
close(c)
}()
return toReturn
return c
}