(tested) fix redis to pass tests

* delete info hash count key from redis (replaced with SCARD on infohash set)
* add GC test
* add peer.Addr() function to always return unwrapped address if a 4-in-6 mapped address appears
This commit is contained in:
Lawrence, Rendall
2022-04-15 01:29:57 +03:00
parent 5c2471ca9b
commit 397e106396
17 changed files with 287 additions and 261 deletions

View File

@@ -1,6 +1,7 @@
![Mochi (source image: https://www.flaticon.com/free-icon/mochi_5392004)](mochi.svg)
# Modified Chihaya (MoChi)
[![Build Status](https://github.com/sot-tech/mochi/workflows/Build%20&%20Test/badge.svg)](https://github.com/sot-tech/mochi/actions)
[![Docker Repository on Quay](https://quay.io/repository/eramde/mochi/status "Docker Repository on Quay")](https://quay.io/repository/eramde/mochi)
[![License](https://img.shields.io/badge/license-BSD-blue.svg)](https://opensource.org/licenses/BSD-2-Clause)

View File

@@ -243,7 +243,7 @@ func NewPeer(data string) (Peer, error) {
peer = Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(
addr,
addr.Unmap(),
binary.BigEndian.Uint16(b[PeerIDLen:PeerIDLen+2]),
),
}
@@ -276,7 +276,7 @@ func (p Peer) RawString() string {
func (p Peer) LogFields() log.Fields {
return log.Fields{
"ID": p.ID,
"IP": p.Addr().String(),
"IP": p.Addr(),
"port": p.Port(),
}
}
@@ -290,6 +290,11 @@ func (p Peer) EqualEndpoint(x Peer) bool {
p.Addr().Compare(x.Addr()) == 0
}
// Addr returns the peer's IP address, with any IPv4-mapped IPv6
// form (::ffff:a.b.c.d) unwrapped to its plain IPv4 representation.
func (p Peer) Addr() netip.Addr {
	addr := p.AddrPort.Addr()
	return addr.Unmap()
}
// ClientError represents an error that should be exposed to the client over
// the BitTorrent protocol implementation.
type ClientError string

View File

@@ -27,7 +27,7 @@ func SanitizeAnnounce(r *AnnounceRequest, maxNumWant, defaultNumWant uint32) err
r.NumWant = maxNumWant
}
r.AddrPort = netip.AddrPortFrom(r.Addr().Unmap(), r.Port())
r.AddrPort = netip.AddrPortFrom(r.Addr(), r.Port())
if !r.Addr().IsValid() || r.Addr().IsUnspecified() {
return ErrInvalidIP
}

View File

@@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"fmt"
"os/signal"
"runtime"
"strings"
@@ -46,7 +47,7 @@ func NewRun(configFilePath string) (*Run, error) {
func (r *Run) Start(ps storage.Storage) error {
configFile, err := ParseConfigFile(r.configFilePath)
if err != nil {
return errors.New("failed to read config: " + err.Error())
return fmt.Errorf("failed to read config: %w", err)
}
cfg := configFile.Conf
@@ -63,7 +64,7 @@ func (r *Run) Start(ps storage.Storage) error {
log.Info("starting storage", log.Fields{"name": cfg.Storage.Name})
ps, err = storage.NewStorage(cfg.Storage.Name, cfg.Storage.Config)
if err != nil {
return errors.New("failed to create storage: " + err.Error())
return fmt.Errorf("failed to create storage: %w", err)
}
log.Info("started storage", ps)
}
@@ -71,11 +72,11 @@ func (r *Run) Start(ps storage.Storage) error {
preHooks, err := middleware.HooksFromHookConfigs(cfg.PreHooks, r.storage)
if err != nil {
return errors.New("failed to validate hook config: " + err.Error())
return fmt.Errorf("failed to validate hook config: %w", err)
}
postHooks, err := middleware.HooksFromHookConfigs(cfg.PostHooks, r.storage)
if err != nil {
return errors.New("failed to validate hook config: " + err.Error())
return fmt.Errorf("failed to validate hook config: %w", err)
}
log.Info("starting tracker logic", log.Fields{

View File

@@ -297,7 +297,7 @@ func (f *Frontend) announceRoute(w http.ResponseWriter, r *http.Request, ps http
WriteError(w, err)
return
}
addr = req.AddrPort.Addr()
addr = req.Addr()
ctx := injectRouteParamsToContext(context.Background(), ps)
ctx, resp, err := f.logic.HandleAnnounce(ctx, req)

View File

@@ -178,5 +178,5 @@ func requestedIP(r *http.Request, p bittorrent.Params, opts ParseOptions) (netip
}
addrPort, err := netip.ParseAddrPort(r.RemoteAddr)
return addrPort.Addr(), false, err
return addrPort.Addr().Unmap(), false, err
}

View File

@@ -98,17 +98,17 @@ func WriteScrapeResponse(w http.ResponseWriter, resp *bittorrent.ScrapeResponse)
}
func compact4(peer bittorrent.Peer) (buf []byte) {
ip := peer.AddrPort.Addr().As4()
ip := peer.Addr().As4()
buf = append(buf, ip[:]...)
port := peer.AddrPort.Port()
port := peer.Port()
buf = append(buf, byte(port>>8), byte(port&0xff))
return
}
func compact6(peer bittorrent.Peer) (buf []byte) {
ip := peer.AddrPort.Addr().As16()
ip := peer.Addr().As16()
buf = append(buf, ip[:]...)
port := peer.AddrPort.Port()
port := peer.Port()
buf = append(buf, byte(port>>8), byte(port&0xff))
return
}

View File

@@ -207,7 +207,7 @@ func (t *Frontend) serve() error {
defer pool.Put(buffer)
// Handle the request.
addr := addrPort.Addr()
addr := addrPort.Addr().Unmap()
var start time.Time
if t.EnableRequestTiming {
start = time.Now()

View File

@@ -125,7 +125,7 @@ func (h *responseHook) appendPeers(req *bittorrent.AnnounceRequest, resp *bittor
}
switch addr := req.Peer.Addr(); {
case addr.Is4(), addr.Is4In6():
case addr.Is4():
resp.IPv4Peers = mergePeers(resp.IPv4Peers, peers, max)
case addr.Is6():
resp.IPv6Peers = mergePeers(resp.IPv6Peers, peers, max)

View File

@@ -95,7 +95,7 @@ func NewHook(cfg Config) (middleware.Hook, error) {
log.Debug("performing initial fetch of JWKs")
if err := h.updateKeys(); err != nil {
return nil, errors.New("failed to fetch initial JWK Set: " + err.Error())
return nil, fmt.Errorf("failed to fetch initial JWK Set: %w", err)
}
go func() {

186
mochi.svg
View File

@@ -3,97 +3,97 @@
<!-- Source image: https://www.flaticon.com/free-icon/mochi_5392004 -->
<svg
width="100mm"
height="100mm"
viewBox="0 0 100 99.999997"
version="1.1"
id="svg5"
sodipodi:docname="mochi.svg"
inkscape:version="1.1.2 (1:1.1+202202050950+0a00cf5339)"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns="http://www.w3.org/2000/svg"
xmlns:svg="http://www.w3.org/2000/svg">
<sodipodi:namedview
id="namedview7"
pagecolor="#000000"
bordercolor="#666666"
borderopacity="1.0"
inkscape:pageshadow="2"
inkscape:pageopacity="0"
inkscape:pagecheckerboard="true"
inkscape:document-units="mm"
showgrid="false"
inkscape:zoom="1.4142136"
inkscape:cx="128.33988"
inkscape:cy="254.55844"
inkscape:window-width="1920"
inkscape:window-height="1007"
inkscape:window-x="0"
inkscape:window-y="44"
inkscape:window-maximized="1"
inkscape:current-layer="layer2"
fit-margin-top="0"
fit-margin-left="0"
fit-margin-right="0"
fit-margin-bottom="0"
width="120mm" />
<defs
id="defs2" />
<g
inkscape:label="Слой 1"
inkscape:groupmode="layer"
id="layer1"
transform="translate(-44.737202,-67.807678)">
<g
id="g637"
transform="translate(-1.8000249,9.9550588)" />
<g
inkscape:groupmode="layer"
id="layer3"
inkscape:label="green">
<g
id="g9381"
transform="matrix(0.73992929,0,0,0.74958725,10.302944,24.442092)">
<path
style="fill:#d0ef9d;stroke-width:0.264583"
d="m 69.35694,137.89312 c -9.3017,-1.44558 -17.24881,-7.79995 -20.6979,-16.54965 -3.55592,-9.02072 -2.6443,-18.84027 2.74959,-29.6175 9.15946,-18.300963 24.50366,-30.308717 42.43339,-33.20667 2.25338,-0.364211 2.76601,-0.364282 5.02546,-6.99e-4 8.92921,1.436853 17.48116,5.276915 25.00894,11.229707 9.49568,7.508966 18.29724,20.781336 20.89461,31.508172 0.92187,3.80722 1.28705,8.29879 0.9454,11.62807 -0.15206,1.48178 -0.27647,2.85258 -0.27647,3.04622 0,0.26064 -0.59196,0.15196 -2.27975,-0.41854 -5.65359,-1.91104 -11.63233,-2.7717 -17.81042,-2.56387 -10.90925,0.36698 -20.85133,3.90879 -30.04837,10.70457 -3.14406,2.32317 -9.31808,8.51893 -11.87013,11.9119 l -2.03983,2.71198 -4.95127,-0.0262 c -2.7232,-0.0144 -5.91066,-0.17528 -7.08325,-0.35751 z"
id="path647"
sodipodi:nodetypes="cscsssssscssscccc" />
<path
style="fill:#baea6b;stroke-width:0.264583"
d="m 145.40517,116.09953 c 0.0388,-0.50954 -0.0617,-0.50007 0.18268,-2.98109 0.71622,-7.27182 -1.05319,-14.436184 -5.24685,-22.475781 C 133.83606,78.172174 124.7869,68.70184 113.29309,63.133914 108.19374,60.663648 104.17878,59.865863 98.947649,58.772149 L 96.470538,58.25424 99.27017,57.969969 c 4.90427,-0.497973 12.67858,0.633696 18.5024,2.54607 13.97148,4.587823 25.96934,15.268411 33.61361,29.923104 4.62428,8.86511 6.17653,16.818537 4.86967,24.951197 -0.30821,1.91801 -0.56467,4.61431 -0.78711,4.56874 -4.0899,-1.6687 -6.43546,-2.47349 -10.06357,-3.85955 z"
id="path645"
sodipodi:nodetypes="csssscsssscc" />
<path
style="fill:#8ac626;stroke-width:0.264583"
d="m 92.949805,98.374772 c -0.32758,-0.32758 -0.52916,-0.881943 -0.52916,-1.455208 0,-1.801892 1.92997,-2.648183 3.12888,-1.37201 1.19593,1.273011 0.40339,3.356385 -1.2768,3.356385 -0.44097,0 -1.02894,-0.235185 -1.32292,-0.529167 z M 78.166237,96.1509 c -0.93127,-0.344498 -1.455502,-1.247584 -1.378729,-2.11843 0.09126,-1.035211 0.986483,-2.061076 2.167865,-2.027487 1.03201,0.02934 1.888474,1.092177 1.888474,2.136651 0,1.406115 -1.402579,2.480929 -2.67761,2.009266 z M 93.952635,84.072035 c -0.58957,-0.343713 -0.87329,-1.883873 -0.48179,-2.6154 1.0133,-1.893371 4.2374,-0.498958 3.58669,1.551234 -0.39087,1.231522 -1.91928,1.755365 -3.1049,1.064166 z"
id="path639"
sodipodi:nodetypes="ssssssssssssss" />
</g>
</g>
<g
inkscape:groupmode="layer"
id="layer2"
inkscape:label="white">
<g
id="g9376"
transform="matrix(0.73992929,0,0,0.74958725,10.698698,24.384445)">
<path
style="fill:#f8f2f0;stroke-width:0.264583"
d="m 94.16125,191.00133 c -9.0368,-1.61147 -16.64021,-7.7879 -20.132668,-16.35423 -1.53397,-3.76253 -2.0457,-6.56472 -2.05925,-11.27643 -0.0186,-6.46808 1.23547,-11.41581 4.62382,-18.24242 8.166048,-16.4524 21.628388,-28.092 37.341138,-32.28531 4.96807,-1.32584 7.56973,-1.6169 10.38946,-1.16228 10.45163,1.68506 20.19865,6.61814 28.68154,14.51604 6.33873,5.90161 13.30736,16.57636 16.01921,24.53868 3.62408,10.64077 2.68567,20.92662 -2.63936,28.92964 -3.65758,5.497 -8.95871,9.24437 -15.60306,11.02979 -2.09107,0.5619 -2.43331,0.56948 -28.31041,0.62638 -21.70854,0.0477 -26.55619,-0.007 -28.31042,-0.31986 z"
id="path651"
sodipodi:nodetypes="csssscsssscc" />
<path
style="fill:#ede1db;stroke-width:0.264583"
d="m 146.68843,191.0529 c 8.04246,-1.17698 14.50227,-5.16412 18.77806,-11.59024 6.4894,-9.75296 6.43222,-22.37123 -0.16036,-35.38894 -6.489,-12.81317 -16.11693,-22.54939 -27.80069,-28.11337 -5.05514,-2.40733 -10.61562,-3.81103 -14.77082,-4.28113 -2.65985,-0.30093 -0.95006,-0.89 3.10433,-0.89437 17.12083,-0.0185 32.83058,8.17961 43.88995,22.90382 11.10003,14.77833 14.22613,29.47509 8.86584,41.68106 -3.31447,7.5474 -9.63237,12.89683 -17.77645,15.05149 -1.94612,0.51488 -3.01602,0.59825 -8.57361,0.66807 -3.4925,0.0439 -5.99281,0.0275 -5.55625,-0.0364 z"
id="path649"
sodipodi:nodetypes="ssssscssssss" />
<path
style="fill:#d67e49;stroke-width:0.264583"
d="m 98.059161,170.86619 c -3.14661,-0.45953 -5.92984,-2.31772 -7.26671,-4.85154 -0.62977,-1.19362 -0.706871,-1.62863 -0.707061,-3.98927 -2e-4,-2.62092 0.0134,-2.67775 1.260311,-5.25683 9.809329,-20.28991 30.301209,-27.30099 47.122639,-16.12247 5.19843,3.45456 9.94621,8.98558 13.18357,15.35844 1.61767,3.18444 1.64553,3.27598 1.75433,5.76357 0.10532,2.40813 0.0712,2.60763 -0.72058,4.21601 -1.35535,2.7531 -3.66511,4.32918 -7.17026,4.89266 -2.09432,0.33668 -45.14413,0.32709 -47.456239,-0.0106 z"
id="path641" />
</g>
</g>
</g>
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
width="100mm"
height="100mm"
viewBox="0 0 100 99.999997"
version="1.1"
id="svg5"
sodipodi:docname="mochi.svg"
inkscape:version="1.1.2 (1:1.1+202202050950+0a00cf5339)"
xmlns="http://www.w3.org/2000/svg"
>
<sodipodi:namedview
id="namedview7"
pagecolor="#000000"
bordercolor="#666666"
borderopacity="1.0"
inkscape:pageshadow="2"
inkscape:pageopacity="0"
inkscape:pagecheckerboard="true"
inkscape:document-units="mm"
showgrid="false"
inkscape:zoom="1.4142136"
inkscape:cx="128.33988"
inkscape:cy="254.55844"
inkscape:window-width="1920"
inkscape:window-height="1007"
inkscape:window-x="0"
inkscape:window-y="44"
inkscape:window-maximized="1"
inkscape:current-layer="layer2"
fit-margin-top="0"
fit-margin-left="0"
fit-margin-right="0"
fit-margin-bottom="0"
width="120mm"/>
<defs
id="defs2"/>
<g
inkscape:label="Слой 1"
inkscape:groupmode="layer"
id="layer1"
transform="translate(-44.737202,-67.807678)">
<g
id="g637"
transform="translate(-1.8000249,9.9550588)"/>
<g
inkscape:groupmode="layer"
id="layer3"
inkscape:label="green">
<g
id="g9381"
transform="matrix(0.73992929,0,0,0.74958725,10.302944,24.442092)">
<path
style="fill:#d0ef9d;stroke-width:0.264583"
d="m 69.35694,137.89312 c -9.3017,-1.44558 -17.24881,-7.79995 -20.6979,-16.54965 -3.55592,-9.02072 -2.6443,-18.84027 2.74959,-29.6175 9.15946,-18.300963 24.50366,-30.308717 42.43339,-33.20667 2.25338,-0.364211 2.76601,-0.364282 5.02546,-6.99e-4 8.92921,1.436853 17.48116,5.276915 25.00894,11.229707 9.49568,7.508966 18.29724,20.781336 20.89461,31.508172 0.92187,3.80722 1.28705,8.29879 0.9454,11.62807 -0.15206,1.48178 -0.27647,2.85258 -0.27647,3.04622 0,0.26064 -0.59196,0.15196 -2.27975,-0.41854 -5.65359,-1.91104 -11.63233,-2.7717 -17.81042,-2.56387 -10.90925,0.36698 -20.85133,3.90879 -30.04837,10.70457 -3.14406,2.32317 -9.31808,8.51893 -11.87013,11.9119 l -2.03983,2.71198 -4.95127,-0.0262 c -2.7232,-0.0144 -5.91066,-0.17528 -7.08325,-0.35751 z"
id="path647"
sodipodi:nodetypes="cscsssssscssscccc"/>
<path
style="fill:#baea6b;stroke-width:0.264583"
d="m 145.40517,116.09953 c 0.0388,-0.50954 -0.0617,-0.50007 0.18268,-2.98109 0.71622,-7.27182 -1.05319,-14.436184 -5.24685,-22.475781 C 133.83606,78.172174 124.7869,68.70184 113.29309,63.133914 108.19374,60.663648 104.17878,59.865863 98.947649,58.772149 L 96.470538,58.25424 99.27017,57.969969 c 4.90427,-0.497973 12.67858,0.633696 18.5024,2.54607 13.97148,4.587823 25.96934,15.268411 33.61361,29.923104 4.62428,8.86511 6.17653,16.818537 4.86967,24.951197 -0.30821,1.91801 -0.56467,4.61431 -0.78711,4.56874 -4.0899,-1.6687 -6.43546,-2.47349 -10.06357,-3.85955 z"
id="path645"
sodipodi:nodetypes="csssscsssscc"/>
<path
style="fill:#8ac626;stroke-width:0.264583"
d="m 92.949805,98.374772 c -0.32758,-0.32758 -0.52916,-0.881943 -0.52916,-1.455208 0,-1.801892 1.92997,-2.648183 3.12888,-1.37201 1.19593,1.273011 0.40339,3.356385 -1.2768,3.356385 -0.44097,0 -1.02894,-0.235185 -1.32292,-0.529167 z M 78.166237,96.1509 c -0.93127,-0.344498 -1.455502,-1.247584 -1.378729,-2.11843 0.09126,-1.035211 0.986483,-2.061076 2.167865,-2.027487 1.03201,0.02934 1.888474,1.092177 1.888474,2.136651 0,1.406115 -1.402579,2.480929 -2.67761,2.009266 z M 93.952635,84.072035 c -0.58957,-0.343713 -0.87329,-1.883873 -0.48179,-2.6154 1.0133,-1.893371 4.2374,-0.498958 3.58669,1.551234 -0.39087,1.231522 -1.91928,1.755365 -3.1049,1.064166 z"
id="path639"
sodipodi:nodetypes="ssssssssssssss"/>
</g>
</g>
<g
inkscape:groupmode="layer"
id="layer2"
inkscape:label="white">
<g
id="g9376"
transform="matrix(0.73992929,0,0,0.74958725,10.698698,24.384445)">
<path
style="fill:#f8f2f0;stroke-width:0.264583"
d="m 94.16125,191.00133 c -9.0368,-1.61147 -16.64021,-7.7879 -20.132668,-16.35423 -1.53397,-3.76253 -2.0457,-6.56472 -2.05925,-11.27643 -0.0186,-6.46808 1.23547,-11.41581 4.62382,-18.24242 8.166048,-16.4524 21.628388,-28.092 37.341138,-32.28531 4.96807,-1.32584 7.56973,-1.6169 10.38946,-1.16228 10.45163,1.68506 20.19865,6.61814 28.68154,14.51604 6.33873,5.90161 13.30736,16.57636 16.01921,24.53868 3.62408,10.64077 2.68567,20.92662 -2.63936,28.92964 -3.65758,5.497 -8.95871,9.24437 -15.60306,11.02979 -2.09107,0.5619 -2.43331,0.56948 -28.31041,0.62638 -21.70854,0.0477 -26.55619,-0.007 -28.31042,-0.31986 z"
id="path651"
sodipodi:nodetypes="csssscsssscc"/>
<path
style="fill:#ede1db;stroke-width:0.264583"
d="m 146.68843,191.0529 c 8.04246,-1.17698 14.50227,-5.16412 18.77806,-11.59024 6.4894,-9.75296 6.43222,-22.37123 -0.16036,-35.38894 -6.489,-12.81317 -16.11693,-22.54939 -27.80069,-28.11337 -5.05514,-2.40733 -10.61562,-3.81103 -14.77082,-4.28113 -2.65985,-0.30093 -0.95006,-0.89 3.10433,-0.89437 17.12083,-0.0185 32.83058,8.17961 43.88995,22.90382 11.10003,14.77833 14.22613,29.47509 8.86584,41.68106 -3.31447,7.5474 -9.63237,12.89683 -17.77645,15.05149 -1.94612,0.51488 -3.01602,0.59825 -8.57361,0.66807 -3.4925,0.0439 -5.99281,0.0275 -5.55625,-0.0364 z"
id="path649"
sodipodi:nodetypes="ssssscssssss"/>
<path
style="fill:#d67e49;stroke-width:0.264583"
d="m 98.059161,170.86619 c -3.14661,-0.45953 -5.92984,-2.31772 -7.26671,-4.85154 -0.62977,-1.19362 -0.706871,-1.62863 -0.707061,-3.98927 -2e-4,-2.62092 0.0134,-2.67775 1.260311,-5.25683 9.809329,-20.28991 30.301209,-27.30099 47.122639,-16.12247 5.19843,3.45456 9.94621,8.98558 13.18357,15.35844 1.61767,3.18444 1.64553,3.27598 1.75433,5.76357 0.10532,2.40813 0.0712,2.60763 -0.72058,4.21601 -1.35535,2.7531 -3.66511,4.32918 -7.17026,4.89266 -2.09432,0.33668 -45.14413,0.32709 -47.456239,-0.0106 z"
id="path641"/>
</g>
</g>
</g>
</svg>

Before

Width:  |  Height:  |  Size: 6.3 KiB

After

Width:  |  Height:  |  Size: 6.0 KiB

View File

@@ -139,16 +139,18 @@ func New(provided Config) (storage.Storage, error) {
ps.wg.Add(1)
go func() {
defer ps.wg.Done()
t := time.NewTimer(cfg.GarbageCollectionInterval)
defer t.Stop()
for {
select {
case <-ps.closed:
return
case <-time.After(cfg.GarbageCollectionInterval):
case <-t.C:
before := time.Now().Add(-cfg.PeerLifetime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
if err := ps.collectGarbage(before); err != nil {
log.Error(err)
}
start := time.Now()
ps.GC(before)
recordGCDuration(time.Since(start))
}
}
}()
@@ -546,20 +548,19 @@ func (ps *store) Delete(ctx string, keys ...string) error {
return nil
}
// collectGarbage deletes all Peers from the Storage which are older than the
// GC deletes all Peers from the Storage which are older than the
// cutoff time.
//
// This function must be able to execute while other methods on this interface
// are being executed in parallel.
func (ps *store) collectGarbage(cutoff time.Time) error {
func (ps *store) GC(cutoff time.Time) {
select {
case <-ps.closed:
return nil
return
default:
}
cutoffUnix := cutoff.UnixNano()
start := time.Now()
for _, shard := range ps.shards {
shard.RLock()
@@ -603,10 +604,6 @@ func (ps *store) collectGarbage(cutoff time.Time) error {
runtime.Gosched()
}
recordGCDuration(time.Since(start))
return nil
}
func (ps *store) Stop() stop.Result {

View File

@@ -2,24 +2,21 @@
// BitTorrent tracker keeping peer data in redis with hash.
// There are two categories of hash:
//
// - CHI_{4,6}_{L,S}_infohash
// - CHI_{4,6}_{L,S}_<HASH> (hash type)
// To save peers that hold the infohash, used for fast searching,
// deleting, and timeout handling
//
// - CHI_{4,6}
// - CHI_{4,6}_I (set type)
// To save all the infohashes, used for garbage collection,
// metrics aggregation and leecher graduation
//
// These keys are used to record the count of swarms, seeders
// and leechers for each group (IPv4, IPv6).
//
// - CHI_{4,6}_I_C
// To record the number of infohashes.
//
// - CHI_{4,6}_S_C
// - CHI_{4,6}_S_C (key type)
// To record the number of seeders.
//
// - CHI_{4,6}_L_C
// - CHI_{4,6}_L_C (key type)
// To record the number of leechers.
package redis
@@ -64,8 +61,6 @@ const (
cnt6SeederKey = "CHI_6_C_S"
cnt4LeecherKey = "CHI_4_C_L"
cnt6LeecherKey = "CHI_6_C_L"
cnt4InfoHashKey = "CHI_4_C_I"
cnt6InfoHashKey = "CHI_6_C_I"
)
// ErrSentinelAndClusterChecked returned from initializer if both Config.Sentinel and Config.Cluster provided
@@ -277,10 +272,12 @@ func New(conf Config) (storage.Storage, error) {
ps.logFields = cfg.LogFields()
// Start a goroutine for garbage collection.
ps.wg.Add(1)
go ps.scheduleGC(cfg.GarbageCollectionInterval, cfg.PeerLifetime)
if cfg.PrometheusReportingInterval > 0 {
// Start a goroutine for reporting statistics to Prometheus.
ps.wg.Add(1)
go ps.schedulerProm(cfg.PrometheusReportingInterval)
} else {
log.Info("prometheus disabled because of zero reporting interval")
@@ -290,7 +287,6 @@ func New(conf Config) (storage.Storage, error) {
}
func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) {
ps.wg.Add(1)
defer ps.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
@@ -299,12 +295,8 @@ func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) {
case <-ps.closed:
return
case <-t.C:
before := time.Now().Add(-peerLifeTime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
cutoffUnix := before.UnixNano()
start := time.Now()
ps.gc(cutoffUnix, false)
ps.gc(cutoffUnix, true)
ps.GC(time.Now().Add(-peerLifeTime))
duration := time.Since(start).Milliseconds()
log.Debug("storage: recordGCDuration", log.Fields{"timeTaken(ms)": duration})
storage.PromGCDurationMilliseconds.Observe(float64(duration))
@@ -314,7 +306,6 @@ func (ps *store) scheduleGC(gcInterval, peerLifeTime time.Duration) {
}
func (ps *store) schedulerProm(reportInterval time.Duration) {
ps.wg.Add(1)
defer ps.wg.Done()
t := time.NewTicker(reportInterval)
for {
@@ -339,10 +330,16 @@ type store struct {
logFields log.Fields
}
func (ps *store) count(key string) (n uint64) {
func (ps *store) count(key string, getLength bool) (n uint64) {
var err error
if n, err = ps.con.Get(ps.ctx, key).Uint64(); err != nil && !errors.Is(err, redis.Nil) {
log.Error("storage: GET counter failure", log.Fields{
if getLength {
n, err = ps.con.SCard(ps.ctx, key).Uint64()
} else {
n, err = ps.con.Get(ps.ctx, key).Uint64()
}
err = asNil(err)
if err != nil {
log.Error("storage: len/counter failure", log.Fields{
"key": key,
"error": err,
})
@@ -355,15 +352,15 @@ func (ps *store) count(key string) (n uint64) {
func (ps *store) populateProm() {
numInfoHashes, numSeeders, numLeechers := new(uint64), new(uint64), new(uint64)
fetchFn := func(v6 bool) {
var cntSeederKey, cntLeecherKey, cntInfoHashKey string
var cntSeederKey, cntLeecherKey, ihSummaryKey string
if v6 {
cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt6SeederKey, cnt6LeecherKey, cnt6InfoHashKey
cntSeederKey, cntLeecherKey, ihSummaryKey = cnt6SeederKey, cnt6LeecherKey, ih6Key
} else {
cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt4SeederKey, cnt4LeecherKey, cnt4InfoHashKey
cntSeederKey, cntLeecherKey, ihSummaryKey = cnt4SeederKey, cnt4LeecherKey, ih4Key
}
*numInfoHashes += ps.count(cntInfoHashKey)
*numSeeders += ps.count(cntSeederKey)
*numLeechers += ps.count(cntLeecherKey)
*numInfoHashes += ps.count(ihSummaryKey, true)
*numSeeders += ps.count(cntSeederKey, false)
*numLeechers += ps.count(cntLeecherKey, false)
}
fetchFn(false)
@@ -403,15 +400,15 @@ func asNil(err error) error {
}
func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey string
var ihSummaryKey, ihPeerKey, cntPeerKey string
log.Debug("storage: PutSeeder", log.Fields{
"InfoHash": ih,
"Peer": peer,
})
if peer.Addr().Is6() {
ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey = ih6Key, ih6SeederKey, cnt6SeederKey, cnt6InfoHashKey
ihSummaryKey, ihPeerKey, cntPeerKey = ih6Key, ih6SeederKey, cnt6SeederKey
} else {
ihSummaryKey, ihPeerKey, cntPeerKey, cntInfoHashKey = ih4Key, ih4SeederKey, cnt4SeederKey, cnt4InfoHashKey
ihSummaryKey, ihPeerKey, cntPeerKey = ih4Key, ih4SeederKey, cnt4SeederKey
}
ihPeerKey += ih.RawString()
now := ps.getClock()
@@ -423,13 +420,7 @@ func (ps *store) PutSeeder(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
if err = ps.con.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
return
}
var added int64
if added, err = ps.con.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Result(); err != nil {
return
}
if added > 0 {
err = ps.con.Incr(ps.ctx, cntInfoHashKey).Err()
}
err = ps.con.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
return
})
}
@@ -473,16 +464,14 @@ func (ps *store) PutLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error
}
ihPeerKey += ih.RawString()
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) (err error) {
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), now).Err(); err != nil {
if err = tx.HSet(ps.ctx, ihPeerKey, peer.RawString(), ps.getClock()).Err(); err != nil {
return
}
if err = tx.Incr(ps.ctx, cntPeerKey).Err(); err != nil {
return err
}
err = tx.HSet(ps.ctx, ihSummaryKey, ihPeerKey, now).Err()
err = tx.SAdd(ps.ctx, ihSummaryKey, ihPeerKey).Err()
return
})
}
@@ -515,7 +504,7 @@ func (ps *store) DeleteLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) err
}
func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) error {
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey, cntInfoHashKey string
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string
log.Debug("storage: GraduateLeecher", log.Fields{
"InfoHash": ih,
"Peer": peer,
@@ -523,16 +512,14 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
if peer.Addr().Is6() {
ihSummaryKey, ihSeederKey, cntSeederKey = ih6Key, ih6SeederKey, cnt6SeederKey
ihLeecherKey, cntLeecherKey, cntInfoHashKey = ih6LeecherKey, cnt6LeecherKey, cnt6InfoHashKey
ihLeecherKey, cntLeecherKey = ih6LeecherKey, cnt6LeecherKey
} else {
ihSummaryKey, ihSeederKey, cntSeederKey = ih4Key, ih4SeederKey, cnt4SeederKey
ihLeecherKey, cntLeecherKey, cntInfoHashKey = ih4LeecherKey, cnt4LeecherKey, cnt4InfoHashKey
ihLeecherKey, cntLeecherKey = ih4LeecherKey, cnt4LeecherKey
}
infoHash, peerKey := ih.RawString(), peer.RawString()
ihSeederKey, ihLeecherKey = ihSeederKey+infoHash, ihLeecherKey+infoHash
now := ps.getClock()
return ps.tx(func(tx redis.Pipeliner) error {
deleted, err := tx.HDel(ps.ctx, ihLeecherKey, peerKey).Uint64()
err = asNil(err)
@@ -542,16 +529,13 @@ func (ps *store) GraduateLeecher(ih bittorrent.InfoHash, peer bittorrent.Peer) e
}
}
if err == nil {
err = tx.HSet(ps.ctx, ihSeederKey, peerKey, now).Err()
err = tx.HSet(ps.ctx, ihSeederKey, peerKey, ps.getClock()).Err()
}
if err == nil {
err = tx.Incr(ps.ctx, cntSeederKey).Err()
}
if err == nil {
err = tx.HSet(ps.ctx, ihSummaryKey, ihSeederKey, now).Err()
}
if err == nil {
err = tx.Incr(ps.ctx, cntInfoHashKey).Err()
err = tx.SAdd(ps.ctx, ihSummaryKey, ihSeederKey).Err()
}
return err
})
@@ -567,7 +551,7 @@ func (ps *store) AnnouncePeers(ih bittorrent.InfoHash, seeder bool, numWant int,
})
if peer.Addr().Is6() {
ihSeederKey, ihLeecherKey = ih6SeederKey, cnt6LeecherKey
ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey
} else {
ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey
}
@@ -647,7 +631,7 @@ func (ps *store) ScrapeSwarm(ih bittorrent.InfoHash, peer bittorrent.Peer) (resp
})
resp.InfoHash = ih
if peer.Addr().Is6() {
ihSeederKey, ihLeecherKey = ih6SeederKey, cnt6LeecherKey
ihSeederKey, ihLeecherKey = ih6SeederKey, ih6LeecherKey
} else {
ihSeederKey, ihLeecherKey = ih4SeederKey, ih4LeecherKey
}
@@ -737,6 +721,13 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) {
return
}
// GC deletes all peers whose last announce predates the cutoff time,
// running the per-family collector for both IPv4 and IPv6 groups.
func (ps *store) GC(cutoff time.Time) {
	log.Debug("storage: purging peers with no announces since", log.Fields{"before": cutoff})
	nanos := cutoff.UnixNano()
	for _, v6 := range []bool{false, true} {
		ps.gc(nanos, v6)
	}
}
// gc deletes all Peers from the Storage which are older than the
// cutoff time.
//
@@ -782,14 +773,14 @@ func (ps *store) Delete(ctx string, keys ...string) (err error) {
// - If the change happens after the HLEN, we will not even attempt to make the
// transaction. The infohash key will remain in the addressFamil hash and
// we'll attempt to clean it up the next time gc runs.
func (ps *store) gc(cutoffUnix int64, v6 bool) {
func (ps *store) gc(cutoffNanos int64, v6 bool) {
// list all infoHashKeys in the group
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey, cntInfoHashKey string
var ihSummaryKey, ihSeederKey, ihLeecherKey, cntSeederKey, cntLeecherKey string
if v6 {
cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt6SeederKey, cnt6LeecherKey, cnt6InfoHashKey
cntSeederKey, cntLeecherKey = cnt6SeederKey, cnt6LeecherKey
ihSummaryKey, ihSeederKey, ihLeecherKey = ih6Key, ih6SeederKey, ih6LeecherKey
} else {
cntSeederKey, cntLeecherKey, cntInfoHashKey = cnt4SeederKey, cnt4LeecherKey, cnt4InfoHashKey
cntSeederKey, cntLeecherKey = cnt4SeederKey, cnt4LeecherKey
ihSummaryKey, ihSeederKey, ihLeecherKey = ih4Key, ih4SeederKey, ih4LeecherKey
}
infoHashKeys, err := ps.con.SMembers(ps.ctx, ihSummaryKey).Result()
@@ -818,7 +809,7 @@ func (ps *store) gc(cutoffUnix int64, v6 bool) {
var peer bittorrent.Peer
if peer, err = bittorrent.NewPeer(peerKey); err == nil {
if mtime, err := strconv.ParseInt(timeStamp, 10, 64); err == nil {
if mtime <= cutoffUnix {
if mtime <= cutoffNanos {
log.Debug("storage: Redis: deleting peer", log.Fields{
"Peer": peer,
})
@@ -853,22 +844,15 @@ func (ps *store) gc(cutoffUnix int64, v6 bool) {
}
}
// use WATCH to avoid race condition
// https://redis.io/topics/transactions
err = asNil(ps.con.Watch(ps.ctx, func(tx *redis.Tx) (err error) {
var infoHashCount int64
infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Result()
var infoHashCount uint64
infoHashCount, err = ps.con.HLen(ps.ctx, infoHashKey).Uint64()
err = asNil(err)
if err == nil && infoHashCount == 0 {
// Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field.
// _, err := ps.con.Del(ps.ctx, infoHashKey)
var deletedCount int64
deletedCount, err = ps.con.SRem(ps.ctx, ihSummaryKey, infoHashKey).Result()
err = asNil(err)
if err == nil && seeder && deletedCount > 0 {
err = ps.con.Decr(ps.ctx, cntInfoHashKey).Err()
}
err = asNil(ps.con.SRem(ps.ctx, ihSummaryKey, infoHashKey).Err())
}
return err
}, infoHashKey))

View File

@@ -3,6 +3,7 @@ package storage
import (
"errors"
"sync"
"time"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/log"
@@ -128,6 +129,9 @@ type Storage interface {
// Delete used to delete arbitrary data in specified context by its keys
Delete(ctx string, keys ...string) error
// GC used to delete stale data, such as timed out seeders/leechers
GC(cutoff time.Time)
// Stopper is an interface that expects a Stop method to stop the Storage.
// For more details see the documentation in the stop package.
stop.Stopper

View File

@@ -2,50 +2,46 @@ package test
import (
"math/rand"
"net"
"net/netip"
"runtime"
"sync/atomic"
"testing"
"github.com/sot-tech/mochi/bittorrent"
// used for seeding global math.Rand
_ "github.com/sot-tech/mochi/pkg/randseed"
"github.com/sot-tech/mochi/storage"
)
type benchData struct {
infohashes [1000]bittorrent.InfoHash
peers [1000]bittorrent.Peer
infoHashes [1000]bittorrent.InfoHash
peers [10000]bittorrent.Peer
}
func generateInfoHashes() (a [1000]bittorrent.InfoHash) {
for i := range a {
b := make([]byte, bittorrent.InfoHashV1Len)
rand.Read(b)
a[i], _ = bittorrent.NewInfoHash(b)
a[i] = randIH(rand.Int63()%2 == 0)
}
return
}
func generatePeers() (a [1000]bittorrent.Peer) {
func generatePeers() (a [10000]bittorrent.Peer) {
for i := range a {
ip := make([]byte, 4)
n, err := rand.Read(ip)
if err != nil || n != 4 {
panic("unable to create random bytes")
}
id := [bittorrent.PeerIDLen]byte{}
n, err = rand.Read(id[:])
if err != nil || n != bittorrent.InfoHashV1Len {
panic("unable to create random bytes")
var ip []byte
if rand.Int63()%2 == 0 {
ip = make([]byte, net.IPv4len)
} else {
ip = make([]byte, net.IPv6len)
}
rand.Read(ip)
addr, ok := netip.AddrFromSlice(ip)
if !ok {
panic("unable to create ip from random bytes")
}
port := uint16(rand.Uint32())
a[i] = bittorrent.Peer{
ID: id,
ID: randPeerID(),
AddrPort: netip.AddrPortFrom(addr, port),
}
}
@@ -122,7 +118,7 @@ func (bh *benchHolder) Nop(b *testing.B) {
// Put can run in parallel.
func (bh *benchHolder) Put(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[0], bd.peers[0])
return ps.PutSeeder(bd.infoHashes[0], bd.peers[0])
})
}
@@ -132,27 +128,27 @@ func (bh *benchHolder) Put(b *testing.B) {
// Put1k can run in parallel.
func (bh *benchHolder) Put1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000])
return ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
})
}
// Put1kInfoHash benchmarks the PutSeeder method of a storage.Storage by cycling
// through 1000 infohashes and putting the same peer into their swarms.
// through 1000 infoHashes and putting the same peer into their swarms.
//
// Put1kInfoHash can run in parallel.
func (bh *benchHolder) Put1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
return ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0])
return ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
})
}
// Put1kInfoHash1k benchmarks the PutSeeder method of a storage.Storage by cycling
// through 1000 infohashes and 1000 Peers and calling Put with them.
// through 1000 infoHashes and 1000 Peers and calling Put with them.
//
// Put1kInfoHash1k can run in parallel.
func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return err
})
}
@@ -163,11 +159,11 @@ func (bh *benchHolder) Put1kInfoHash1k(b *testing.B) {
// PutDelete can not run in parallel.
func (bh *benchHolder) PutDelete(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[0], bd.peers[0])
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[0], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0])
})
}
@@ -177,39 +173,39 @@ func (bh *benchHolder) PutDelete(b *testing.B) {
// PutDelete1k can not run in parallel.
func (bh *benchHolder) PutDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[0], bd.peers[i%1000])
err := ps.PutSeeder(bd.infoHashes[0], bd.peers[i%1000])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
})
}
// PutDelete1kInfoHash behaves like PutDelete1k with 1000 infohashes instead of
// PutDelete1kInfoHash behaves like PutDelete1k with 1000 infoHashes instead of
// 1000 Peers.
//
// PutDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[0])
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
})
}
// PutDelete1kInfoHash1k behaves like PutDelete1k with 1000 infohashes in
// PutDelete1kInfoHash1k behaves like PutDelete1k with 1000 infoHashes in
// addition to 1000 Peers.
//
// PutDelete1kInfoHash1k can not run in parallel.
func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
if err != nil {
return err
}
err = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return err
})
}
@@ -220,7 +216,7 @@ func (bh *benchHolder) PutDelete1kInfoHash1k(b *testing.B) {
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infohashes[0], bd.peers[0])
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0])
return nil
})
}
@@ -231,18 +227,18 @@ func (bh *benchHolder) DeleteNonexist(b *testing.B) {
// DeleteNonexist can run in parallel.
func (bh *benchHolder) DeleteNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000])
_ = ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
return nil
})
}
// DeleteNonexist1kInfoHash benchmarks the DeleteSeeder method of a storage.Storage by
// attempting to delete one Peer from one of 1000 infohashes.
// attempting to delete one Peer from one of 1000 infoHashes.
//
// DeleteNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0])
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
}
@@ -253,7 +249,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash(b *testing.B) {
// DeleteNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
_ = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return nil
})
}
@@ -264,7 +260,7 @@ func (bh *benchHolder) DeleteNonexist1kInfoHash1k(b *testing.B) {
// GradNonexist can run in parallel.
func (bh *benchHolder) GradNonexist(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infohashes[0], bd.peers[0])
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0])
return nil
})
}
@@ -275,7 +271,7 @@ func (bh *benchHolder) GradNonexist(b *testing.B) {
// GradNonexist1k can run in parallel.
func (bh *benchHolder) GradNonexist1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000])
_ = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000])
return nil
})
}
@@ -286,19 +282,19 @@ func (bh *benchHolder) GradNonexist1k(b *testing.B) {
// GradNonexist1kInfoHash can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0])
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
}
// GradNonexist1kInfoHash1k benchmarks the GraduateLeecher method of a storage.Storage
// by attempting to graduate one of 1000 nonexistent Peers for one of 1000
// infohashes.
// infoHashes.
//
// GradNonexist1kInfoHash1k can run in parallel.
func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, true, nil, func(i int, ps storage.Storage, bd *benchData) error {
_ = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
_ = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return nil
})
}
@@ -310,15 +306,15 @@ func (bh *benchHolder) GradNonexist1kInfoHash1k(b *testing.B) {
// PutGradDelete can not run in parallel.
func (bh *benchHolder) PutGradDelete(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[0], bd.peers[0])
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[0])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infohashes[0], bd.peers[0])
err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[0], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[0])
})
}
@@ -327,51 +323,51 @@ func (bh *benchHolder) PutGradDelete(b *testing.B) {
// PutGradDelete1k can not run in parallel.
func (bh *benchHolder) PutGradDelete1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[0], bd.peers[i%1000])
err := ps.PutLeecher(bd.infoHashes[0], bd.peers[i%1000])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infohashes[0], bd.peers[i%1000])
err = ps.GraduateLeecher(bd.infoHashes[0], bd.peers[i%1000])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[0], bd.peers[i%1000])
return ps.DeleteSeeder(bd.infoHashes[0], bd.peers[i%1000])
})
}
// PutGradDelete1kInfoHash behaves like PutGradDelete with one of 1000
// infohashes.
// infoHashes.
//
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[0])
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[0])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[0])
err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[0])
if err != nil {
return err
}
return ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[0])
return ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[0])
})
}
// PutGradDelete1kInfoHash1k behaves like PutGradDelete with one of 1000 Peers
// and one of 1000 infohashes.
// and one of 1000 infoHashes.
//
// PutGradDelete1kInfoHash can not run in parallel.
func (bh *benchHolder) PutGradDelete1kInfoHash1k(b *testing.B) {
bh.runBenchmark(b, false, nil, func(i int, ps storage.Storage, bd *benchData) error {
err := ps.PutLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err := ps.PutLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
if err != nil {
return err
}
err = ps.GraduateLeecher(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err = ps.GraduateLeecher(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
if err != nil {
return err
}
err = ps.DeleteSeeder(bd.infohashes[i%1000], bd.peers[(i*3)%1000])
err = ps.DeleteSeeder(bd.infoHashes[i%1000], bd.peers[(i*3)%1000])
return err
})
}
@@ -381,9 +377,9 @@ func putPeers(ps storage.Storage, bd *benchData) error {
for j := 0; j < 1000; j++ {
var err error
if j < 1000/2 {
err = ps.PutLeecher(bd.infohashes[i], bd.peers[j])
err = ps.PutLeecher(bd.infoHashes[i], bd.peers[j])
} else {
err = ps.PutSeeder(bd.infohashes[i], bd.peers[j])
err = ps.PutSeeder(bd.infoHashes[i], bd.peers[j])
}
if err != nil {
return err
@@ -400,18 +396,18 @@ func putPeers(ps storage.Storage, bd *benchData) error {
// AnnounceLeecher can run in parallel.
func (bh *benchHolder) AnnounceLeecher(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[0], false, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[0], false, 50, bd.peers[0])
return err
})
}
// AnnounceLeecher1kInfoHash behaves like AnnounceLeecher with one of 1000
// infohashes.
// infoHashes.
//
// AnnounceLeecher1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[i%1000], false, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], false, 50, bd.peers[0])
return err
})
}
@@ -422,18 +418,18 @@ func (bh *benchHolder) AnnounceLeecher1kInfoHash(b *testing.B) {
// AnnounceSeeder can run in parallel.
func (bh *benchHolder) AnnounceSeeder(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[0], true, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[0], true, 50, bd.peers[0])
return err
})
}
// AnnounceSeeder1kInfoHash behaves like AnnounceSeeder with one of 1000
// infohashes.
// infoHashes.
//
// AnnounceSeeder1kInfoHash can run in parallel.
func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
_, err := ps.AnnouncePeers(bd.infohashes[i%1000], true, 50, bd.peers[0])
_, err := ps.AnnouncePeers(bd.infoHashes[i%1000], true, 50, bd.peers[0])
return err
})
}
@@ -444,17 +440,17 @@ func (bh *benchHolder) AnnounceSeeder1kInfoHash(b *testing.B) {
// ScrapeSwarm can run in parallel.
func (bh *benchHolder) ScrapeSwarm(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
ps.ScrapeSwarm(bd.infohashes[0], bd.peers[0])
ps.ScrapeSwarm(bd.infoHashes[0], bd.peers[0])
return nil
})
}
// ScrapeSwarm1kInfoHash behaves like ScrapeSwarm with one of 1000 infohashes.
// ScrapeSwarm1kInfoHash behaves like ScrapeSwarm with one of 1000 infoHashes.
//
// ScrapeSwarm1kInfoHash can run in parallel.
func (bh *benchHolder) ScrapeSwarm1kInfoHash(b *testing.B) {
bh.runBenchmark(b, true, putPeers, func(i int, ps storage.Storage, bd *benchData) error {
ps.ScrapeSwarm(bd.infohashes[i%1000], bd.peers[0])
ps.ScrapeSwarm(bd.infoHashes[i%1000], bd.peers[0])
return nil
})
}

View File

@@ -2,6 +2,7 @@ package test
import (
"testing"
"time"
"github.com/stretchr/testify/require"
@@ -244,6 +245,19 @@ func (th *testHolder) CustomBulkPutContainsLoadDelete(t *testing.T) {
}
}
// GC verifies that the storage garbage collector removes stale peers:
// it seeds every test swarm, runs GC with a cutoff one hour in the
// future (so every peer announced "before" it is considered stale),
// then asserts the swarms no longer exist.
func (th *testHolder) GC(t *testing.T) {
	for _, c := range testData {
		// Populate each info hash with the fixture peer plus one IPv4
		// and one IPv6 peer so both address families are collected.
		require.NoError(t, th.st.PutSeeder(c.ih, c.peer))
		require.NoError(t, th.st.PutSeeder(c.ih, v4Peer))
		require.NoError(t, th.st.PutSeeder(c.ih, v6Peer))
	}
	// Cutoff in the future: everything inserted above is "older" and
	// must be swept away.
	th.st.GC(time.Now().Add(time.Hour))
	for _, c := range testData {
		_, err := th.st.AnnouncePeers(c.ih, false, 100, v4Peer)
		// ErrorIs (rather than Equal) stays correct even if the
		// implementation wraps the sentinel error.
		require.ErrorIs(t, err, storage.ErrResourceDoesNotExist)
	}
}
// RunTests tests a Storage implementation against the interface.
func RunTests(t *testing.T, p storage.Storage) {
th := testHolder{st: p}
@@ -275,6 +289,8 @@ func RunTests(t *testing.T, p storage.Storage) {
t.Run("CustomPutContainsLoadDelete", th.CustomPutContainsLoadDelete)
t.Run("CustomBulkPutContainsLoadDelete", th.CustomBulkPutContainsLoadDelete)
t.Run("GC", th.GC)
e := th.st.Stop()
require.Nil(t, <-e)
}

View File

@@ -1,9 +1,12 @@
package test
import (
"math/rand"
"net/netip"
"github.com/sot-tech/mochi/bittorrent"
// used for seeding global math.Rand
_ "github.com/sot-tech/mochi/pkg/randseed"
)
var (
@@ -13,13 +16,32 @@ var (
v4Peer, v6Peer bittorrent.Peer
)
// randIH returns a random info hash: a v2-length (32-byte) hash when
// v2 is true, otherwise a v1-length (20-byte) hash. The conversion
// error is ignored because the buffer length is valid by construction.
func randIH(v2 bool) (ih bittorrent.InfoHash) {
	size := bittorrent.InfoHashV1Len
	if v2 {
		size = bittorrent.InfoHashV2Len
	}
	raw := make([]byte, size)
	rand.Read(raw)
	ih, _ = bittorrent.NewInfoHash(raw)
	return
}
// randPeerID returns a randomly generated peer ID. The conversion
// error is ignored because the buffer length is valid by construction.
func randPeerID() (id bittorrent.PeerID) {
	raw := make([]byte, bittorrent.PeerIDLen)
	rand.Read(raw)
	id, _ = bittorrent.NewPeerID(raw)
	return
}
func init() {
testIh1, _ = bittorrent.NewInfoHash("00000000000000000001")
testIh2, _ = bittorrent.NewInfoHash("00000000000000000002")
testPeerID0, _ = bittorrent.NewPeerID([]byte("00000000000000000001"))
testPeerID1, _ = bittorrent.NewPeerID([]byte("00000000000000000002"))
testPeerID2, _ = bittorrent.NewPeerID([]byte("99999999999999999994"))
testPeerID3, _ = bittorrent.NewPeerID([]byte("99999999999999999996"))
testIh1 = randIH(false)
testIh2 = randIH(true)
testPeerID0 = randPeerID()
testPeerID1 = randPeerID()
testPeerID2 = randPeerID()
testPeerID3 = randPeerID()
testData = []hashPeer{
{
testIh1,