diff --git a/cmd/trakr/main.go b/cmd/trakr/main.go index 3b34214..de146ca 100644 --- a/cmd/trakr/main.go +++ b/cmd/trakr/main.go @@ -22,10 +22,11 @@ import ( type ConfigFile struct { MainConfigBlock struct { + middleware.Config PrometheusAddr string `yaml:"prometheus_addr"` HTTPConfig httpfrontend.Config `yaml:"http"` UDPConfig udpfrontend.Config `yaml:"udp"` - middleware.Config + Storage memory.Config `yaml:"storage"` } `yaml:"trakr"` } @@ -95,15 +96,18 @@ func main() { } }() - // TODO create PeerStore - // TODO create Hooks - logic := middleware.NewLogic(cfg.Config, nil, nil, nil, nil, nil) + // Force the compiler to enforce memory against the storage interface. + peerStore, err := memory.New(cfg.Storage) if err != nil { return err } - // Force the compiler to enforce memory against the storage interface. - _, _ = memory.New(memory.Config{1}) + // TODO create PeerStore + // TODO create Hooks + logic := middleware.NewLogic(cfg.Config, peerStore, nil, nil, nil, nil) + if err != nil { + return err + } errChan := make(chan error) closedChan := make(chan struct{}) diff --git a/example_config.yaml b/example_config.yaml index 36e769a..3f89637 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -18,8 +18,8 @@ trakr: name: memory config: shards: 1 - gc_interval: 15m - gc_expiration: 15m + gc_interval: 14m + peer_lifetime: 15m prehooks: - name: jwt diff --git a/storage/memory/peer_store.go b/storage/memory/peer_store.go index 8a2eeb2..3f7f9c2 100644 --- a/storage/memory/peer_store.go +++ b/storage/memory/peer_store.go @@ -16,27 +16,41 @@ import ( // Config holds the configuration of a memory PeerStore. type Config struct { - ShardCount int `yaml:"shard_count"` + GarbageCollectionInterval time.Duration `yaml:"gc_interval"` + PeerLifetime time.Duration `yaml:"peer_lifetime"` + ShardCount int `yaml:"shard_count"` } -// New creates a new memory PeerStore. -// -// The PeerStore will have at least one shard. +// New creates a new PeerStore backed by memory. func New(cfg Config) (storage.PeerStore, error) { shardCount := 1 if cfg.ShardCount > 0 { shardCount = cfg.ShardCount } - shards := make([]*peerShard, shardCount*2) - for i := 0; i < shardCount*2; i++ { - shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} + ps := &peerStore{ + shards: make([]*peerShard, shardCount*2), + closed: make(chan struct{}), } - return &peerStore{ - shards: shards, - closed: make(chan struct{}), - }, nil + for i := 0; i < shardCount*2; i++ { + ps.shards[i] = &peerShard{swarms: make(map[bittorrent.InfoHash]swarm)} + } + + go func() { + for { + select { + case <-ps.closed: + return + case <-time.After(cfg.GarbageCollectionInterval): + before := time.Now().Add(-cfg.GarbageCollectionInterval) + log.Println("memory: purging peers with no announces since ", before) + ps.collectGarbage(before) + } + } + }() + + return ps, nil } type serializedPeer string @@ -225,59 +239,6 @@ func (s *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) e return nil } -func (s *peerStore) CollectGarbage(cutoff time.Time) error { - select { - case <-s.closed: - panic("attempted to interact with stopped memory store") - default: - } - - log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String()) - cutoffUnix := cutoff.UnixNano() - for _, shard := range s.shards { - shard.RLock() - var infohashes []bittorrent.InfoHash - for ih := range shard.swarms { - infohashes = append(infohashes, ih) - } - shard.RUnlock() - runtime.Gosched() - - for _, ih := range infohashes { - shard.Lock() - - if _, stillExists := shard.swarms[ih]; !stillExists { - shard.Unlock() - runtime.Gosched() - continue - } - - for pk, mtime := range shard.swarms[ih].leechers { - if mtime <= cutoffUnix { - delete(shard.swarms[ih].leechers, pk) - } - } - - for pk, mtime := range shard.swarms[ih].seeders { - if mtime <= cutoffUnix { - delete(shard.swarms[ih].seeders, pk) - } - } - - if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { - delete(shard.swarms, ih) - } - - shard.Unlock() - runtime.Gosched() - } - - runtime.Gosched() - } - - return nil -} - func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int, announcer bittorrent.Peer) (peers []bittorrent.Peer, err error) { select { case <-s.closed: @@ -340,6 +301,64 @@ func (s *peerStore) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant i return } +// collectGarbage deletes all Peers from the PeerStore which are older than the +// cutoff time. +// +// This function must be able to execute while other methods on this interface +// are being executed in parallel. +func (s *peerStore) collectGarbage(cutoff time.Time) error { + select { + case <-s.closed: + panic("attempted to interact with stopped memory store") + default: + } + + log.Printf("memory: collecting garbage. Cutoff time: %s", cutoff.String()) + cutoffUnix := cutoff.UnixNano() + for _, shard := range s.shards { + shard.RLock() + var infohashes []bittorrent.InfoHash + for ih := range shard.swarms { + infohashes = append(infohashes, ih) + } + shard.RUnlock() + runtime.Gosched() + + for _, ih := range infohashes { + shard.Lock() + + if _, stillExists := shard.swarms[ih]; !stillExists { + shard.Unlock() + runtime.Gosched() + continue + } + + for pk, mtime := range shard.swarms[ih].leechers { + if mtime <= cutoffUnix { + delete(shard.swarms[ih].leechers, pk) + } + } + + for pk, mtime := range shard.swarms[ih].seeders { + if mtime <= cutoffUnix { + delete(shard.swarms[ih].seeders, pk) + } + } + + if len(shard.swarms[ih].seeders)|len(shard.swarms[ih].leechers) == 0 { + delete(shard.swarms, ih) + } + + shard.Unlock() + runtime.Gosched() + } + + runtime.Gosched() + } + + return nil +} + func (s *peerStore) Stop() <-chan error { toReturn := make(chan error) go func() { diff --git a/storage/storage.go b/storage/storage.go index fb24a70..fe0618b 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,8 +1,6 @@ package storage import ( - "time" - "github.com/jzelinskie/trakr/bittorrent" "github.com/jzelinskie/trakr/stopper" ) @@ -53,11 +51,6 @@ type PeerStore interface { // - if seeder is false, should ideally return more seeders than leechers AnnouncePeers(infoHash bittorrent.InfoHash, seeder bool, numWant int, p bittorrent.Peer) (peers []bittorrent.Peer, err error) - // CollectGarbage deletes all Peers from the PeerStore which are older than - // the cutoff time. This function must be able to execute while other methods - // on this interface are being executed in parallel. - CollectGarbage(cutoff time.Time) error - // Stopper is an interface that expects a Stop method to stops the PeerStore. // For more details see the documentation in the stopper package. stopper.Stopper