mirror of
https://github.com/sot-tech/mochi.git
synced 2026-06-12 07:43:32 -07:00
(untested) refactor code
This commit is contained in:
+36
-92
@@ -6,12 +6,10 @@ package bittorrent
|
||||
import (
|
||||
"crypto/sha1"
|
||||
"crypto/sha256"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"unsafe"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
@@ -33,18 +31,22 @@ func NewPeerID(b []byte) (PeerID, error) {
|
||||
if len(b) != PeerIDLen {
|
||||
return p, ErrInvalidPeerIDSize
|
||||
}
|
||||
copy(p[:], b)
|
||||
return p, nil
|
||||
return PeerID(b), nil
|
||||
}
|
||||
|
||||
// String implements fmt.Stringer, returning the base16 encoded PeerID.
|
||||
func (p PeerID) String() string {
|
||||
return hex.EncodeToString(p[:])
|
||||
return hex.EncodeToString(p.Bytes())
|
||||
}
|
||||
|
||||
// RawString returns a 20-byte string of the raw bytes of the ID.
|
||||
func (p PeerID) RawString() string {
|
||||
return string(p[:])
|
||||
return unsafe.String(&p[0], PeerIDLen)
|
||||
}
|
||||
|
||||
// Bytes returns slice of bytes represents this PeerID
|
||||
func (p PeerID) Bytes() []byte {
|
||||
return p[:]
|
||||
}
|
||||
|
||||
// InfoHash represents an infohash.
|
||||
@@ -55,16 +57,10 @@ const (
|
||||
InfoHashV1Len = sha1.Size
|
||||
// InfoHashV2Len ... sha256.Size
|
||||
InfoHashV2Len = sha256.Size
|
||||
// NoneInfoHash dummy invalid InfoHash
|
||||
NoneInfoHash InfoHash = ""
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrInvalidHashType holds error about invalid InfoHash input type
|
||||
ErrInvalidHashType = errors.New("info hash must be provided as byte slice or raw/hex string")
|
||||
// ErrInvalidHashSize holds error about invalid InfoHash size
|
||||
ErrInvalidHashSize = fmt.Errorf("info hash must be either %d (for torrent V1) or %d (V2) bytes", InfoHashV1Len, InfoHashV2Len)
|
||||
)
|
||||
// ErrInvalidHashSize holds error about invalid InfoHash size
|
||||
var ErrInvalidHashSize = fmt.Errorf("info hash must be either %d (for torrent V1) or %d (V2) bytes or same sizes x2 (if HEX encoded)", InfoHashV1Len, InfoHashV2Len)
|
||||
|
||||
// TruncateV1 returns truncated to 20-bytes length array of the corresponding InfoHash.
|
||||
// If InfoHash is V2 (32 bytes), it will be truncated to 20 bytes
|
||||
@@ -76,48 +72,32 @@ func (i InfoHash) TruncateV1() InfoHash {
|
||||
return i
|
||||
}
|
||||
|
||||
// NewInfoHash creates an InfoHash from a byte slice or raw/hex string.
|
||||
func NewInfoHash(data any) (InfoHash, error) {
|
||||
if data == nil {
|
||||
return NoneInfoHash, ErrInvalidHashType
|
||||
}
|
||||
var ba []byte
|
||||
switch t := data.(type) {
|
||||
case [InfoHashV1Len]byte:
|
||||
ba = t[:]
|
||||
case [InfoHashV2Len]byte:
|
||||
ba = t[:]
|
||||
case []byte:
|
||||
l := len(t)
|
||||
if l == InfoHashV1Len*2 || l == InfoHashV2Len*2 {
|
||||
ba = make([]byte, l/2)
|
||||
if _, err := hex.Decode(ba, t); err != nil {
|
||||
return NoneInfoHash, err
|
||||
}
|
||||
} else {
|
||||
ba = t
|
||||
}
|
||||
case string:
|
||||
l := len(t)
|
||||
if l == InfoHashV1Len*2 || l == InfoHashV2Len*2 {
|
||||
var err error
|
||||
if ba, err = hex.DecodeString(t); err != nil {
|
||||
return NoneInfoHash, err
|
||||
}
|
||||
} else {
|
||||
ba = []byte(t)
|
||||
// NewInfoHash creates an InfoHash from raw/hex byte slice.
|
||||
func NewInfoHash(data []byte) (InfoHash, error) {
|
||||
var ih InfoHash
|
||||
switch l := len(data); l {
|
||||
case InfoHashV1Len, InfoHashV2Len:
|
||||
ih = InfoHash(data)
|
||||
case InfoHashV1Len * 2, InfoHashV2Len * 2:
|
||||
bb := make([]byte, l/2)
|
||||
if _, err := hex.Decode(bb, data); err != nil {
|
||||
return "", err
|
||||
}
|
||||
ih = InfoHash(unsafe.String(&bb[0], len(bb)))
|
||||
default:
|
||||
return "", ErrInvalidHashSize
|
||||
}
|
||||
l := len(ba)
|
||||
if l != InfoHashV1Len && l != InfoHashV2Len {
|
||||
return NoneInfoHash, ErrInvalidHashSize
|
||||
}
|
||||
return InfoHash(ba), nil
|
||||
return ih, nil
|
||||
}
|
||||
|
||||
// NewInfoHashString creates an InfoHash from raw/hex string.
|
||||
func NewInfoHashString(data string) (InfoHash, error) {
|
||||
return NewInfoHash(unsafe.Slice(unsafe.StringData(data), len(data)))
|
||||
}
|
||||
|
||||
// String implements fmt.Stringer, returning the base16 encoded InfoHash.
|
||||
func (i InfoHash) String() string {
|
||||
return hex.EncodeToString([]byte(i))
|
||||
return hex.EncodeToString(i.Bytes())
|
||||
}
|
||||
|
||||
// RawString returns a string of the raw bytes of the InfoHash.
|
||||
@@ -125,6 +105,11 @@ func (i InfoHash) RawString() string {
|
||||
return string(i)
|
||||
}
|
||||
|
||||
// Bytes returns slice of bytes represents this InfoHash
|
||||
func (i InfoHash) Bytes() []byte {
|
||||
return unsafe.Slice(unsafe.StringData(string(i)), len(i))
|
||||
}
|
||||
|
||||
// Peer represents the connection details of a peer that is returned in an
|
||||
// announce response.
|
||||
type Peer struct {
|
||||
@@ -132,47 +117,6 @@ type Peer struct {
|
||||
netip.AddrPort
|
||||
}
|
||||
|
||||
// PeerMinimumLen is the least allowed length of string serialized Peer
|
||||
const PeerMinimumLen = PeerIDLen + 2 + net.IPv4len
|
||||
|
||||
// ErrInvalidPeerDataSize holds error about invalid Peer data size
|
||||
var ErrInvalidPeerDataSize = fmt.Errorf("invalid peer data it must be at least %d bytes (InfoHash + Port + IPv4)", PeerMinimumLen)
|
||||
|
||||
// NewPeer constructs Peer from serialized by Peer.RawString data: PeerID[20by]Port[2by]net.IP[4/16by]
|
||||
func NewPeer(data string) (Peer, error) {
|
||||
var peer Peer
|
||||
if len(data) < PeerMinimumLen {
|
||||
return peer, ErrInvalidPeerDataSize
|
||||
}
|
||||
b := []byte(data)
|
||||
peerID, err := NewPeerID(b[:PeerIDLen])
|
||||
if err == nil {
|
||||
if addr, isOk := netip.AddrFromSlice(b[PeerIDLen+2:]); isOk {
|
||||
peer = Peer{
|
||||
ID: peerID,
|
||||
AddrPort: netip.AddrPortFrom(
|
||||
addr.Unmap(),
|
||||
binary.BigEndian.Uint16(b[PeerIDLen:PeerIDLen+2]),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
err = ErrInvalidIP
|
||||
}
|
||||
}
|
||||
|
||||
return peer, err
|
||||
}
|
||||
|
||||
// RawString generates concatenation of PeerID, net port and IP-address
|
||||
func (p Peer) RawString() string {
|
||||
ip := p.Addr()
|
||||
b := make([]byte, PeerIDLen+2+(ip.BitLen()/8))
|
||||
copy(b[:PeerIDLen], p.ID[:])
|
||||
binary.BigEndian.PutUint16(b[PeerIDLen:PeerIDLen+2], p.Port())
|
||||
copy(b[PeerIDLen+2:], ip.AsSlice())
|
||||
return string(b)
|
||||
}
|
||||
|
||||
// Addr returns unmapped peer's IP address
|
||||
func (p Peer) Addr() netip.Addr {
|
||||
return p.AddrPort.Addr().Unmap()
|
||||
|
||||
@@ -17,8 +17,6 @@ import (
|
||||
"github.com/sot-tech/mochi/pkg/log"
|
||||
)
|
||||
|
||||
const iterations = 10000
|
||||
|
||||
var (
|
||||
addr = fmt.Sprintf("127.0.0.1:%d", rand.Int63n(10000)+16384)
|
||||
hashes = make([]string, 10)
|
||||
@@ -87,7 +85,7 @@ func BenchmarkPing(b *testing.B) {
|
||||
Path: "ping",
|
||||
}
|
||||
us := u.String()
|
||||
for i := 0; i < iterations; i++ {
|
||||
for i := 0; i < b.N; i++ {
|
||||
if err := runGet(us, false); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
@@ -95,7 +93,7 @@ func BenchmarkPing(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkAnnounce(b *testing.B) {
|
||||
for i := 0; i < iterations; i++ {
|
||||
for i := 0; i < b.N; i++ {
|
||||
u := url.URL{
|
||||
Scheme: "http",
|
||||
Host: addr,
|
||||
@@ -119,7 +117,7 @@ func BenchmarkAnnounce(b *testing.B) {
|
||||
}
|
||||
|
||||
func BenchmarkScrape(b *testing.B) {
|
||||
for i := 0; i < iterations; i++ {
|
||||
for i := 0; i < b.N; i++ {
|
||||
u := url.URL{
|
||||
Scheme: "http",
|
||||
Host: addr,
|
||||
|
||||
@@ -57,7 +57,6 @@ func parseAnnounce(r *fasthttp.RequestCtx, opts ParseOptions) (*bittorrent.Annou
|
||||
if len(infoHashes) > 1 {
|
||||
return nil, errMultipleInfoHashes
|
||||
}
|
||||
// FIXME: make sure that we have a copy of InfoHash
|
||||
request.InfoHash = infoHashes[0]
|
||||
|
||||
// Parse the PeerID from the request.
|
||||
|
||||
@@ -71,7 +71,6 @@ func parseAnnounce(r Request, v6Action bool, opts frontend.ParseOptions) (*bitto
|
||||
|
||||
// XXX: pure V2 hashes will cause invalid parsing,
|
||||
// but BEP-52 says, that V2 hashes SHOULD be truncated
|
||||
// FIXME: make sure that we have a copy of InfoHash
|
||||
request.InfoHash, err = bittorrent.NewInfoHash(r.Packet[16:36])
|
||||
if err != nil {
|
||||
return nil, errInvalidInfoHash
|
||||
@@ -178,7 +177,6 @@ func parseScrape(r Request, opts frontend.ParseOptions) (*bittorrent.ScrapeReque
|
||||
var request *bittorrent.ScrapeRequest
|
||||
for len(r.Packet) >= bittorrent.InfoHashV1Len {
|
||||
var ih bittorrent.InfoHash
|
||||
// FIXME: make sure that we have a copy of InfoHash
|
||||
if ih, err = bittorrent.NewInfoHash(r.Packet[:bittorrent.InfoHashV1Len]); err == nil {
|
||||
infoHashes = append(infoHashes, ih)
|
||||
r.Packet = r.Packet[bittorrent.InfoHashV1Len:]
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
module github.com/sot-tech/mochi
|
||||
|
||||
go 1.19
|
||||
go 1.20
|
||||
|
||||
require (
|
||||
code.cloudfoundry.org/go-diodes v0.0.0-20230317203753-49f1af6d2f1a
|
||||
github.com/MicahParks/keyfunc v1.9.0
|
||||
github.com/anacrolix/torrent v1.48.0
|
||||
github.com/anacrolix/torrent v1.49.0
|
||||
github.com/cespare/xxhash/v2 v2.2.0
|
||||
github.com/golang-jwt/jwt/v4 v4.5.0
|
||||
github.com/jackc/pgx/v5 v5.3.1
|
||||
|
||||
@@ -40,8 +40,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ
|
||||
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
|
||||
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
|
||||
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
|
||||
github.com/anacrolix/torrent v1.48.0 h1:OQe1aQb8WnhDzpcI7r3yWoHzHWKyPbfhXGfO9Q/pvbY=
|
||||
github.com/anacrolix/torrent v1.48.0/go.mod h1:3UtkJ8BnxXDRwvk+eT+uwiZalfFJ8YzAhvxe4QRPSJI=
|
||||
github.com/anacrolix/torrent v1.49.0 h1:v/TAd8BKsZarYEYv7VkPNv8tY5zZCwQyxMMlKKbAF4I=
|
||||
github.com/anacrolix/torrent v1.49.0/go.mod h1:qT3yS5oQwDUHnBXy+zf3nozLPudG7SFNDL3Jl/zQwFw=
|
||||
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
|
||||
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
|
||||
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
|
||||
@@ -160,7 +160,7 @@ func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceReque
|
||||
err = ErrInvalidJWT
|
||||
} else {
|
||||
var claimIH bittorrent.InfoHash
|
||||
if claimIH, err = bittorrent.NewInfoHash(claims.InfoHash); err != nil {
|
||||
if claimIH, err = bittorrent.NewInfoHashString(claims.InfoHash); err != nil {
|
||||
logger.Info().
|
||||
Err(err).
|
||||
Object("source", req.RequestPeer).
|
||||
@@ -206,7 +206,7 @@ func (h *hook) HandleScrape(ctx context.Context, req *bittorrent.ScrapeRequest,
|
||||
} else {
|
||||
var claimIHs bittorrent.InfoHashes
|
||||
for _, s := range claims.InfoHashes {
|
||||
if providedIh, err := bittorrent.NewInfoHash(s); err == nil {
|
||||
if providedIh, err := bittorrent.NewInfoHashString(s); err == nil {
|
||||
claimIHs = append(claimIHs, providedIh)
|
||||
} else {
|
||||
logger.Info().
|
||||
|
||||
@@ -56,7 +56,7 @@ func build(conf conf.MapConfig, st storage.DataStorage) (container.Container, er
|
||||
if len(c.HashList) > 0 {
|
||||
init := make([]storage.Entry, 0, len(c.HashList))
|
||||
for _, hashString := range c.HashList {
|
||||
ih, err := bittorrent.NewInfoHash(hashString)
|
||||
ih, err := bittorrent.NewInfoHashString(hashString)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("whitelist : %s : %w", hashString, err)
|
||||
}
|
||||
|
||||
@@ -84,7 +84,7 @@ func TestHandleAnnounce(t *testing.T) {
|
||||
req := &bittorrent.AnnounceRequest{}
|
||||
resp := &bittorrent.AnnounceResponse{}
|
||||
|
||||
hashinfo, err := bittorrent.NewInfoHash(tt.ih)
|
||||
hashinfo, err := bittorrent.NewInfoHashString(tt.ih)
|
||||
require.Nil(t, err)
|
||||
|
||||
req.InfoHash = hashinfo
|
||||
|
||||
@@ -114,19 +114,19 @@ func (s *store) delPeer(ctx context.Context, infoHashKey, peerID string) error {
|
||||
}
|
||||
|
||||
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), peer.RawString())
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
||||
return s.addPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), peer.RawString())
|
||||
return s.delPeer(ctx, r.InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), r.PackPeer(peer))
|
||||
}
|
||||
|
||||
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) (err error) {
|
||||
@@ -134,7 +134,7 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee
|
||||
Stringer("infoHash", ih).
|
||||
Object("peer", peer).
|
||||
Msg("graduate leecher")
|
||||
infoHash, peerID := ih.RawString(), peer.RawString()
|
||||
infoHash, peerID := ih.RawString(), r.PackPeer(peer)
|
||||
ihSeederKey := r.InfoHashKey(infoHash, true, peer.Addr().Is6())
|
||||
ihLeecherKey := r.InfoHashKey(infoHash, false, peer.Addr().Is6())
|
||||
var moved bool
|
||||
|
||||
@@ -177,14 +177,15 @@ func (p *peers) len() int {
|
||||
return len(p.m)
|
||||
}
|
||||
|
||||
func (p *peers) keys(fn func(k bittorrent.Peer) bool) {
|
||||
func (p *peers) keys(fn func(k bittorrent.Peer) bool) bool {
|
||||
p.RLock()
|
||||
defer p.RUnlock()
|
||||
for k := range p.m {
|
||||
if !fn(k) {
|
||||
break
|
||||
return false
|
||||
}
|
||||
}
|
||||
p.RUnlock()
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *peers) forEach(fn func(k bittorrent.Peer, v int64) bool) {
|
||||
@@ -268,7 +269,7 @@ func (ps *peerStore) shardIndex(infoHash bittorrent.InfoHash, v6 bool) uint32 {
|
||||
// There are twice the amount of shards specified by the user, the first
|
||||
// half is dedicated to IPv4 swarms and the second half is dedicated to
|
||||
// IPv6 swarms.
|
||||
idx := binary.BigEndian.Uint32([]byte(infoHash[:4])) % (uint32(len(ps.shards)) / 2)
|
||||
idx := binary.BigEndian.Uint32(infoHash.Bytes()[:4]) % (uint32(len(ps.shards)) / 2)
|
||||
if v6 {
|
||||
idx += uint32(len(ps.shards) / 2)
|
||||
}
|
||||
@@ -417,8 +418,7 @@ func (ps *peerStore) AnnouncePeers(_ context.Context, ih bittorrent.InfoHash, fo
|
||||
if forSeeder {
|
||||
sw.leechers.keys(rangeFn)
|
||||
} else {
|
||||
sw.seeders.keys(rangeFn)
|
||||
if numWant > 0 {
|
||||
if sw.seeders.keys(rangeFn) {
|
||||
sw.leechers.keys(rangeFn)
|
||||
}
|
||||
}
|
||||
@@ -446,8 +446,8 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
|
||||
leechers, seeders = ps.countPeers(ih, true)
|
||||
l, s := ps.countPeers(ih, false)
|
||||
leechers, seeders = ps.countPeers(ih, false)
|
||||
l, s := ps.countPeers(ih, true)
|
||||
leechers, seeders = leechers+l, seeders+s
|
||||
|
||||
return
|
||||
|
||||
+18
-17
@@ -359,14 +359,14 @@ func (s *store) ScheduleStatisticsCollection(reportInterval time.Duration) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *store) putPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
|
||||
func (s *store) putPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Hex("infoHash", ih).
|
||||
Object("peer", peer).
|
||||
Bool("seeder", seeder).
|
||||
Msg("put peer")
|
||||
_, err = s.Exec(ctx, s.Peer.AddQuery, pgx.NamedArgs{
|
||||
pInfoHash: []byte(ih),
|
||||
pInfoHash: ih,
|
||||
pPeerID: peer.ID[:],
|
||||
pAddress: net.IP(peer.Addr().AsSlice()),
|
||||
pPort: peer.Port(),
|
||||
@@ -377,13 +377,13 @@ func (s *store) putPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittor
|
||||
return
|
||||
}
|
||||
|
||||
func (s *store) delPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (err error) {
|
||||
func (s *store) delPeer(ctx context.Context, ih []byte, peer bittorrent.Peer, seeder bool) (err error) {
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Hex("infoHash", ih).
|
||||
Object("peer", peer).
|
||||
Msg("del peer")
|
||||
_, err = s.Exec(ctx, s.Peer.DelQuery, pgx.NamedArgs{
|
||||
pInfoHash: []byte(ih),
|
||||
pInfoHash: ih,
|
||||
pPeerID: peer.ID[:],
|
||||
pAddress: net.IP(peer.Addr().AsSlice()),
|
||||
pPort: peer.Port(),
|
||||
@@ -393,19 +393,19 @@ func (s *store) delPeer(ctx context.Context, ih bittorrent.InfoHash, peer bittor
|
||||
}
|
||||
|
||||
func (s *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.putPeer(ctx, ih, peer, true)
|
||||
return s.putPeer(ctx, ih.Bytes(), peer, true)
|
||||
}
|
||||
|
||||
func (s *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, ih, peer, true)
|
||||
return s.delPeer(ctx, ih.Bytes(), peer, true)
|
||||
}
|
||||
|
||||
func (s *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.putPeer(ctx, ih, peer, false)
|
||||
return s.putPeer(ctx, ih.Bytes(), peer, false)
|
||||
}
|
||||
|
||||
func (s *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return s.delPeer(ctx, ih, peer, false)
|
||||
return s.delPeer(ctx, ih.Bytes(), peer, false)
|
||||
}
|
||||
|
||||
func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
@@ -414,7 +414,7 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee
|
||||
Object("peer", peer).
|
||||
Msg("graduate leecher")
|
||||
var batch pgx.Batch
|
||||
ihb := []byte(ih)
|
||||
ihb := ih.Bytes()
|
||||
batch.Queue(s.Peer.GraduateQuery, pgx.NamedArgs{
|
||||
pInfoHash: ihb,
|
||||
pPeerID: peer.ID[:],
|
||||
@@ -425,10 +425,10 @@ func (s *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pee
|
||||
return s.txBatch(ctx, &batch)
|
||||
}
|
||||
|
||||
func (s *store) getPeers(ctx context.Context, ih bittorrent.InfoHash, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) {
|
||||
func (s *store) getPeers(ctx context.Context, ih []byte, seeders bool, maxCount int, isV6 bool) (peers []bittorrent.Peer, err error) {
|
||||
var rows pgx.Rows
|
||||
if rows, err = s.Query(ctx, s.Announce.Query, pgx.NamedArgs{
|
||||
pInfoHash: []byte(ih),
|
||||
pInfoHash: ih,
|
||||
pSeeder: seeders,
|
||||
pV6: isV6,
|
||||
pCount: maxCount,
|
||||
@@ -503,12 +503,13 @@ func (s *store) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSe
|
||||
Int("numWant", numWant).
|
||||
Bool("v6", v6).
|
||||
Msg("announce peers")
|
||||
ihb := ih.Bytes()
|
||||
if forSeeder {
|
||||
peers, err = s.getPeers(ctx, ih, false, numWant, v6)
|
||||
peers, err = s.getPeers(ctx, ihb, false, numWant, v6)
|
||||
} else {
|
||||
if peers, err = s.getPeers(ctx, ih, true, numWant, v6); err == nil {
|
||||
if peers, err = s.getPeers(ctx, ihb, true, numWant, v6); err == nil {
|
||||
var addPeers []bittorrent.Peer
|
||||
addPeers, err = s.getPeers(ctx, ih, false, numWant-len(peers), v6)
|
||||
addPeers, err = s.getPeers(ctx, ihb, false, numWant-len(peers), v6)
|
||||
peers = append(peers, addPeers...)
|
||||
}
|
||||
}
|
||||
@@ -568,7 +569,7 @@ func (s *store) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leeche
|
||||
logger.Trace().
|
||||
Stringer("infoHash", ih).
|
||||
Msg("scrape swarm")
|
||||
ihb := []byte(ih)
|
||||
ihb := ih.Bytes()
|
||||
if seeders, leechers, err = s.countPeers(ctx, ihb); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -24,11 +24,16 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
|
||||
@@ -389,20 +394,30 @@ func (ps *store) delPeer(ctx context.Context, infoHashKey, peerCountKey, peerID
|
||||
return err
|
||||
}
|
||||
|
||||
// PackPeer generates concatenation of PeerID, net port and IP-address
|
||||
func PackPeer(p bittorrent.Peer) string {
|
||||
ip := p.Addr()
|
||||
b := make([]byte, bittorrent.PeerIDLen+2+(ip.BitLen()/8))
|
||||
copy(b[:bittorrent.PeerIDLen], p.ID.Bytes())
|
||||
binary.BigEndian.PutUint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2], p.Port())
|
||||
copy(b[bittorrent.PeerIDLen+2:], ip.AsSlice())
|
||||
return unsafe.String(&b[0], len(b))
|
||||
}
|
||||
|
||||
func (ps *store) PutSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) DeleteSeeder(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, peer.RawString())
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), true, peer.Addr().Is6()), CountSeederKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) PutLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
|
||||
return ps.putPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) DeleteLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, peer.RawString())
|
||||
return ps.delPeer(ctx, InfoHashKey(ih.RawString(), false, peer.Addr().Is6()), CountLeecherKey, PackPeer(peer))
|
||||
}
|
||||
|
||||
func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
|
||||
@@ -411,7 +426,7 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
|
||||
Object("peer", peer).
|
||||
Msg("graduate leecher")
|
||||
|
||||
infoHash, peerID, isV6 := ih.RawString(), peer.RawString(), peer.Addr().Is6()
|
||||
infoHash, peerID, isV6 := ih.RawString(), PackPeer(peer), peer.Addr().Is6()
|
||||
ihSeederKey, ihLeecherKey := InfoHashKey(infoHash, true, isV6), InfoHashKey(infoHash, false, isV6)
|
||||
|
||||
return ps.tx(ctx, func(tx redis.Pipeliner) error {
|
||||
@@ -438,12 +453,42 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
|
||||
})
|
||||
}
|
||||
|
||||
// peerMinimumLen is the least allowed length of string serialized Peer
|
||||
const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len
|
||||
|
||||
var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen)
|
||||
|
||||
// UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by]
|
||||
func UnpackPeer(data string) (bittorrent.Peer, error) {
|
||||
var peer bittorrent.Peer
|
||||
if len(data) < peerMinimumLen {
|
||||
return peer, errInvalidPeerDataSize
|
||||
}
|
||||
b := unsafe.Slice(unsafe.StringData(data), len(data))
|
||||
peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
|
||||
if err == nil {
|
||||
if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk {
|
||||
peer = bittorrent.Peer{
|
||||
ID: peerID,
|
||||
AddrPort: netip.AddrPortFrom(
|
||||
addr.Unmap(),
|
||||
binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]),
|
||||
),
|
||||
}
|
||||
} else {
|
||||
err = bittorrent.ErrInvalidIP
|
||||
}
|
||||
}
|
||||
|
||||
return peer, err
|
||||
}
|
||||
|
||||
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
|
||||
var peerIds []string
|
||||
peerIds, err = peersResult.Result()
|
||||
if err = NoResultErr(err); err == nil {
|
||||
for _, peerID := range peerIds {
|
||||
if p, err := bittorrent.NewPeer(peerID); err == nil {
|
||||
if p, err := UnpackPeer(peerID); err == nil {
|
||||
peers = append(peers, p)
|
||||
} else {
|
||||
logger.Error().Err(err).Str("peerID", peerID).Msg("unable to decode peer")
|
||||
|
||||
Reference in New Issue
Block a user