Merge pull request #106 from sot-tech/lmdb

Implement LMDB storage
This commit is contained in:
SOT-TECH
2024-07-01 08:37:12 +00:00
committed by GitHub
26 changed files with 1337 additions and 166 deletions
+1 -1
View File
@@ -13,7 +13,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc
* Supports BittorrentV2 hashes (SHA-256 and _hybrid_
SHA-256-to-160 [BEP52](https://www.bittorrent.org/beps/bep_0052.html), tested with qBittorrent);
* Supports storage in middleware modules to persist useful data;
* Supports [KeyDB](https://keydb.dev) and [PostgreSQL](https://www.postgresql.org) storages;
* Supports [KeyDB](https://keydb.dev), [PostgreSQL](https://www.postgresql.org) and [LMDB](https://www.symas.com/lmdb) storages;
* Metrics can be turned off (not enabled till it really needed);
* Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa;
* Contains some internal improvements.
+1 -2
View File
@@ -11,6 +11,7 @@ import (
"net/netip"
"github.com/rs/zerolog"
"github.com/sot-tech/mochi/pkg/str2bytes"
)
@@ -26,8 +27,6 @@ var ErrInvalidPeerIDSize = fmt.Errorf("peer ID must be %d bytes", PeerIDLen)
var zeroPeerID PeerID
// NewPeerID creates a PeerID from a byte slice.
//
// It panics if b is not 20 bytes long.
func NewPeerID(b []byte) (PeerID, error) {
if len(b) != PeerIDLen {
return zeroPeerID, ErrInvalidPeerIDSize
+1
View File
@@ -19,6 +19,7 @@ import (
// Imports to register storage drivers.
_ "github.com/sot-tech/mochi/storage/keydb"
_ "github.com/sot-tech/mochi/storage/mdb"
sm "github.com/sot-tech/mochi/storage/memory"
_ "github.com/sot-tech/mochi/storage/pg"
_ "github.com/sot-tech/mochi/storage/redis"
+1 -1
View File
@@ -33,7 +33,7 @@ func (r *Server) Run(cfg *Config) (err error) {
log.Info().Msg("metrics disabled because of empty address")
}
r.storage, err = storage.NewStorage(cfg.Storage)
r.storage, err = storage.NewPeerStorage(cfg.Storage)
if err != nil {
return fmt.Errorf("failed to create storage: %w", err)
}
+5 -2
View File
@@ -206,8 +206,11 @@ prehooks:
# - name: torrent approval
# config:
# initial_source: list
# Save data provided by source in storage above
# preserve: false
# Save data provided by source in specific storage. If name is empty or 'internal', provided above 'storage'
# is used, but another storage may be provided (configuration is the same as for 'storage' above)
# storage:
# name: internal
# config:
# configuration:
# hash_list:
# - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5"
+93
View File
@@ -0,0 +1,93 @@
# @formatter:off
# Note: see `example_config.yaml` for `frontends` and `*hooks` config description
announce_interval: 30m
min_announce_interval: 15m
metrics_addr: ""
frontends:
- name: http
config:
addr: "0.0.0.0:6969"
tls: false
tls_cert_path: ""
tls_key_path: ""
reuse_port: true
read_timeout: 5s
write_timeout: 5s
enable_keepalive: false
idle_timeout: 30s
enable_request_timing: false
announce_routes:
- "/announce"
scrape_routes:
- "/scrape"
ping_routes:
- "/ping"
allow_ip_spoofing: false
filter_private_ips: false
real_ip_header: "x-real-ip"
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
- name: udp
config:
addr: "0.0.0.0:6969"
reuse_port: true
workers: 1
max_clock_skew: 10s
private_key: "paste a random string here that will be used to hmac connection IDs"
enable_request_timing: false
allow_ip_spoofing: false
filter_private_ips: false
max_numwant: 100
default_numwant: 50
max_scrape_infohashes: 50
# This block defines configuration used for redis storage.
storage:
name: lmdb
config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# Path to LMDB folder. Required.
Path: ""
# File mode of created database files, default is 0o640
mode: 0
# Name of database to store KV data. If not provided, root DB is used (not recommended)
data_db: ""
# Name of database to store peers data. If not provided, root DB is used (not recommended)
peers_db: ""
# Maximum size of database, default is 1GiB
max_size: 0
# Maximum number of threads/reader slots for the LMDB environment,
# default is 126.
max_readers: 0
# Set MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk.
# The installation of the flag can highly speed up writes, but there is a risk of DB damage
# or loss of last committed data if the application crashes.
async_write: true
# Set MDB_NOMETASYNC flag. Omit the metadata flush on commit.
# Can a little accelerate writes if `async_write` not set, but last committed data
# bay be lost if the application crashes.
no_sync_meta: false
posthooks: []
prehooks: []
+211
View File
@@ -0,0 +1,211 @@
# Hardware
* CPU: Intel i5-12500H
* RAM: 16GiB (2x8 Samsung M471A1K43EB1-CWE)
* Storage: NVME SSD Samsung 980PRO
* OS: Ubuntu 22.04
# Benchmarks
# Memory
```
goos: linux
goarch: amd64
pkg: github.com/sot-tech/mochi/storage/memory
cpu: 12th Gen Intel(R) Core(TM) i5-12500H
BenchmarkStorage/BenchmarkNop-16 1000000000 0.1822 ns/op 0 B/op 0 allocs/op
BenchmarkStorage/BenchmarkPut-16 7071364 173.1 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkPut1k-16 5342302 280.4 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash-16 17775769 65.81 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 15953836 68.41 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkPutDelete-16 4975660 237.0 ns/op 160 B/op 4 allocs/op
BenchmarkStorage/BenchmarkPutDelete1k-16 4842673 240.3 ns/op 160 B/op 4 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 4597555 248.2 ns/op 160 B/op 4 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 4776769 265.1 ns/op 160 B/op 4 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist-16 19164670 60.77 ns/op 96 B/op 3 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1k-16 29773471 42.81 ns/op 96 B/op 3 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 38686660 32.11 ns/op 96 B/op 3 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 36604658 32.88 ns/op 96 B/op 3 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete-16 2982174 419.8 ns/op 240 B/op 6 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1k-16 2881537 407.3 ns/op 240 B/op 6 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 2654642 443.4 ns/op 240 B/op 6 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 2630800 462.8 ns/op 240 B/op 6 allocs/op
BenchmarkStorage/BenchmarkGradNonexist-16 6837140 203.3 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1k-16 5347960 267.2 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 15773694 75.18 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 16675423 73.16 ns/op 80 B/op 2 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher-16 1368090 823.9 ns/op 4496 B/op 3 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 1401063 823.8 ns/op 4496 B/op 3 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder-16 4446224 273.7 ns/op 1424 B/op 2 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 4279449 280.8 ns/op 1424 B/op 2 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm-16 16012303 67.37 ns/op 16 B/op 1 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 82122622 14.58 ns/op 16 B/op 1 allocs/op
PASS
ok github.com/sot-tech/mochi/storage/memory 41.848s
```
# Redis
Version: 6.0.16
Configuration: OOTB
```
goos: linux
goarch: amd64
pkg: github.com/sot-tech/mochi/storage/redis
cpu: 12th Gen Intel(R) Core(TM) i5-12500H
BenchmarkStorage/BenchmarkNop-16 1000000000 0.1611 ns/op 0 B/op 0 allocs/op
BenchmarkStorage/BenchmarkPut-16 180381 6148 ns/op 1257 B/op 37 allocs/op
BenchmarkStorage/BenchmarkPut1k-16 203150 6314 ns/op 1273 B/op 37 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash-16 196033 6187 ns/op 1249 B/op 37 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 202513 6176 ns/op 1265 B/op 37 allocs/op
BenchmarkStorage/BenchmarkPutDelete-16 26925 40429 ns/op 1736 B/op 56 allocs/op
BenchmarkStorage/BenchmarkPutDelete1k-16 27751 39310 ns/op 1768 B/op 56 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 28142 41585 ns/op 1720 B/op 56 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 29500 39063 ns/op 1752 B/op 56 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist-16 281184 4451 ns/op 320 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1k-16 235394 4316 ns/op 334 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 272566 4569 ns/op 312 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 278380 4315 ns/op 326 B/op 13 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete-16 20154 64004 ns/op 3664 B/op 108 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1k-16 19230 61428 ns/op 3712 B/op 108 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 18270 62749 ns/op 3632 B/op 108 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 20653 60017 ns/op 3680 B/op 108 allocs/op
BenchmarkStorage/BenchmarkGradNonexist-16 157063 7336 ns/op 1929 B/op 52 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1k-16 161649 7598 ns/op 1945 B/op 52 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 163005 7436 ns/op 1913 B/op 52 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 151135 7524 ns/op 1929 B/op 52 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher-16 70124 16813 ns/op 15277 B/op 83 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 71144 17011 ns/op 15261 B/op 83 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder-16 149967 8055 ns/op 6814 B/op 42 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 141770 8202 ns/op 6806 B/op 42 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm-16 55156 21739 ns/op 1120 B/op 41 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 58994 21821 ns/op 1087 B/op 41 allocs/op
PASS
ok github.com/sot-tech/mochi/storage/redis 43.235s
```
## KeyDB
Version: 6.3.4
Configuration: OOTB
```
goos: linux
goarch: amd64
pkg: github.com/sot-tech/mochi/storage/keydb
cpu: 12th Gen Intel(R) Core(TM) i5-12500H
BenchmarkStorage/BenchmarkNop-16 1000000000 0.1873 ns/op 0 B/op 0 allocs/op
BenchmarkStorage/BenchmarkPut-16 141714 8824 ns/op 553 B/op 21 allocs/op
BenchmarkStorage/BenchmarkPut1k-16 141138 9215 ns/op 566 B/op 21 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash-16 113070 8939 ns/op 546 B/op 21 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 134258 8578 ns/op 558 B/op 21 allocs/op
BenchmarkStorage/BenchmarkPutDelete-16 31476 37899 ns/op 856 B/op 33 allocs/op
BenchmarkStorage/BenchmarkPutDelete1k-16 34111 35877 ns/op 880 B/op 33 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 31716 35344 ns/op 840 B/op 33 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 33234 37156 ns/op 864 B/op 33 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist-16 292579 4302 ns/op 320 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1k-16 289604 4401 ns/op 334 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 272372 4297 ns/op 312 B/op 13 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 284304 4165 ns/op 326 B/op 13 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete-16 23744 49850 ns/op 1304 B/op 48 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1k-16 23378 53921 ns/op 1344 B/op 48 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 25268 49024 ns/op 1272 B/op 48 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 21583 50219 ns/op 1312 B/op 48 allocs/op
BenchmarkStorage/BenchmarkGradNonexist-16 133311 8960 ns/op 669 B/op 24 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1k-16 134439 8884 ns/op 683 B/op 24 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 130093 9005 ns/op 653 B/op 24 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 137900 8774 ns/op 667 B/op 24 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher-16 75067 16103 ns/op 15276 B/op 83 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 72286 16315 ns/op 15261 B/op 83 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder-16 165186 7705 ns/op 6814 B/op 42 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 151609 7799 ns/op 6806 B/op 42 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm-16 54744 21244 ns/op 1120 B/op 41 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 54865 21736 ns/op 1088 B/op 41 allocs/op
PASS
ok github.com/sot-tech/mochi/storage/keydb 44.466s
```
## PostgreSQL
Version: 14.12
Configuration: OOTB
```
goos: linux
goarch: amd64
pkg: github.com/sot-tech/mochi/storage/pg
cpu: 12th Gen Intel(R) Core(TM) i5-12500H
BenchmarkStorage/BenchmarkNop-16 1000000000 0.1687 ns/op 0 B/op 0 allocs/op
BenchmarkStorage/BenchmarkPut-16 58521 19857 ns/op 2213 B/op 44 allocs/op
BenchmarkStorage/BenchmarkPut1k-16 153538 7259 ns/op 2208 B/op 44 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash-16 151515 7319 ns/op 2204 B/op 44 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 165950 7452 ns/op 2211 B/op 44 allocs/op
BenchmarkStorage/BenchmarkPutDelete-16 17130 61564 ns/op 4274 B/op 81 allocs/op
BenchmarkStorage/BenchmarkPutDelete1k-16 19200 60632 ns/op 4285 B/op 81 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 19923 59570 ns/op 4273 B/op 81 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 20510 61910 ns/op 4285 B/op 80 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist-16 184810 5485 ns/op 2098 B/op 37 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1k-16 187735 5514 ns/op 2108 B/op 37 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 217992 5621 ns/op 2099 B/op 37 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 215710 5569 ns/op 2109 B/op 37 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete-16 8280 139091 ns/op 7306 B/op 143 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1k-16 9010 133127 ns/op 7320 B/op 143 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 8030 133810 ns/op 7305 B/op 143 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 8217 136543 ns/op 7321 B/op 143 allocs/op
BenchmarkStorage/BenchmarkGradNonexist-16 20883 56024 ns/op 3101 B/op 62 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1k-16 20932 55863 ns/op 3106 B/op 62 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 77368 15365 ns/op 3067 B/op 62 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 65448 15568 ns/op 3081 B/op 62 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher-16 55521 21614 ns/op 22565 B/op 380 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 54933 21813 ns/op 22571 B/op 380 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder-16 117950 10443 ns/op 9239 B/op 190 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 113031 10490 ns/op 9242 B/op 190 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm-16 103208 11945 ns/op 2951 B/op 46 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 101088 12110 ns/op 2952 B/op 46 allocs/op
PASS
ok github.com/sot-tech/mochi/storage/pg 64.953s
```
## LMDB
Version: 0.9.31
```
goos: linux
goarch: amd64
pkg: github.com/sot-tech/mochi/storage/mdb
cpu: 12th Gen Intel(R) Core(TM) i5-12500H
BenchmarkStorage/BenchmarkNop-16 1000000000 0.1618 ns/op 0 B/op 0 allocs/op
BenchmarkStorage/BenchmarkPut-16 356353 3095 ns/op 264 B/op 5 allocs/op
BenchmarkStorage/BenchmarkPut1k-16 402933 3055 ns/op 264 B/op 5 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash-16 381813 2966 ns/op 256 B/op 5 allocs/op
BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 376526 3083 ns/op 256 B/op 5 allocs/op
BenchmarkStorage/BenchmarkPutDelete-16 423085 2595 ns/op 528 B/op 10 allocs/op
BenchmarkStorage/BenchmarkPutDelete1k-16 443782 2656 ns/op 528 B/op 10 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 434588 2738 ns/op 512 B/op 10 allocs/op
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 393522 2770 ns/op 512 B/op 10 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist-16 887683 1353 ns/op 300 B/op 7 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1k-16 914343 1323 ns/op 300 B/op 7 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 921487 1328 ns/op 292 B/op 7 allocs/op
BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 911662 1322 ns/op 292 B/op 7 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete-16 210553 5038 ns/op 796 B/op 16 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1k-16 205284 5106 ns/op 796 B/op 16 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 193164 5751 ns/op 772 B/op 16 allocs/op
BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 199144 5867 ns/op 772 B/op 16 allocs/op
BenchmarkStorage/BenchmarkGradNonexist-16 264282 4037 ns/op 304 B/op 8 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1k-16 274296 3930 ns/op 304 B/op 8 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 268438 4886 ns/op 296 B/op 8 allocs/op
BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 191442 5425 ns/op 296 B/op 8 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher-16 647808 1658 ns/op 3648 B/op 15 allocs/op
BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 684120 1813 ns/op 3636 B/op 15 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder-16 877350 1282 ns/op 3416 B/op 10 allocs/op
BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 847063 1307 ns/op 3404 B/op 10 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm-16 614961 1767 ns/op 712 B/op 20 allocs/op
BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 659012 1870 ns/op 700 B/op 20 allocs/op
PASS
ok github.com/sot-tech/mochi/storage/mdb 42.032s
```
+114
View File
@@ -0,0 +1,114 @@
# LMDB Storage
This storage uses LMDB persistent key-value database to store peer and/or arbitrary key-value data.
_Note: **requires** CGO-enabled build (`CGO_ENABLED=1 go install...`),
because of native bindings to C LMDB code._
# Use case
LMDB may be used in local installation to read/write data from/to (local or network mounted)
filesystem directly, without network stack.
It supports multi-thread/process (multi-reader, single-writer) access, and DB size larger than RAM,
so *may* be used within several applications as shared store, e.g. external app can modify allowed/denied
info hashes for `torrentapproval` middleware or mochi cluster with shared DB over NFS.
As it filesystem database, performance is also depends on filesystem/disk performance, so it *may*
be slower than tuned PostgreSQL or Redis.
## Configuration and implementation notes
Basically LDMB environment is a directory with 2 files: data.mdb and lock.mdb, which will be created on start.
Each *environment* may contain several named databases. If name for specific database (for peers and arbitrary data)
is not provided, default (0-th) database is used, which is *NOT* recommended, because is contains internal information.
Both stored key and value are byte arrays.
Arbitrary DB key format is `<PREFIX>_<KEY>`, value is byte array converted string.
Peers DB format is:
1. Key `<PREFIX>_<INFOHASH>_<PEERID><IPADDRESS><PORT>`, value - BE-encoded unix timestamp.
Fields:
* `<PREFIX>` - `L4`, `L6`, `S4`, `S6` string for leecher with IPv4 or IPv6 address, or seeder with IPv4 or IPv6 address (accordingly)
* `<INFOHASH>` - 20 or 32 bytes of info hash (V1 or V2 accordingly)
* `<PEERID>` - 20 bytes of peer ID
* `<IPADDRESS>` - 16 bytes of BE-encoded IP address (real IPv6 or IPv4-mapped IPv6 address)
* `<PORT>` - 2 bytes of BE-encoded port
2. Key `DC_<INFOHASH>_` - downloaded count of specified `<INFOHASH>` (20 or 32 bytes), value - BE-encoded unsigned 32-bit integer.
Write speed may be increased with `no_sync_meta` and `async_write` configuration options,
but the risk of DB corruption is also increase.
With `async_write` option, write speed will be a little faster than locally installed OOTB Redis,
but application will use more RAM (dirty pages).
Without it, you will get dramatically slow write speed: every write transaction will be flushed to disk.
Option `no_sync_meta` **without** `async_write` will increase write speed up to 3 times
(with potential loss of last transaction if application crashed/killed).
Both enabled options don't make much sense and also don't affect read performance.
Benchmarks with combinations of options above:
```
async_write=false, no_sync_meta=false
BenchmarkStorage/BenchmarkPutGradDelete-16 123 9000653 ns/op
async_write=false, no_sync_meta=true
BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 306 4001353 ns/op
async_write=true, no_sync_meta=false
BenchmarkStorage/BenchmarkPutGradDelete-16 152656 7754 ns/op
async_write=true, no_sync_meta=true
BenchmarkStorage/BenchmarkPutGradDelete-16 150116 7735 ns/op
```
**Sample configuration:**
```yaml
storage:
name: lmdb
config:
# The frequency which stale peers are removed.
# This balances between
# - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value)
# - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value).
gc_interval: 3m
# The amount of time until a peer is considered stale.
# To avoid churn, keep this slightly larger than `announce_interval`
peer_lifetime: 31m
# Path to LMDB folder. Required.
Path: "/some/path/to/lmdb/directory"
# File mode of created database files, default is 0o640
mode: 0640
# Name of database to store KV data. If not provided, root DB is used (not recommended)
data_db: "DATA"
# Name of database to store peers data. If not provided, root DB is used (not recommended)
peers_db: "PEERS"
# Maximum size of database, default is 1GiB.
# It's better specify enough space, because if environment is full,
# storage will fail to add new records and restart and specifying larger size
# (or online resizing with external tool) will be required.
# See: http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5
max_size: 1073741824
# Maximum number of threads/reader slots for the LMDB environment, default is 126.
# See: http://www.lmdb.tech/doc/group__mdb.html#gae687966c24b790630be2a41573fe40e2
max_readers: 126
# Set flags to use asynchronous flushes to disk.
# See: MDB_WRITEMAP and MDB_MAPASYNC description in http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340
async_write: true
# Set MDB_NOMETASYNC flag. Omit the metadata flush on commit.
# See: MDB_NOMETASYNC description in http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340
no_sync_meta: false
```
+1 -1
View File
@@ -3,7 +3,7 @@
This storage uses PostgreSQL-like database to store peer and arbitrary key-value data.
'PostgreSQL-like' means, that you can use any database which _understand_ PostgreSQL protocol
i.e. _real_ [PostgreSQL](https://www.postgresql.org) or [CockroachDB](https://www.cockroachlabs.com).
e.g. _real_ [PostgreSQL](https://www.postgresql.org) or [CockroachDB](https://www.cockroachlabs.com).
_(YugabyteDB is not recommended (at the moment), because of some problems with
concurrent inserts while benchmarks.)_
+1 -1
View File
@@ -16,7 +16,7 @@ func init() {
}
func TestStartStopRaceIssue437(t *testing.T) {
ps, err := storage.NewStorage(conf.NamedMapConfig{
ps, err := storage.NewPeerStorage(conf.NamedMapConfig{
Name: "memory",
Config: conf.MapConfig{},
})
+16 -14
View File
@@ -3,10 +3,11 @@ module github.com/sot-tech/mochi
go 1.22
require (
code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718
code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3
github.com/MicahParks/jwkset v0.5.18
github.com/MicahParks/keyfunc/v3 v3.3.3
github.com/anacrolix/torrent v1.56.0
github.com/PowerDNS/lmdb-go v1.9.2
github.com/anacrolix/torrent v1.56.1
github.com/cespare/xxhash/v2 v2.3.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/jackc/pgx/v5 v5.6.0
@@ -14,10 +15,10 @@ require (
github.com/minio/sha256-simd v1.0.1
github.com/mitchellh/mapstructure v1.5.0
github.com/prometheus/client_golang v1.19.1
github.com/redis/go-redis/v9 v9.5.2
github.com/redis/go-redis/v9 v9.5.3
github.com/rs/zerolog v1.33.0
github.com/stretchr/testify v1.9.0
github.com/valyala/fasthttp v1.54.0
github.com/valyala/fasthttp v1.55.0
gopkg.in/yaml.v3 v3.0.1
)
@@ -34,12 +35,12 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
@@ -47,16 +48,17 @@ require (
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/common v0.54.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
google.golang.org/protobuf v1.34.2 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
+36 -34
View File
@@ -1,7 +1,7 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718 h1:6wazSuHaJjatGy8pvchSy6L+4M67WPfjgK9yh7cJLMs=
code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718/go.mod h1:eVHabU/rcpC5ocoIAXvnaySkdH6+PgORRVlY5l3SDys=
code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3 h1:4WCYwJmqSfV7ChDohsJB8Z0aDVklIE+n8OTBJxpif0c=
code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3/go.mod h1:8O5g1DEzJU9ktEmykKPhY4mZOM/dBENWVHKVInuuch8=
crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk=
crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -9,6 +9,8 @@ github.com/MicahParks/jwkset v0.5.18 h1:WLdyMngF7rCrnstQxA7mpRoxeaWqGzPM/0z40PJU
github.com/MicahParks/jwkset v0.5.18/go.mod h1:q8ptTGn/Z9c4MwbcfeCDssADeVQb3Pk7PnVxrvi+2QY=
github.com/MicahParks/keyfunc/v3 v3.3.3 h1:c6j9oSu1YUo0k//KwF1miIQlEMtqNlj7XBFLB8jtEmY=
github.com/MicahParks/keyfunc/v3 v3.3.3/go.mod h1:f/UMyXdKfkZzmBeBFUeYk+zu066J1Fcl48f7Wnl5Z48=
github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU=
github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU=
github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w=
github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI=
github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo=
@@ -47,8 +49,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ
github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw=
github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8=
github.com/anacrolix/torrent v1.56.0 h1:g/sM0K/BaWUv4Htu2bblLBhIxGdFZ1MUCoD7lcvemlo=
github.com/anacrolix/torrent v1.56.0/go.mod h1:5DMHbeIM1TuC5wTQ99XieKKLiYZYz6iB2lyZpKZEr6w=
github.com/anacrolix/torrent v1.56.1 h1:QeJMOP0NuhpQ5dATsOqEL0vUO85aPMNMGP2FACNt0Eg=
github.com/anacrolix/torrent v1.56.1/go.mod h1:5DMHbeIM1TuC5wTQ99XieKKLiYZYz6iB2lyZpKZEr6w=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
@@ -142,12 +144,12 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo=
github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4=
github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU=
github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI=
github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA=
github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
@@ -158,10 +160,10 @@ github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVY
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -200,8 +202,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo/v2 v2.17.3 h1:oJcvKpIb7/8uLpDDtnQuf18xVnwKp8DTD7DQ6gTd/MU=
github.com/onsi/ginkgo/v2 v2.17.3/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc=
github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA=
github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk=
github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0=
@@ -228,8 +230,8 @@ github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQy
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE=
github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8=
github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
@@ -238,8 +240,8 @@ github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E=
github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU=
github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
@@ -268,8 +270,8 @@ github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW
github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0=
github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM=
github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8=
github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM=
github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
@@ -277,11 +279,11 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc h1:O9NuF4s+E/PvMIy+9IUZB9znFwUIXEWSstNjek6VpVg=
golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY=
golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@@ -296,8 +298,8 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -322,12 +324,12 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -336,8 +338,8 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw=
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA=
golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
@@ -353,8 +355,8 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg=
google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+1 -1
View File
@@ -22,7 +22,7 @@ type Hook interface {
// to check if it is operational. Used in frontend.Logic.
//
// It may be useful in cases when Hook performs foreign requests to
// some external resources (i.e. storage) and `ping` request should
// some external resources (e.g. storage) and `ping` request should
// also check resource availability.
type Pinger interface {
Ping(ctx context.Context) error
+26 -9
View File
@@ -4,6 +4,7 @@ package torrentapproval
import (
"context"
"errors"
"fmt"
"io"
@@ -11,19 +12,20 @@ import (
"github.com/sot-tech/mochi/middleware"
"github.com/sot-tech/mochi/middleware/torrentapproval/container"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/storage/memory"
"github.com/sot-tech/mochi/storage"
// import directory watcher to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory"
// import static list to enable appropriate support
_ "github.com/sot-tech/mochi/middleware/torrentapproval/container/list"
"github.com/sot-tech/mochi/storage"
)
// Name is the name by which this middleware is registered with Conf.
const Name = "torrent approval"
const internalStore = "internal"
func init() {
middleware.RegisterBuilder(Name, build)
}
@@ -31,9 +33,10 @@ func init() {
type baseConfig struct {
// Source - name of container for initial values
Source string `cfg:"initial_source"`
// Preserve - if true, container will receive real registered storage if it is NOT `memory`
// if false - temporary in-memory storage will be used or created
// Deprecated: use Storage parameter
Preserve bool
// Storage where to hold provided data by Source
Storage conf.NamedMapConfig
// Configuration depends on used container
Configuration conf.MapConfig
}
@@ -52,14 +55,22 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er
return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name)
}
var ds storage.DataStorage = st
if !cfg.Preserve && ds.Preservable() {
ds = memory.NewDataStorage()
if cfg.Preserve {
return nil, errors.New("preserve option is deprecated, use store parameter")
}
var ds, dsc storage.DataStorage
if len(cfg.Storage.Name) == 0 || cfg.Storage.Name == internalStore {
ds = st
} else if ds, err = storage.NewDataStorage(cfg.Storage); err == nil {
dsc = ds
} else {
return
}
var c container.Container
if c, err = container.GetContainer(cfg.Source, cfg.Configuration, ds); err == nil {
h = &hook{c}
h = &hook{c, dsc}
}
return h, err
}
@@ -68,7 +79,8 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er
var ErrTorrentUnapproved = bittorrent.ClientError("torrent not allowed by mochi")
type hook struct {
hashContainer container.Container
hashContainer container.Container
providedStorage storage.DataStorage
}
func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) {
@@ -90,5 +102,10 @@ func (h *hook) Close() (err error) {
if cl, isOk := h.hashContainer.(io.Closer); isOk {
err = cl.Close()
}
if h.providedStorage != nil {
if stErr := h.providedStorage.Close(); stErr != nil {
err = errors.Join(err, stErr)
}
}
return err
}
@@ -71,8 +71,7 @@ var cases = []struct {
}
func TestHandleAnnounce(t *testing.T) {
config := memory.Config{}.Validate()
storage, err := memory.NewPeerStorage(config)
storage, err := memory.Builder{}.NewPeerStorage(make(conf.MapConfig))
require.Nil(t, err)
for _, tt := range cases {
t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) {
@@ -84,10 +83,10 @@ func TestHandleAnnounce(t *testing.T) {
req := &bittorrent.AnnounceRequest{}
resp := &bittorrent.AnnounceResponse{}
hashinfo, err := bittorrent.NewInfoHashString(tt.ih)
ih, err := bittorrent.NewInfoHashString(tt.ih)
require.Nil(t, err)
req.InfoHash = hashinfo
req.InfoHash = ih
nctx, err := h.HandleAnnounce(ctx, req, resp)
require.Equal(t, ctx, nctx)
+14 -2
View File
@@ -33,10 +33,12 @@ var (
func init() {
// Register the storage driver.
storage.RegisterDriver("keydb", builder)
storage.RegisterDriver("keydb", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
type builder struct{}
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg r.Config
var err error
@@ -47,6 +49,16 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
return newStore(cfg)
}
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
var cfg r.Config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return r.NewStore(cfg)
}
func newStore(cfg r.Config) (*store, error) {
var err error
if cfg, err = cfg.Validate(); err != nil {
+598
View File
@@ -0,0 +1,598 @@
//go:build cgo
// Package mdb implements LMDB data and peer storage
package mdb
import (
"bytes"
"context"
"encoding/binary"
"errors"
"net/netip"
"os"
"sync"
"time"
"github.com/PowerDNS/lmdb-go/exp/lmdbsync"
"github.com/PowerDNS/lmdb-go/lmdb"
"github.com/PowerDNS/lmdb-go/lmdbscan"
"github.com/sot-tech/mochi/bittorrent"
"github.com/sot-tech/mochi/pkg/conf"
"github.com/sot-tech/mochi/pkg/log"
"github.com/sot-tech/mochi/pkg/timecache"
"github.com/sot-tech/mochi/storage"
)
const (
// Name - registered name of the storage
Name = "lmdb"
defaultMode = 0o640
defaultMapSize = 1 << 30
defaultMaxReaders = 126
)
var logger = log.NewLogger("storage/lmdb")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder{})
}
type builder struct{}
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
return b.NewPeerStorage(icfg)
}
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return newStorage(cfg)
}
type config struct {
Path string
Mode uint32
DataDBName string `cfg:"data_db"`
PeersDBName string `cfg:"peers_db"`
// MaxSize - size of the memory map to use for lmdb environment.
// The size should be a multiple of the OS page size.
// Mochi's default is 1GiB.
MaxSize int64 `cfg:"max_size"`
// MaxReaders - maximum number of threads/reader slots for the LMDB environment.
// LMDB library's default is 126.
MaxReaders int `cfg:"max_readers"`
// AsyncWrite sets MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk.
AsyncWrite bool `cfg:"async_write"`
// NoMetaSync sets MDB_NOMETASYNC flag, omit the metadata flush.
NoMetaSync bool `cfg:"no_sync_meta"`
}
var (
errPathNotProvided = errors.New("lmdb path not provided")
errPathNotDirectory = errors.New("lmdb path is not directory")
)
func (cfg config) validate() (config, error) {
validCfg := cfg
if len(cfg.Path) == 0 {
return cfg, errPathNotProvided
} else if stat, err := os.Stat(cfg.Path); err != nil {
return cfg, err
} else if !stat.IsDir() {
return cfg, errPathNotDirectory
}
if cfg.Mode == 0 {
validCfg.Mode = defaultMode
logger.Warn().
Str("name", "mode").
Stringer("provided", os.FileMode(cfg.Mode)).
Stringer("default", os.FileMode(validCfg.Mode)).
Msg("falling back to default configuration")
}
if cfg.MaxSize <= 0 {
validCfg.MaxSize = defaultMapSize
logger.Warn().
Str("name", "max_size").
Int64("provided", cfg.MaxSize).
Int64("default", validCfg.MaxSize).
Msg("falling back to default configuration")
}
if cfg.MaxReaders <= 0 {
validCfg.MaxReaders = defaultMaxReaders
logger.Warn().
Str("name", "max_readers").
Int("provided", cfg.MaxReaders).
Int("default", 126).
Msg("falling back to default configuration")
}
return validCfg, nil
}
type lmdbEnv interface {
SetMaxDBs(int) error
SetMapSize(int64) error
SetMaxReaders(int) error
Open(string, uint, os.FileMode) error
View(lmdb.TxnOp) error
Update(lmdb.TxnOp) error
Close() error
Sync(bool) error
Info() (*lmdb.EnvInfo, error)
}
type mdb struct {
lmdbEnv
dataDB, peersDB lmdb.DBI
onceCloser sync.Once
closed chan any
wg sync.WaitGroup
}
func newStorage(cfg config) (*mdb, error) {
var err error
if cfg, err = cfg.validate(); err != nil {
return nil, err
}
lmEnv, err := lmdb.NewEnv()
if err != nil {
return nil, err
}
var env lmdbEnv
if env, err = lmdbsync.NewEnv(lmEnv,
lmdbsync.MapResizedHandler(lmdbsync.MapResizedDefaultRetry, lmdbsync.MapResizedDefaultDelay),
); err != nil {
return nil, err
}
if err = env.SetMaxDBs(2); err != nil {
return nil, err
}
if err = env.SetMapSize(cfg.MaxSize); err != nil {
return nil, err
}
if cfg.MaxReaders > 0 {
if err = env.SetMaxReaders(cfg.MaxReaders); err != nil {
return nil, err
}
}
var flags uint
if cfg.AsyncWrite {
flags |= lmdb.WriteMap | lmdb.MapAsync
}
if cfg.NoMetaSync {
flags |= lmdb.NoMetaSync
}
if err = env.Open(cfg.Path, flags, os.FileMode(cfg.Mode)); err != nil {
return nil, err
}
var dataDB, peersDB lmdb.DBI
if err = env.Update(func(txn *lmdb.Txn) (err error) {
if len(cfg.DataDBName) > 0 {
dataDB, err = txn.CreateDBI(cfg.DataDBName)
} else {
dataDB, err = txn.OpenRoot(0)
}
if err != nil {
return
}
if len(cfg.PeersDBName) > 0 {
peersDB, err = txn.CreateDBI(cfg.PeersDBName)
} else {
peersDB, err = txn.OpenRoot(0)
}
return
}); err != nil {
_ = env.Close()
return nil, err
}
return &mdb{
lmdbEnv: env,
dataDB: dataDB,
peersDB: peersDB,
closed: make(chan any),
}, nil
}
func (*mdb) Preservable() bool {
return true
}
func (m *mdb) Close() (err error) {
m.onceCloser.Do(func() {
if m.lmdbEnv != nil {
close(m.closed)
m.wg.Wait()
logger.Info().Msg("LMDB exiting. Flushing databases to disk")
_ = m.Sync(true)
err = m.lmdbEnv.Close()
}
})
return
}
const keySeparator = '_'
func ignoreNotFound(err error) error {
if lmdb.IsNotFound(err) {
err = nil
}
return err
}
func ignoreNotFoundData(data []byte, err error) ([]byte, error) {
if lmdb.IsNotFound(err) {
err = nil
}
return data, err
}
func composeKey(ctx, key string) []byte {
ctxLen := len(ctx)
res := make([]byte, ctxLen+len(key)+1)
copy(res, ctx)
res[ctxLen] = keySeparator
copy(res[ctxLen+1:], key)
return res
}
func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) {
if len(values) > 0 {
err = m.Update(func(txn *lmdb.Txn) (err error) {
var data []byte
for _, kv := range values {
vl := len(kv.Value)
if data, err = txn.PutReserve(m.dataDB, composeKey(storeCtx, kv.Key), vl, 0); err == nil {
copy(data, kv.Value)
} else {
break
}
}
return
})
}
return
}
func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) {
err = m.View(func(txn *lmdb.Txn) (err error) {
_, err = txn.Get(m.dataDB, composeKey(storeCtx, key))
return
})
if err == nil {
contains = true
} else if lmdb.IsNotFound(err) {
err = nil
}
return
}
func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) {
err = m.View(func(txn *lmdb.Txn) (err error) {
v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key)))
return
})
return
}
func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) {
if len(keys) > 0 {
err = m.Update(func(txn *lmdb.Txn) (err error) {
for _, k := range keys {
if err = ignoreNotFound(txn.Del(m.dataDB, composeKey(storeCtx, k), nil)); err != nil {
break
}
}
return
})
}
return
}
const (
ipLen = 16
packedPeerLen = bittorrent.PeerIDLen + ipLen + 2 // peer_id + ipv6 + port
seederPrefix = 'S'
leecherPrefix = 'L'
ipv4Prefix = '4'
ipv6Prefix = '6'
countPrefix = 'C'
downloadedPrefix = 'D'
)
func packPeer(peer bittorrent.Peer, out []byte) {
_ = out[packedPeerLen-1]
copy(out, peer.ID.Bytes())
a := peer.Addr().As16()
copy(out[bittorrent.PeerIDLen:], a[:])
binary.BigEndian.PutUint16(out[bittorrent.PeerIDLen+ipLen:], peer.Port())
}
func unpackPeer(arr []byte) (peer bittorrent.Peer) {
_ = arr[packedPeerLen-1]
peerID, _ := bittorrent.NewPeerID(arr[:bittorrent.PeerIDLen])
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(),
binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])),
}
return
}
func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) {
ihLen := len(ih)
ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator
if seeder {
ihKey[0] = seederPrefix
} else {
ihKey[0] = leecherPrefix
}
if v6 {
ihKey[1] = ipv6Prefix
} else {
ihKey[1] = ipv4Prefix
}
ihKey[2], ihKey[ihLen+3] = keySeparator, keySeparator
copy(ihKey[3:], ih)
suffixStart = len(ihKey) - suffixLen
return
}
func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) {
ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen)
packPeer(peer, ihKey[start:])
return
}
func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
ihKey := composeIHKey(ih, peer, seeder)
return m.Update(func(txn *lmdb.Txn) (err error) {
var b []byte
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil {
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix()))
}
return
})
}
func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error {
ihKey := composeIHKey(ih, peer, seeder)
return m.Update(func(txn *lmdb.Txn) error {
return ignoreNotFound(txn.Del(m.peersDB, ihKey, nil))
})
}
func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return m.putPeer(ih, peer, true)
}
func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return m.delPeer(ih, peer, true)
}
func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return m.putPeer(ih, peer, false)
}
func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
return m.delPeer(ih, peer, false)
}
func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error {
ihKey := composeIHKey(ih, peer, false)
return m.Update(func(txn *lmdb.Txn) (err error) {
if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err != nil {
return
}
ihKey[0] = seederPrefix
var b []byte
if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err != nil {
return
}
binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano()))
ihPrefix := ihKey[:len(ihKey)-packedPeerLen]
ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix
var v int
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil {
return
}
if len(b) >= 4 {
v = int(binary.BigEndian.Uint32(b))
}
v++
if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil {
binary.BigEndian.PutUint32(b, uint32(v))
}
return
})
}
func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn func(k, v []byte) bool) (err error) {
m.wg.Add(1)
prefixLen := len(prefix)
err = m.View(func(txn *lmdb.Txn) (err error) {
txn.RawRead = readRaw
scanner := lmdbscan.New(txn, m.peersDB)
var op uint = lmdb.SetRange
if prefixLen == 0 {
op = lmdb.First
}
if scanner.SetNext(prefix, nil, op, lmdb.Next) {
loop:
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
k := scanner.Key()
if !bytes.HasPrefix(k, prefix) {
break loop
}
if prefixLen == 0 || len(k) == prefixLen+packedPeerLen {
if !fn(k, scanner.Val()) {
break loop
}
} else {
logger.Warn().Int("expected", prefixLen+packedPeerLen).Int("got", len(k)).
Msg("Invalid key length")
}
}
}
err = scanner.Err()
}
scanner.Close()
return
})
m.wg.Done()
return
}
func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) {
peers = make([]bittorrent.Peer, 0, numWant)
prefix, prefixLen := composeIHKeyPrefix(ih.Bytes(), false, v6, 0)
appendFn := func(k, _ []byte) bool {
peers = append(peers, unpackPeer(k[prefixLen:]))
numWant--
return numWant > 0
}
if forSeeder {
err = m.scanPeers(ctx, prefix, true, appendFn)
} else {
prefix[0] = seederPrefix
if err = m.scanPeers(ctx, prefix, true, appendFn); err == nil && numWant > 0 {
prefix[0] = leecherPrefix
err = m.scanPeers(ctx, prefix, true, appendFn)
}
}
return
}
func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) {
m.wg.Add(1)
err = m.View(func(txn *lmdb.Txn) (err error) {
txn.RawRead = true
scanner := lmdbscan.New(txn, m.peersDB)
if scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) {
var prevKey []byte
loop:
for scanner.Scan() {
select {
case <-ctx.Done():
return ctx.Err()
default:
k := scanner.Key()
if len(k) == len(scanPrefix)+packedPeerLen && bytes.HasPrefix(scanPrefix, k[:len(k)-packedPeerLen]) {
if !bytes.Equal(k, prevKey) {
cnt++
prevKey = k
}
} else if scanPrefix[1] == ipv4Prefix {
scanPrefix[1] = ipv6Prefix
if !scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) {
break loop
}
} else {
break loop
}
}
}
}
err = scanner.Err()
scanner.Close()
return
})
m.wg.Done()
return
}
func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) {
scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0)
if leechers, err = m.countPeers(ctx, scanPrefix); err != nil {
return
}
scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix
if seeders, err = m.countPeers(ctx, scanPrefix); err != nil {
return
}
scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix
err = m.View(func(txn *lmdb.Txn) (err error) {
var b []byte
if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil {
return
}
if len(b) >= 4 {
snatched = binary.BigEndian.Uint32(b)
}
return
})
return
}
const (
v1IHKeyLen = bittorrent.InfoHashV1Len + 4 + packedPeerLen
v2IHKeyPen = bittorrent.InfoHashV2Len + 4 + packedPeerLen
)
func (m *mdb) gc(cutoff time.Time) {
toDel := make([][]byte, 0, 50)
cutoffUnix := cutoff.Unix()
err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool {
if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) &&
(k[0] == seederPrefix || k[0] == leecherPrefix) &&
(k[1] == ipv4Prefix || k[1] == ipv6Prefix) &&
k[2] == keySeparator && len(v) >= 8 && cutoffUnix >= int64(binary.BigEndian.Uint64(v)) {
toDel = append(toDel, k)
}
return true
})
if err == nil {
err = m.Update(func(txn *lmdb.Txn) (err error) {
for _, k := range toDel {
if err = txn.Del(m.peersDB, k, nil); err != nil {
break
}
}
return
})
}
if err == nil {
_ = m.Sync(true)
} else {
logger.Err(err).Msg("Error occurred while GC")
}
}
func (m *mdb) ScheduleGC(gcInterval, peerLifeTime time.Duration) {
m.wg.Add(1)
go func() {
defer m.wg.Done()
t := time.NewTimer(gcInterval)
defer t.Stop()
for {
select {
case <-m.closed:
return
case <-t.C:
m.gc(time.Now().Add(-peerLifeTime))
t.Reset(gcInterval)
}
}
}()
}
func (m *mdb) Ping(_ context.Context) error {
_, err := m.Info()
return err
}
+3
View File
@@ -0,0 +1,3 @@
//go:build !cgo
package mdb
+59
View File
@@ -0,0 +1,59 @@
//go:build cgo
package mdb
import (
"fmt"
"os"
"testing"
s "github.com/sot-tech/mochi/storage"
"github.com/sot-tech/mochi/storage/test"
)
const tmpPath = ""
var cfg = config{
Path: "",
Mode: defaultMode,
DataDBName: "KV",
PeersDBName: "PEERS",
MaxSize: defaultMapSize,
MaxReaders: defaultMaxReaders,
AsyncWrite: true,
NoMetaSync: false,
}
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = newStorage(cfg)
if err != nil {
panic(fmt.Sprint("Unable to open/create LMDB: ", err))
}
return ps
}
func TestStorage(t *testing.T) {
tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*")
if err != nil {
t.Error(err)
}
t.Cleanup(func() {
_ = os.RemoveAll(tmpDir)
})
cfg.Path = tmpDir
test.RunTests(t, createNew())
}
func BenchmarkStorage(b *testing.B) {
tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*")
if err != nil {
b.Error(err)
}
b.Cleanup(func() {
_ = os.RemoveAll(tmpDir)
})
cfg.Path = tmpDir
test.RunBenchmarks(b, createNew)
}
+19 -17
View File
@@ -32,27 +32,31 @@ var logger = log.NewLogger("storage/memory")
func init() {
// Register the storage driver.
storage.RegisterDriver(Name, builder)
storage.RegisterDriver(Name, Builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
// Builder is structure to create new in-memory peer or data storage
type Builder struct{}
// NewDataStorage creates new in-memory KV storage. Does not need configuration
func (Builder) NewDataStorage(conf.MapConfig) (storage.DataStorage, error) {
return dataStorage(), nil
}
// NewPeerStorage creates new in-memory peer storage
func (Builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg config
if err := icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return NewPeerStorage(cfg)
return peerStorage(cfg)
}
// Config holds the configuration of a memory PeerStorage.
type Config struct {
type config struct {
ShardCount int `cfg:"shard_count"`
}
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() Config {
func (cfg config) validate() config {
validcfg := cfg
if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) {
@@ -67,12 +71,11 @@ func (cfg Config) Validate() Config {
return validcfg
}
// NewPeerStorage creates a new PeerStorage backed by memory.
func NewPeerStorage(provided Config) (storage.PeerStorage, error) {
cfg := provided.Validate()
func peerStorage(provided config) (storage.PeerStorage, error) {
cfg := provided.validate()
ps := &peerStore{
shards: make([]*peerShard, cfg.ShardCount*2),
DataStorage: NewDataStorage(),
DataStorage: dataStorage(),
closed: make(chan any),
}
@@ -453,8 +456,7 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee
return
}
// NewDataStorage creates new in-memory data store
func NewDataStorage() storage.DataStorage {
func dataStorage() storage.DataStorage {
return new(dataStore)
}
+1 -1
View File
@@ -8,7 +8,7 @@ import (
)
func createNew() storage.PeerStorage {
ps, err := NewPeerStorage(Config{ShardCount: 1024})
ps, err := peerStorage(config{ShardCount: 1024})
if err != nil {
panic(err)
}
+63 -40
View File
@@ -51,13 +51,37 @@ var (
func init() {
// Register the storage builder.
storage.RegisterDriver("pg", builder)
storage.RegisterDriver("pg", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
type builder struct{}
if err := icfg.Unmarshal(&cfg); err != nil {
func (builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
var cfg config
err := icfg.Unmarshal(&cfg)
if err != nil {
return nil, err
}
cfg, err = cfg.validateDataStore()
if err != nil {
return nil, err
}
return newStore(cfg)
}
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg config
err := icfg.Unmarshal(&cfg)
if err != nil {
return nil, err
}
cfg, err = cfg.validateFull()
if err != nil {
return nil, err
}
@@ -73,19 +97,14 @@ func noResultErr(err error) error {
return err
}
func newStore(cfg Config) (storage.PeerStorage, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
}
func newStore(cfg config) (storage.PeerStorage, error) {
con, err := pgxpool.New(context.Background(), cfg.ConnectionString)
if err != nil {
return nil, err
}
return &store{
Config: cfg,
config: cfg,
Pool: con,
wg: sync.WaitGroup{},
closed: make(chan any),
@@ -121,8 +140,14 @@ type downloadQueryConf struct {
IncrementQuery string `cfg:"inc_query"`
}
// Config holds the configuration of a redis PeerStorage.
type Config struct {
func checkParameter(p *string, name string) (err error) {
if *p = strings.TrimSpace(*p); len(*p) == 0 {
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
}
return
}
type config struct {
ConnectionString string `cfg:"connection_string"`
PingQuery string `cfg:"ping_query"`
Peer peerQueryConf
@@ -133,11 +158,7 @@ type Config struct {
InfoHashCountQuery string `cfg:"info_hash_count_query"`
}
// Validate sanity checks values set in a config and returns a new config with
// default values replacing anything that is invalid.
//
// This function warns to the logger when a value is changed.
func (cfg Config) Validate() (Config, error) {
func (cfg config) validateDataStore() (config, error) {
validCfg := cfg
validCfg.ConnectionString = strings.TrimSpace(validCfg.ConnectionString)
if len(validCfg.ConnectionString) == 0 {
@@ -153,66 +174,68 @@ func (cfg Config) Validate() (Config, error) {
Msg("falling back to default configuration")
}
fn := func(p *string, name string) (err error) {
if *p = strings.TrimSpace(*p); len(*p) == 0 {
err = fmt.Errorf(errRequiredParameterNotSetMsg, name)
}
return
}
if err := fn(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil {
if err := checkParameter(&validCfg.Data.AddQuery, "data.addQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil {
if err := checkParameter(&validCfg.Data.GetQuery, "data.getQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil {
if err := checkParameter(&validCfg.Data.DelQuery, "data.delQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil {
return validCfg, nil
}
func (cfg config) validateFull() (config, error) {
validCfg, err := cfg.validateDataStore()
if err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil {
if err = checkParameter(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.Query, "announce.query"); err != nil {
if err = checkParameter(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil {
if err = checkParameter(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.AddQuery, "data.addQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.Query, "announce.query"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.GetQuery, "data.getQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil {
return cfg, err
}
if err := fn(&validCfg.Data.DelQuery, "data.delQuery"); err != nil {
if err = checkParameter(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil {
return cfg, err
}
if err = checkParameter(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil {
return cfg, err
}
@@ -227,7 +250,7 @@ func (cfg Config) Validate() (Config, error) {
}
type store struct {
Config
config
*pgxpool.Pool
wg sync.WaitGroup
closed chan any
+7 -3
View File
@@ -12,7 +12,7 @@ import (
const (
createTablesQuery = `
DROP TABLE IF EXISTS mo_peers;
CREATE TABLE mo_peers (
CREATE UNLOGGED TABLE mo_peers (
info_hash bytea NOT NULL,
peer_id bytea NOT NULL,
address inet NOT NULL,
@@ -27,7 +27,7 @@ CREATE INDEX mo_peers_created_idx ON mo_peers(created);
CREATE INDEX mo_peers_announce_idx ON mo_peers(info_hash, is_seeder, is_v6);
DROP TABLE IF EXISTS mo_downloads;
CREATE TABLE mo_downloads (
CREATE UNLOGGED TABLE mo_downloads (
info_hash bytea PRIMARY KEY NOT NULL,
downloads int NOT NULL DEFAULT 1
);
@@ -42,7 +42,7 @@ CREATE TABLE mo_kv (
`
)
var cfg = Config{
var cfg = config{
ConnectionString: "host=127.0.0.1 database=test user=postgres pool_max_conns=50",
PingQuery: "SELECT 1",
Peer: peerQueryConf{
@@ -76,6 +76,10 @@ var cfg = Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
cfg, err = cfg.validateFull()
if err != nil {
panic(fmt.Sprint("invalid configuration: ", err))
}
ps, err = newStore(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create PostgreSQL connection: ", err, "\nThis driver needs real PostgreSQL instance"))
+30 -24
View File
@@ -35,6 +35,7 @@ import (
"time"
"github.com/redis/go-redis/v9"
"github.com/sot-tech/mochi/pkg/str2bytes"
"github.com/sot-tech/mochi/bittorrent"
@@ -79,21 +80,28 @@ var (
func init() {
// Register the storage builder.
storage.RegisterDriver("redis", builder)
storage.RegisterDriver("redis", builder{})
}
func builder(icfg conf.MapConfig) (storage.PeerStorage, error) {
// Unmarshal the bytes into the proper config type.
var cfg Config
type builder struct{}
if err := icfg.Unmarshal(&cfg); err != nil {
func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) {
var cfg Config
var err error
if err = icfg.Unmarshal(&cfg); err != nil {
return nil, err
}
return newStore(cfg)
return NewStore(cfg)
}
func newStore(cfg Config) (*store, error) {
func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) {
return b.NewPeerStorage(icfg)
}
// NewStore creates new redis peer storage with provided configuration structure
func NewStore(cfg Config) (storage.PeerStorage, error) {
cfg, err := cfg.Validate()
if err != nil {
return nil, err
@@ -456,31 +464,29 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe
// peerMinimumLen is the least allowed length of string serialized Peer
const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len
var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen)
var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (PeerID + Port + IPv4))", peerMinimumLen)
// UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by]
func UnpackPeer(data string) (bittorrent.Peer, error) {
var peer bittorrent.Peer
func UnpackPeer(data string) (peer bittorrent.Peer, err error) {
if len(data) < peerMinimumLen {
return peer, errInvalidPeerDataSize
err = errInvalidPeerDataSize
return
}
b := str2bytes.StringToBytes(data)
peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
if err == nil {
if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk {
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(
addr.Unmap(),
binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]),
),
}
} else {
err = bittorrent.ErrInvalidIP
peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen])
if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk {
peer = bittorrent.Peer{
ID: peerID,
AddrPort: netip.AddrPortFrom(
addr.Unmap(),
binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]),
),
}
} else {
err = bittorrent.ErrInvalidIP
}
return peer, err
return
}
func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) {
+1 -1
View File
@@ -19,7 +19,7 @@ var cfg = Config{
func createNew() s.PeerStorage {
var ps s.PeerStorage
var err error
ps, err = newStore(cfg)
ps, err = NewStore(cfg)
if err != nil {
panic(fmt.Sprint("Unable to create Redis connection: ", err, "\nThis driver needs real Redis instance"))
}
+31 -8
View File
@@ -82,9 +82,16 @@ type Entry struct {
Value []byte
}
// Driver is the function used to initialize a new PeerStorage
// Driver is the interface used to initialize a new DataStorage or PeerStorage
// with provided configuration.
type Driver func(conf.MapConfig) (PeerStorage, error)
type Driver interface {
// NewDataStorage function prototype for creating new instance of data (KV) storage
// with provided configuration
NewDataStorage(cfg conf.MapConfig) (DataStorage, error)
// NewPeerStorage function prototype for creating new instance of peer storage
// with provided configuration
NewPeerStorage(cfg conf.MapConfig) (PeerStorage, error)
}
// ErrResourceDoesNotExist is the error returned by all delete methods and the
// AnnouncePeers method of the PeerStorage interface if the requested resource
@@ -231,15 +238,31 @@ func RegisterDriver(name string, d Driver) {
drivers[name] = d
}
// NewStorage attempts to initialize a new PeerStorage instance from
// NewDataStorage attempts to initialize a new DataStorage instance from
// the list of registered drivers.
func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("config", cfg).Msg("staring storage")
logger.Debug().Object("config", cfg).Msg("starting data storage")
var b Driver
b, ok := drivers[cfg.Name]
var d Driver
d, ok := drivers[cfg.Name]
if !ok {
return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name)
}
return d.NewPeerStorage(cfg.Config)
}
// NewPeerStorage attempts to initialize a new PeerStorage instance from
// the list of registered drivers.
func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
driversMU.RLock()
defer driversMU.RUnlock()
logger.Debug().Object("config", cfg).Msg("starting peer storage")
var d Driver
d, ok := drivers[cfg.Name]
if !ok {
return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name)
}
@@ -249,7 +272,7 @@ func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) {
return
}
if ps, err = b(cfg.Config); err != nil {
if ps, err = d.NewPeerStorage(cfg.Config); err != nil {
return
}