clean the slate!

This commit is contained in:
Jimmy Zelinskie
2016-01-25 00:39:16 -05:00
parent e37f453b34
commit 5c27c960f0
42 changed files with 0 additions and 5076 deletions
-241
View File
@@ -1,241 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models"
)
// HandleAnnounce encapsulates all of the logic of handling a BitTorrent
// client's Announce without being coupled to any transport protocol.
func (tkr *Tracker) HandleAnnounce(ann *models.Announce, w Writer) (err error) {
if tkr.Config.ClientWhitelistEnabled {
if err = tkr.ClientApproved(ann.ClientID()); err != nil {
return err
}
}
if tkr.Config.JWKSetURI != "" {
err := tkr.validateJWT(ann.JWT, ann.Infohash)
if err != nil {
return err
}
}
torrent, err := tkr.FindTorrent(ann.Infohash)
if err == models.ErrTorrentDNE && tkr.Config.CreateOnAnnounce {
torrent = &models.Torrent{
Infohash: ann.Infohash,
Seeders: models.NewPeerMap(true, tkr.Config),
Leechers: models.NewPeerMap(false, tkr.Config),
}
tkr.PutTorrent(torrent)
stats.RecordEvent(stats.NewTorrent)
} else if err != nil {
return err
}
ann.BuildPeer(torrent)
_, err = tkr.updateSwarm(ann)
if err != nil {
return err
}
_, err = tkr.handleEvent(ann)
if err != nil {
return err
}
if tkr.Config.PurgeInactiveTorrents && torrent.PeerCount() == 0 {
// Rather than deleting the torrent explicitly, let the tracker driver
// ensure there are no race conditions.
tkr.PurgeInactiveTorrent(torrent.Infohash)
stats.RecordEvent(stats.DeletedTorrent)
}
stats.RecordEvent(stats.Announce)
return w.WriteAnnounce(newAnnounceResponse(ann))
}
// updateSwarm handles the changes to a torrent's swarm given an announce.
func (tkr *Tracker) updateSwarm(ann *models.Announce) (created bool, err error) {
var createdv4, createdv6 bool
tkr.TouchTorrent(ann.Torrent.Infohash)
if ann.HasIPv4() {
createdv4, err = tkr.updatePeer(ann, ann.PeerV4)
if err != nil {
return
}
}
if ann.HasIPv6() {
createdv6, err = tkr.updatePeer(ann, ann.PeerV6)
if err != nil {
return
}
}
return createdv4 || createdv6, nil
}
func (tkr *Tracker) updatePeer(ann *models.Announce, peer *models.Peer) (created bool, err error) {
p, t := ann.Peer, ann.Torrent
switch {
case t.Seeders.Contains(p.Key()):
err = tkr.PutSeeder(t.Infohash, p)
if err != nil {
return
}
case t.Leechers.Contains(p.Key()):
err = tkr.PutLeecher(t.Infohash, p)
if err != nil {
return
}
default:
if ann.Left == 0 {
err = tkr.PutSeeder(t.Infohash, p)
if err != nil {
return
}
stats.RecordPeerEvent(stats.NewSeed, p.HasIPv6())
} else {
err = tkr.PutLeecher(t.Infohash, p)
if err != nil {
return
}
stats.RecordPeerEvent(stats.NewLeech, p.HasIPv6())
}
created = true
}
return
}
// handleEvent checks to see whether an announce has an event and if it does,
// properly handles that event.
func (tkr *Tracker) handleEvent(ann *models.Announce) (snatched bool, err error) {
var snatchedv4, snatchedv6 bool
if ann.HasIPv4() {
snatchedv4, err = tkr.handlePeerEvent(ann, ann.PeerV4)
if err != nil {
return
}
}
if ann.HasIPv6() {
snatchedv6, err = tkr.handlePeerEvent(ann, ann.PeerV6)
if err != nil {
return
}
}
if snatchedv4 || snatchedv6 {
err = tkr.IncrementTorrentSnatches(ann.Torrent.Infohash)
if err != nil {
return
}
ann.Torrent.Snatches++
return true, nil
}
return false, nil
}
func (tkr *Tracker) handlePeerEvent(ann *models.Announce, p *models.Peer) (snatched bool, err error) {
p, t := ann.Peer, ann.Torrent
switch {
case ann.Event == "stopped" || ann.Event == "paused":
// updateSwarm checks if the peer is active on the torrent,
// so one of these branches must be followed.
if t.Seeders.Contains(p.Key()) {
err = tkr.DeleteSeeder(t.Infohash, p)
if err != nil {
return
}
stats.RecordPeerEvent(stats.DeletedSeed, p.HasIPv6())
} else if t.Leechers.Contains(p.Key()) {
err = tkr.DeleteLeecher(t.Infohash, p)
if err != nil {
return
}
stats.RecordPeerEvent(stats.DeletedLeech, p.HasIPv6())
}
case t.Leechers.Contains(p.Key()) && (ann.Event == "completed" || ann.Left == 0):
// A leecher has completed or this is the first time we've seen them since
// they've completed.
err = tkr.leecherFinished(t, p)
if err != nil {
return
}
// Only mark as snatched if we receive the completed event.
if ann.Event == "completed" {
snatched = true
}
}
return
}
// leecherFinished moves a peer from the leeching pool to the seeder pool.
func (tkr *Tracker) leecherFinished(t *models.Torrent, p *models.Peer) error {
if err := tkr.DeleteLeecher(t.Infohash, p); err != nil {
return err
}
if err := tkr.PutSeeder(t.Infohash, p); err != nil {
return err
}
stats.RecordPeerEvent(stats.Completed, p.HasIPv6())
return nil
}
func newAnnounceResponse(ann *models.Announce) *models.AnnounceResponse {
seedCount := ann.Torrent.Seeders.Len()
leechCount := ann.Torrent.Leechers.Len()
res := &models.AnnounceResponse{
Announce: ann,
Complete: seedCount,
Incomplete: leechCount,
Interval: ann.Config.Announce.Duration,
MinInterval: ann.Config.MinAnnounce.Duration,
Compact: ann.Compact,
}
if ann.NumWant > 0 && ann.Event != "stopped" && ann.Event != "paused" {
res.IPv4Peers, res.IPv6Peers = getPeers(ann)
if len(res.IPv4Peers)+len(res.IPv6Peers) == 0 {
models.AppendPeer(&res.IPv4Peers, &res.IPv6Peers, ann, ann.Peer)
}
}
return res
}
// getPeers returns lists IPv4 and IPv6 peers on a given torrent sized according
// to the wanted parameter.
func getPeers(ann *models.Announce) (ipv4s, ipv6s models.PeerList) {
ipv4s, ipv6s = models.PeerList{}, models.PeerList{}
if ann.Left == 0 {
// If they're seeding, give them only leechers.
return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant)
}
// If they're leeching, prioritize giving them seeders.
ipv4s, ipv6s = ann.Torrent.Seeders.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant)
return ann.Torrent.Leechers.AppendPeers(ipv4s, ipv6s, ann, ann.NumWant-len(ipv4s)-len(ipv6s))
}
-147
View File
@@ -1,147 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"time"
oidchttp "github.com/coreos/go-oidc/http"
"github.com/coreos/go-oidc/jose"
"github.com/golang/glog"
)
const jwkTTLFallback = 5 * time.Minute
func (tkr *Tracker) updateJWKSetForever() {
defer tkr.shutdownWG.Done()
client := &http.Client{Timeout: 5 * time.Second}
// Get initial JWK Set.
err := tkr.updateJWKSet(client)
if err != nil {
glog.Warningf("Failed to get initial JWK Set: %s", err)
}
for {
select {
case <-tkr.shuttingDown:
return
case <-time.After(tkr.Config.JWKSetUpdateInterval.Duration):
err = tkr.updateJWKSet(client)
if err != nil {
glog.Warningf("Failed to update JWK Set: %s", err)
}
}
}
}
type jwkSet struct {
Keys []jose.JWK `json:"keys"`
Issuer string `json:"issuer"`
validUntil time.Time
}
func (tkr *Tracker) updateJWKSet(client *http.Client) error {
glog.Info("Attemping to update JWK Set")
resp, err := client.Get(tkr.Config.JWKSetURI)
if err != nil {
return err
}
defer resp.Body.Close()
var jwks jwkSet
err = json.NewDecoder(resp.Body).Decode(&jwks)
if err != nil {
return err
}
if len(jwks.Keys) == 0 {
return errors.New("Failed to find any keys from JWK Set URI")
}
if jwks.Issuer == "" {
return errors.New("Failed to find any issuer from JWK Set URI")
}
ttl, _, _ := oidchttp.Cacheable(resp.Header)
if ttl == 0 {
ttl = jwkTTLFallback
}
jwks.validUntil = time.Now().Add(ttl)
tkr.jwkSet = jwks
glog.Info("Successfully updated JWK Set")
return nil
}
func validateJWTSignature(jwt *jose.JWT, jwkSet *jwkSet) (bool, error) {
for _, jwk := range jwkSet.Keys {
v, err := jose.NewVerifier(jwk)
if err != nil {
return false, err
}
if err := v.Verify(jwt.Signature, []byte(jwt.Data())); err == nil {
return true, nil
}
}
return false, nil
}
func (tkr *Tracker) validateJWT(jwtStr, infohash string) error {
jwkSet := tkr.jwkSet
if time.Now().After(jwkSet.validUntil) {
return fmt.Errorf("Failed verify JWT due to stale JWK Set")
}
jwt, err := jose.ParseJWT(jwtStr)
if err != nil {
return err
}
validated, err := validateJWTSignature(&jwt, &jwkSet)
if err != nil {
return err
} else if !validated {
return errors.New("Failed to verify JWT with all available verifiers")
}
claims, err := jwt.Claims()
if err != nil {
return err
}
if claimedIssuer, ok, err := claims.StringClaim("iss"); claimedIssuer != jwkSet.Issuer || err != nil || !ok {
return errors.New("Failed to validate JWT issuer claim")
}
if claimedAudience, ok, err := claims.StringClaim("aud"); claimedAudience != tkr.Config.JWTAudience || err != nil || !ok {
return errors.New("Failed to validate JWT audience claim")
}
claimedInfohash, ok, err := claims.StringClaim("infohash")
if err != nil || !ok {
return errors.New("Failed to validate JWT infohash claim")
}
unescapedInfohash, err := url.QueryUnescape(claimedInfohash)
if err != nil {
return errors.New("Failed to unescape JWT infohash claim")
}
if unescapedInfohash != infohash {
return errors.New("Failed to match infohash claim with requested infohash")
}
return nil
}
-227
View File
@@ -1,227 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package models implements the common data types used throughout a BitTorrent
// tracker.
package models
import (
"net"
"strings"
"time"
"github.com/chihaya/chihaya/config"
)
var (
// ErrMalformedRequest is returned when a request does not contain the
// required parameters needed to create a model.
ErrMalformedRequest = ClientError("malformed request")
// ErrBadRequest is returned when a request is invalid in the peer's
// current state. For example, announcing a "completed" event while
// not a leecher or a "stopped" event while not active.
ErrBadRequest = ClientError("bad request")
// ErrTorrentDNE is returned when a torrent does not exist.
ErrTorrentDNE = NotFoundError("torrent does not exist")
// ErrClientUnapproved is returned when a clientID is not in the whitelist.
ErrClientUnapproved = ClientError("client is not approved")
)
type ClientError string
type NotFoundError ClientError
type ProtocolError ClientError
func (e ClientError) Error() string { return string(e) }
func (e NotFoundError) Error() string { return string(e) }
func (e ProtocolError) Error() string { return string(e) }
// IsPublicError determines whether an error should be propogated to the client.
func IsPublicError(err error) bool {
_, cl := err.(ClientError)
_, nf := err.(NotFoundError)
_, pc := err.(ProtocolError)
return cl || nf || pc
}
// PeerList represents a list of peers: either seeders or leechers.
type PeerList []Peer
// PeerKey is the key used to uniquely identify a peer in a swarm.
type PeerKey string
// NewPeerKey creates a properly formatted PeerKey.
func NewPeerKey(peerID string, ip net.IP) PeerKey {
return PeerKey(peerID + "//" + ip.String())
}
// IP parses and returns the IP address for a given PeerKey.
func (pk PeerKey) IP() net.IP {
ip := net.ParseIP(strings.Split(string(pk), "//")[1])
if rval := ip.To4(); rval != nil {
return rval
}
return ip
}
// PeerID returns the PeerID section of a PeerKey.
func (pk PeerKey) PeerID() string {
return strings.Split(string(pk), "//")[0]
}
// Endpoint is an IP and port pair.
//
// IP always has length net.IPv4len if IPv4, and net.IPv6len if IPv6.
type Endpoint struct {
IP net.IP `json:"ip"`
Port uint16 `json:"port"`
}
// Peer represents a participant in a BitTorrent swarm.
type Peer struct {
ID string `json:"id"`
Uploaded uint64 `json:"uploaded"`
Downloaded uint64 `json:"downloaded"`
Left uint64 `json:"left"`
LastAnnounce int64 `json:"lastAnnounce"`
Endpoint
}
// HasIPv4 determines if a peer's IP address can be represented as an IPv4
// address.
func (p *Peer) HasIPv4() bool {
return !p.HasIPv6()
}
// HasIPv6 determines if a peer's IP address can be represented as an IPv6
// address.
func (p *Peer) HasIPv6() bool {
return len(p.IP) == net.IPv6len
}
// Key returns a PeerKey for the given peer.
func (p *Peer) Key() PeerKey {
return NewPeerKey(p.ID, p.IP)
}
// Torrent represents a BitTorrent swarm and its metadata.
type Torrent struct {
Infohash string `json:"infohash"`
Snatches uint64 `json:"snatches"`
LastAction int64 `json:"lastAction"`
Seeders *PeerMap `json:"seeders"`
Leechers *PeerMap `json:"leechers"`
}
// PeerCount returns the total number of peers connected on this Torrent.
func (t *Torrent) PeerCount() int {
return t.Seeders.Len() + t.Leechers.Len()
}
// Announce is an Announce by a Peer.
type Announce struct {
Config *config.Config `json:"config"`
Compact bool `json:"compact"`
Downloaded uint64 `json:"downloaded"`
Event string `json:"event"`
IPv4 Endpoint `json:"ipv4"`
IPv6 Endpoint `json:"ipv6"`
Infohash string `json:"infohash"`
Left uint64 `json:"left"`
NumWant int `json:"numwant"`
PeerID string `json:"peer_id"`
Uploaded uint64 `json:"uploaded"`
JWT string `json:"jwt"`
Torrent *Torrent `json:"-"`
Peer *Peer `json:"-"`
PeerV4 *Peer `json:"-"` // Only valid if HasIPv4() is true.
PeerV6 *Peer `json:"-"` // Only valid if HasIPv6() is true.
}
// ClientID returns the part of a PeerID that identifies a Peer's client
// software.
func (a *Announce) ClientID() (clientID string) {
length := len(a.PeerID)
if length >= 6 {
if a.PeerID[0] == '-' {
if length >= 7 {
clientID = a.PeerID[1:7]
}
} else {
clientID = a.PeerID[:6]
}
}
return
}
// HasIPv4 determines whether or not an announce has an IPv4 endpoint.
func (a *Announce) HasIPv4() bool {
return a.IPv4.IP != nil
}
// HasIPv6 determines whether or not an announce has an IPv6 endpoint.
func (a *Announce) HasIPv6() bool {
return a.IPv6.IP != nil
}
// BuildPeer creates the Peer representation of an Announce. BuildPeer creates
// one peer for each IP in the announce, and panics if there are none.
func (a *Announce) BuildPeer(t *Torrent) {
a.Peer = &Peer{
ID: a.PeerID,
Uploaded: a.Uploaded,
Downloaded: a.Downloaded,
Left: a.Left,
LastAnnounce: time.Now().Unix(),
}
if t != nil {
a.Torrent = t
}
if a.HasIPv4() && a.HasIPv6() {
a.PeerV4 = a.Peer
a.PeerV4.Endpoint = a.IPv4
peer6 := *a.Peer
a.PeerV6 = &peer6
a.PeerV6.Endpoint = a.IPv6
} else if a.HasIPv4() {
a.PeerV4 = a.Peer
a.PeerV4.Endpoint = a.IPv4
} else if a.HasIPv6() {
a.PeerV6 = a.Peer
a.PeerV6.Endpoint = a.IPv6
} else {
panic("models: announce must have an IP")
}
return
}
// AnnounceResponse contains the information needed to fulfill an announce.
type AnnounceResponse struct {
Announce *Announce
Complete, Incomplete int
Interval, MinInterval time.Duration
IPv4Peers, IPv6Peers PeerList
Compact bool
}
// Scrape is a Scrape by a Peer.
type Scrape struct {
Config *config.Config `json:"config"`
Infohashes []string
}
// ScrapeResponse contains the information needed to fulfill a scrape.
type ScrapeResponse struct {
Files []*Torrent
}
-64
View File
@@ -1,64 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package models
import "testing"
type PeerClientPair struct {
announce Announce
clientID string
}
var TestClients = []PeerClientPair{
{Announce{PeerID: "-AZ3034-6wfG2wk6wWLc"}, "AZ3034"},
{Announce{PeerID: "-AZ3042-6ozMq5q6Q3NX"}, "AZ3042"},
{Announce{PeerID: "-BS5820-oy4La2MWGEFj"}, "BS5820"},
{Announce{PeerID: "-AR6360-6oZyyMWoOOBe"}, "AR6360"},
{Announce{PeerID: "-AG2083-s1hiF8vGAAg0"}, "AG2083"},
{Announce{PeerID: "-AG3003-lEl2Mm4NEO4n"}, "AG3003"},
{Announce{PeerID: "-MR1100-00HS~T7*65rm"}, "MR1100"},
{Announce{PeerID: "-LK0140-ATIV~nbEQAMr"}, "LK0140"},
{Announce{PeerID: "-KT2210-347143496631"}, "KT2210"},
{Announce{PeerID: "-TR0960-6ep6svaa61r4"}, "TR0960"},
{Announce{PeerID: "-XX1150-dv220cotgj4d"}, "XX1150"},
{Announce{PeerID: "-AZ2504-192gwethivju"}, "AZ2504"},
{Announce{PeerID: "-KT4310-3L4UvarKuqIu"}, "KT4310"},
{Announce{PeerID: "-AZ2060-0xJQ02d4309O"}, "AZ2060"},
{Announce{PeerID: "-BD0300-2nkdf08Jd890"}, "BD0300"},
{Announce{PeerID: "-A~0010-a9mn9DFkj39J"}, "A~0010"},
{Announce{PeerID: "-UT2300-MNu93JKnm930"}, "UT2300"},
{Announce{PeerID: "-UT2300-KT4310KT4301"}, "UT2300"},
{Announce{PeerID: "T03A0----f089kjsdf6e"}, "T03A0-"},
{Announce{PeerID: "S58B-----nKl34GoNb75"}, "S58B--"},
{Announce{PeerID: "M4-4-0--9aa757Efd5Bl"}, "M4-4-0"},
{Announce{PeerID: "AZ2500BTeYUzyabAfo6U"}, "AZ2500"}, // BitTyrant
{Announce{PeerID: "exbc0JdSklm834kj9Udf"}, "exbc0J"}, // Old BitComet
{Announce{PeerID: "FUTB0L84j542mVc84jkd"}, "FUTB0L"}, // Alt BitComet
{Announce{PeerID: "XBT054d-8602Jn83NnF9"}, "XBT054"}, // XBT
{Announce{PeerID: "OP1011affbecbfabeefb"}, "OP1011"}, // Opera
{Announce{PeerID: "-ML2.7.2-kgjjfkd9762"}, "ML2.7."}, // MLDonkey
{Announce{PeerID: "-BOWA0C-SDLFJWEIORNM"}, "BOWA0C"}, // Bits on Wheels
{Announce{PeerID: "Q1-0-0--dsn34DFn9083"}, "Q1-0-0"}, // Queen Bee
{Announce{PeerID: "Q1-10-0-Yoiumn39BDfO"}, "Q1-10-"}, // Queen Bee Alt
{Announce{PeerID: "346------SDFknl33408"}, "346---"}, // TorreTopia
{Announce{PeerID: "QVOD0054ABFFEDCCDEDB"}, "QVOD00"}, // Qvod
{Announce{PeerID: ""}, ""},
{Announce{PeerID: "-"}, ""},
{Announce{PeerID: "12345"}, ""},
{Announce{PeerID: "-12345"}, ""},
{Announce{PeerID: "123456"}, "123456"},
{Announce{PeerID: "-123456"}, "123456"},
}
func TestClientID(t *testing.T) {
for _, pair := range TestClients {
if parsedID := pair.announce.ClientID(); parsedID != pair.clientID {
t.Error("Incorrectly parsed peer ID", pair.announce.PeerID, "as", parsedID)
}
}
}
-205
View File
@@ -1,205 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package models
import (
"net"
"sync"
"sync/atomic"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/stats"
)
// PeerMap is a thread-safe map from PeerKeys to Peers. When PreferredSubnet is
// enabled, it is a thread-safe map of maps from MaskedIPs to Peerkeys to Peers.
type PeerMap struct {
Peers map[string]map[PeerKey]Peer `json:"peers"`
Seeders bool `json:"seeders"`
Config config.SubnetConfig `json:"config"`
Size int32 `json:"size"`
sync.RWMutex
}
// NewPeerMap initializes the map for a new PeerMap.
func NewPeerMap(seeders bool, cfg *config.Config) *PeerMap {
pm := &PeerMap{
Peers: make(map[string]map[PeerKey]Peer),
Seeders: seeders,
Config: cfg.NetConfig.SubnetConfig,
}
if !pm.Config.PreferredSubnet {
pm.Peers[""] = make(map[PeerKey]Peer)
}
return pm
}
// Contains is true if a peer is contained with a PeerMap.
func (pm *PeerMap) Contains(pk PeerKey) bool {
pm.RLock()
defer pm.RUnlock()
if pm.Config.PreferredSubnet {
maskedIP := pm.mask(pk.IP())
peers, exists := pm.Peers[maskedIP]
if !exists {
return false
}
_, exists = peers[pk]
return exists
}
_, exists := pm.Peers[""][pk]
return exists
}
func (pm *PeerMap) mask(ip net.IP) string {
if !pm.Config.PreferredSubnet {
return ""
}
var maskedIP net.IP
if len(ip) == net.IPv6len {
maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv6Subnet, 128))
} else {
maskedIP = ip.Mask(net.CIDRMask(pm.Config.PreferredIPv4Subnet, 32))
}
return maskedIP.String()
}
// LookUp is a thread-safe read from a PeerMap.
func (pm *PeerMap) LookUp(pk PeerKey) (peer Peer, exists bool) {
pm.RLock()
defer pm.RUnlock()
maskedIP := pm.mask(pk.IP())
peers, exists := pm.Peers[maskedIP]
if !exists {
return Peer{}, false
}
peer, exists = peers[pk]
return
}
// Put is a thread-safe write to a PeerMap.
func (pm *PeerMap) Put(p Peer) {
pm.Lock()
defer pm.Unlock()
maskedIP := pm.mask(p.IP)
_, exists := pm.Peers[maskedIP]
if !exists {
pm.Peers[maskedIP] = make(map[PeerKey]Peer)
}
_, exists = pm.Peers[maskedIP][p.Key()]
if !exists {
atomic.AddInt32(&(pm.Size), 1)
}
pm.Peers[maskedIP][p.Key()] = p
}
// Delete is a thread-safe delete from a PeerMap.
func (pm *PeerMap) Delete(pk PeerKey) {
pm.Lock()
defer pm.Unlock()
maskedIP := pm.mask(pk.IP())
_, exists := pm.Peers[maskedIP][pk]
if exists {
atomic.AddInt32(&(pm.Size), -1)
delete(pm.Peers[maskedIP], pk)
}
}
// Len returns the number of peers within a PeerMap.
func (pm *PeerMap) Len() int {
return int(atomic.LoadInt32(&pm.Size))
}
// Purge iterates over all of the peers within a PeerMap and deletes them if
// they are older than the provided time.
func (pm *PeerMap) Purge(unixtime int64) {
pm.Lock()
defer pm.Unlock()
for _, subnetmap := range pm.Peers {
for key, peer := range subnetmap {
if peer.LastAnnounce <= unixtime {
atomic.AddInt32(&(pm.Size), -1)
delete(subnetmap, key)
if pm.Seeders {
stats.RecordPeerEvent(stats.ReapedSeed, peer.HasIPv6())
} else {
stats.RecordPeerEvent(stats.ReapedLeech, peer.HasIPv6())
}
}
}
}
}
// AppendPeers adds peers to given IPv4 or IPv6 lists.
func (pm *PeerMap) AppendPeers(ipv4s, ipv6s PeerList, ann *Announce, wanted int) (PeerList, PeerList) {
maskedIP := pm.mask(ann.Peer.IP)
pm.RLock()
defer pm.RUnlock()
count := 0
// Attempt to append all the peers in the same subnet.
for _, peer := range pm.Peers[maskedIP] {
if count >= wanted {
break
} else if peersEquivalent(&peer, ann.Peer) {
continue
} else {
count += AppendPeer(&ipv4s, &ipv6s, ann, &peer)
}
}
// Add any more peers out of the other subnets.
for subnet, peers := range pm.Peers {
if subnet == maskedIP {
continue
} else {
for _, peer := range peers {
if count >= wanted {
break
} else if peersEquivalent(&peer, ann.Peer) {
continue
} else {
count += AppendPeer(&ipv4s, &ipv6s, ann, &peer)
}
}
}
}
return ipv4s, ipv6s
}
// AppendPeer adds a peer to its corresponding peerlist.
func AppendPeer(ipv4s, ipv6s *PeerList, ann *Announce, peer *Peer) int {
if ann.HasIPv6() && peer.HasIPv6() {
*ipv6s = append(*ipv6s, *peer)
return 1
} else if ann.Config.RespectAF && ann.HasIPv4() && peer.HasIPv4() {
*ipv4s = append(*ipv4s, *peer)
return 1
} else if !ann.Config.RespectAF && peer.HasIPv4() {
*ipv4s = append(*ipv4s, *peer)
return 1
}
return 0
}
// peersEquivalent checks if two peers represent the same entity.
func peersEquivalent(a, b *Peer) bool {
return a.ID == b.ID
}
-28
View File
@@ -1,28 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models"
)
// HandleScrape encapsulates all the logic of handling a BitTorrent client's
// scrape without being coupled to any transport protocol.
func (tkr *Tracker) HandleScrape(scrape *models.Scrape, w Writer) (err error) {
var torrents []*models.Torrent
for _, infohash := range scrape.Infohashes {
torrent, err := tkr.FindTorrent(infohash)
if err != nil {
return err
}
torrents = append(torrents, torrent)
}
stats.RecordEvent(stats.Scrape)
return w.WriteScrape(&models.ScrapeResponse{
Files: torrents,
})
}
-274
View File
@@ -1,274 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
package tracker
import (
"hash/fnv"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/stats"
"github.com/chihaya/chihaya/tracker/models"
)
type Torrents struct {
torrents map[string]*models.Torrent
sync.RWMutex
}
type Storage struct {
shards []Torrents
size int32
clients map[string]bool
clientsM sync.RWMutex
}
func NewStorage(cfg *config.Config) *Storage {
s := &Storage{
shards: make([]Torrents, cfg.TorrentMapShards),
clients: make(map[string]bool),
}
for i := range s.shards {
s.shards[i].torrents = make(map[string]*models.Torrent)
}
return s
}
func (s *Storage) Len() int {
return int(atomic.LoadInt32(&s.size))
}
func (s *Storage) getShardIndex(infohash string) uint32 {
idx := fnv.New32()
idx.Write([]byte(infohash))
return idx.Sum32() % uint32(len(s.shards))
}
func (s *Storage) getTorrentShard(infohash string, readonly bool) *Torrents {
shardindex := s.getShardIndex(infohash)
if readonly {
s.shards[shardindex].RLock()
} else {
s.shards[shardindex].Lock()
}
return &s.shards[shardindex]
}
func (s *Storage) TouchTorrent(infohash string) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.LastAction = time.Now().Unix()
return nil
}
func (s *Storage) FindTorrent(infohash string) (*models.Torrent, error) {
shard := s.getTorrentShard(infohash, true)
defer shard.RUnlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return nil, models.ErrTorrentDNE
}
torrentCopy := *torrent
return &torrentCopy, nil
}
func (s *Storage) PutTorrent(torrent *models.Torrent) {
shard := s.getTorrentShard(torrent.Infohash, false)
defer shard.Unlock()
_, exists := shard.torrents[torrent.Infohash]
if !exists {
atomic.AddInt32(&s.size, 1)
}
torrentCopy := *torrent
shard.torrents[torrent.Infohash] = &torrentCopy
}
func (s *Storage) DeleteTorrent(infohash string) {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
if _, exists := shard.torrents[infohash]; exists {
atomic.AddInt32(&s.size, -1)
delete(shard.torrents, infohash)
}
}
func (s *Storage) IncrementTorrentSnatches(infohash string) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Snatches++
return nil
}
func (s *Storage) PutLeecher(infohash string, p *models.Peer) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Leechers.Put(*p)
return nil
}
func (s *Storage) DeleteLeecher(infohash string, p *models.Peer) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Leechers.Delete(p.Key())
return nil
}
func (s *Storage) PutSeeder(infohash string, p *models.Peer) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Seeders.Put(*p)
return nil
}
func (s *Storage) DeleteSeeder(infohash string, p *models.Peer) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
torrent.Seeders.Delete(p.Key())
return nil
}
func (s *Storage) PurgeInactiveTorrent(infohash string) error {
shard := s.getTorrentShard(infohash, false)
defer shard.Unlock()
torrent, exists := shard.torrents[infohash]
if !exists {
return models.ErrTorrentDNE
}
if torrent.PeerCount() == 0 {
atomic.AddInt32(&s.size, -1)
delete(shard.torrents, infohash)
}
return nil
}
func (s *Storage) PurgeInactivePeers(purgeEmptyTorrents bool, before time.Time) error {
unixtime := before.Unix()
// Build a list of keys to process.
index := 0
maxkeys := s.Len()
keys := make([]string, maxkeys)
for i := range s.shards {
shard := &s.shards[i]
shard.RLock()
for infohash := range shard.torrents {
keys[index] = infohash
index++
if index >= maxkeys {
break
}
}
shard.RUnlock()
if index >= maxkeys {
break
}
}
// Process the keys while allowing other goroutines to run.
for _, infohash := range keys {
runtime.Gosched()
shard := s.getTorrentShard(infohash, false)
torrent := shard.torrents[infohash]
if torrent == nil {
// The torrent has already been deleted since keys were computed.
shard.Unlock()
continue
}
torrent.Seeders.Purge(unixtime)
torrent.Leechers.Purge(unixtime)
peers := torrent.PeerCount()
shard.Unlock()
if purgeEmptyTorrents && peers == 0 {
s.PurgeInactiveTorrent(infohash)
stats.RecordEvent(stats.ReapedTorrent)
}
}
return nil
}
func (s *Storage) ClientApproved(peerID string) error {
s.clientsM.RLock()
defer s.clientsM.RUnlock()
_, exists := s.clients[peerID]
if !exists {
return models.ErrClientUnapproved
}
return nil
}
func (s *Storage) PutClient(peerID string) {
s.clientsM.Lock()
defer s.clientsM.Unlock()
s.clients[peerID] = true
}
func (s *Storage) DeleteClient(peerID string) {
s.clientsM.Lock()
defer s.clientsM.Unlock()
delete(s.clients, peerID)
}
-109
View File
@@ -1,109 +0,0 @@
// Copyright 2015 The Chihaya Authors. All rights reserved.
// Use of this source code is governed by the BSD 2-Clause license,
// which can be found in the LICENSE file.
// Package tracker provides a generic interface for manipulating a
// BitTorrent tracker's fast-moving data.
package tracker
import (
"sync"
"time"
"github.com/golang/glog"
"github.com/chihaya/chihaya/config"
"github.com/chihaya/chihaya/tracker/models"
)
// Tracker represents the logic necessary to service BitTorrent announces,
// independently of the underlying data transports used.
type Tracker struct {
Config *config.Config
jwkSet jwkSet
shuttingDown chan struct{}
shutdownWG sync.WaitGroup
*Storage
}
// New creates a new Tracker, and opens any necessary connections.
// Maintenance routines are automatically spawned in the background.
func New(cfg *config.Config) (*Tracker, error) {
tkr := &Tracker{
Config: cfg,
Storage: NewStorage(cfg),
shuttingDown: make(chan struct{}),
}
glog.Info("Starting garbage collection goroutine")
tkr.shutdownWG.Add(1)
go tkr.purgeInactivePeers(
cfg.PurgeInactiveTorrents,
time.Duration(float64(cfg.MinAnnounce.Duration)*cfg.ReapRatio),
cfg.ReapInterval.Duration,
)
if tkr.Config.JWKSetURI != "" {
glog.Info("Starting JWK Set update goroutine")
tkr.shutdownWG.Add(1)
go tkr.updateJWKSetForever()
}
if cfg.ClientWhitelistEnabled {
tkr.LoadApprovedClients(cfg.ClientWhitelist)
}
return tkr, nil
}
// Close gracefully shutdowns a Tracker by closing any database connections.
func (tkr *Tracker) Close() error {
close(tkr.shuttingDown)
tkr.shutdownWG.Wait()
return nil
}
// LoadApprovedClients loads a list of client IDs into the tracker's storage.
func (tkr *Tracker) LoadApprovedClients(clients []string) {
for _, client := range clients {
tkr.PutClient(client)
}
}
// Writer serializes a tracker's responses, and is implemented for each
// response transport used by the tracker. Only one of these may be called
// per request, and only once.
//
// Note, data passed into any of these functions will not contain sensitive
// information, so it may be passed back the client freely.
type Writer interface {
WriteError(err error) error
WriteAnnounce(*models.AnnounceResponse) error
WriteScrape(*models.ScrapeResponse) error
}
// purgeInactivePeers periodically walks the torrent database and removes
// peers that haven't announced recently.
func (tkr *Tracker) purgeInactivePeers(purgeEmptyTorrents bool, threshold, interval time.Duration) {
defer tkr.shutdownWG.Done()
for {
select {
case <-tkr.shuttingDown:
return
case <-time.After(interval):
before := time.Now().Add(-threshold)
glog.V(0).Infof("Purging peers with no announces since %s", before)
err := tkr.PurgeInactivePeers(purgeEmptyTorrents, before)
if err != nil {
glog.Errorf("Error purging torrents: %s", err)
}
}
}
}