diff --git a/README.md b/README.md index 3e9458d..2ade55a 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Modified version of [Chihaya](https://github.com/chihaya/chihaya), an open sourc * Supports BittorrentV2 hashes (SHA-256 and _hybrid_ SHA-256-to-160 [BEP52](https://www.bittorrent.org/beps/bep_0052.html), tested with qBittorrent); * Supports storage in middleware modules to persist useful data; -* Supports [KeyDB](https://keydb.dev) and [PostgreSQL](https://www.postgresql.org) storages; +* Supports [KeyDB](https://keydb.dev), [PostgreSQL](https://www.postgresql.org) and [LMDB](https://www.symas.com/lmdb) storages; * Metrics can be turned off (not enabled till it really needed); * Allows mixed peers: IPv4 requesters can fetch IPv6 peers or vice versa; * Contains some internal improvements. diff --git a/bittorrent/peer.go b/bittorrent/peer.go index 3d791da..33935b7 100644 --- a/bittorrent/peer.go +++ b/bittorrent/peer.go @@ -11,6 +11,7 @@ import ( "net/netip" "github.com/rs/zerolog" + "github.com/sot-tech/mochi/pkg/str2bytes" ) @@ -26,8 +27,6 @@ var ErrInvalidPeerIDSize = fmt.Errorf("peer ID must be %d bytes", PeerIDLen) var zeroPeerID PeerID // NewPeerID creates a PeerID from a byte slice. -// -// It panics if b is not 20 bytes long. func NewPeerID(b []byte) (PeerID, error) { if len(b) != PeerIDLen { return zeroPeerID, ErrInvalidPeerIDSize diff --git a/cmd/mochi/config.go b/cmd/mochi/config.go index aac39bd..1c1d55f 100644 --- a/cmd/mochi/config.go +++ b/cmd/mochi/config.go @@ -19,6 +19,7 @@ import ( // Imports to register storage drivers. _ "github.com/sot-tech/mochi/storage/keydb" + _ "github.com/sot-tech/mochi/storage/mdb" sm "github.com/sot-tech/mochi/storage/memory" _ "github.com/sot-tech/mochi/storage/pg" _ "github.com/sot-tech/mochi/storage/redis" diff --git a/cmd/mochi/server.go b/cmd/mochi/server.go index ab6c7ed..0a674be 100644 --- a/cmd/mochi/server.go +++ b/cmd/mochi/server.go @@ -33,7 +33,7 @@ func (r *Server) Run(cfg *Config) (err error) { log.Info().Msg("metrics disabled because of empty address") } - r.storage, err = storage.NewStorage(cfg.Storage) + r.storage, err = storage.NewPeerStorage(cfg.Storage) if err != nil { return fmt.Errorf("failed to create storage: %w", err) } diff --git a/dist/example_config.yaml b/dist/example_config.yaml index 34a9093..372f52d 100644 --- a/dist/example_config.yaml +++ b/dist/example_config.yaml @@ -206,8 +206,11 @@ prehooks: # - name: torrent approval # config: # initial_source: list -# Save data provided by source in storage above -# preserve: false +# Save data provided by source in specific storage. If name is empty or 'internal', provided above 'storage' +# is used, but another storage may be provided (configuration is the same as for 'storage' above) +# storage: +# name: internal +# config: # configuration: # hash_list: # - "a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5a1b2c3d4e5" diff --git a/dist/example_config_lmdb.yaml b/dist/example_config_lmdb.yaml new file mode 100644 index 0000000..c5e983c --- /dev/null +++ b/dist/example_config_lmdb.yaml @@ -0,0 +1,93 @@ +# @formatter:off +# Note: see `example_config.yaml` for `frontends` and `*hooks` config description + + +announce_interval: 30m +min_announce_interval: 15m +metrics_addr: "" + +frontends: + - name: http + config: + addr: "0.0.0.0:6969" + tls: false + tls_cert_path: "" + tls_key_path: "" + reuse_port: true + read_timeout: 5s + write_timeout: 5s + enable_keepalive: false + idle_timeout: 30s + enable_request_timing: false + announce_routes: + - "/announce" + scrape_routes: + - "/scrape" + ping_routes: + - "/ping" + allow_ip_spoofing: false + filter_private_ips: false + real_ip_header: "x-real-ip" + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + + - name: udp + config: + addr: "0.0.0.0:6969" + reuse_port: true + workers: 1 + max_clock_skew: 10s + private_key: "paste a random string here that will be used to hmac connection IDs" + enable_request_timing: false + allow_ip_spoofing: false + filter_private_ips: false + max_numwant: 100 + default_numwant: 50 + max_scrape_infohashes: 50 + +# This block defines configuration used for redis storage. +storage: + name: lmdb + config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + gc_interval: 3m + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # Path to LMDB folder. Required. + Path: "" + + # File mode of created database files, default is 0o640 + mode: 0 + + # Name of database to store KV data. If not provided, root DB is used (not recommended) + data_db: "" + + # Name of database to store peers data. If not provided, root DB is used (not recommended) + peers_db: "" + + # Maximum size of database, default is 1GiB + max_size: 0 + + # Maximum number of threads/reader slots for the LMDB environment, + # default is 126. + max_readers: 0 + + # Set MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk. + # The installation of the flag can highly speed up writes, but there is a risk of DB damage + # or loss of last committed data if the application crashes. + async_write: true + + # Set MDB_NOMETASYNC flag. Omit the metadata flush on commit. + # Can a little accelerate writes if `async_write` not set, but last committed data + # bay be lost if the application crashes. + no_sync_meta: false + +posthooks: [] +prehooks: [] \ No newline at end of file diff --git a/docs/storage/benchmarks.md b/docs/storage/benchmarks.md new file mode 100644 index 0000000..d58d2ea --- /dev/null +++ b/docs/storage/benchmarks.md @@ -0,0 +1,211 @@ +# Hardware + +* CPU: Intel i5-12500H +* RAM: 16GiB (2x8 Samsung M471A1K43EB1-CWE) +* Storage: NVME SSD Samsung 980PRO +* OS: Ubuntu 22.04 + +# Benchmarks +# Memory + +``` +goos: linux +goarch: amd64 +pkg: github.com/sot-tech/mochi/storage/memory +cpu: 12th Gen Intel(R) Core(TM) i5-12500H +BenchmarkStorage/BenchmarkNop-16 1000000000 0.1822 ns/op 0 B/op 0 allocs/op +BenchmarkStorage/BenchmarkPut-16 7071364 173.1 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkPut1k-16 5342302 280.4 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash-16 17775769 65.81 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 15953836 68.41 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkPutDelete-16 4975660 237.0 ns/op 160 B/op 4 allocs/op +BenchmarkStorage/BenchmarkPutDelete1k-16 4842673 240.3 ns/op 160 B/op 4 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 4597555 248.2 ns/op 160 B/op 4 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 4776769 265.1 ns/op 160 B/op 4 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist-16 19164670 60.77 ns/op 96 B/op 3 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1k-16 29773471 42.81 ns/op 96 B/op 3 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 38686660 32.11 ns/op 96 B/op 3 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 36604658 32.88 ns/op 96 B/op 3 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete-16 2982174 419.8 ns/op 240 B/op 6 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1k-16 2881537 407.3 ns/op 240 B/op 6 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 2654642 443.4 ns/op 240 B/op 6 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 2630800 462.8 ns/op 240 B/op 6 allocs/op +BenchmarkStorage/BenchmarkGradNonexist-16 6837140 203.3 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1k-16 5347960 267.2 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 15773694 75.18 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 16675423 73.16 ns/op 80 B/op 2 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher-16 1368090 823.9 ns/op 4496 B/op 3 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 1401063 823.8 ns/op 4496 B/op 3 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder-16 4446224 273.7 ns/op 1424 B/op 2 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 4279449 280.8 ns/op 1424 B/op 2 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm-16 16012303 67.37 ns/op 16 B/op 1 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 82122622 14.58 ns/op 16 B/op 1 allocs/op +PASS +ok github.com/sot-tech/mochi/storage/memory 41.848s +``` + +# Redis + +Version: 6.0.16 + +Configuration: OOTB + +``` +goos: linux +goarch: amd64 +pkg: github.com/sot-tech/mochi/storage/redis +cpu: 12th Gen Intel(R) Core(TM) i5-12500H +BenchmarkStorage/BenchmarkNop-16 1000000000 0.1611 ns/op 0 B/op 0 allocs/op +BenchmarkStorage/BenchmarkPut-16 180381 6148 ns/op 1257 B/op 37 allocs/op +BenchmarkStorage/BenchmarkPut1k-16 203150 6314 ns/op 1273 B/op 37 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash-16 196033 6187 ns/op 1249 B/op 37 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 202513 6176 ns/op 1265 B/op 37 allocs/op +BenchmarkStorage/BenchmarkPutDelete-16 26925 40429 ns/op 1736 B/op 56 allocs/op +BenchmarkStorage/BenchmarkPutDelete1k-16 27751 39310 ns/op 1768 B/op 56 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 28142 41585 ns/op 1720 B/op 56 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 29500 39063 ns/op 1752 B/op 56 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist-16 281184 4451 ns/op 320 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1k-16 235394 4316 ns/op 334 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 272566 4569 ns/op 312 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 278380 4315 ns/op 326 B/op 13 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete-16 20154 64004 ns/op 3664 B/op 108 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1k-16 19230 61428 ns/op 3712 B/op 108 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 18270 62749 ns/op 3632 B/op 108 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 20653 60017 ns/op 3680 B/op 108 allocs/op +BenchmarkStorage/BenchmarkGradNonexist-16 157063 7336 ns/op 1929 B/op 52 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1k-16 161649 7598 ns/op 1945 B/op 52 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 163005 7436 ns/op 1913 B/op 52 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 151135 7524 ns/op 1929 B/op 52 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher-16 70124 16813 ns/op 15277 B/op 83 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 71144 17011 ns/op 15261 B/op 83 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder-16 149967 8055 ns/op 6814 B/op 42 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 141770 8202 ns/op 6806 B/op 42 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm-16 55156 21739 ns/op 1120 B/op 41 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 58994 21821 ns/op 1087 B/op 41 allocs/op +PASS +ok github.com/sot-tech/mochi/storage/redis 43.235s +``` + +## KeyDB + +Version: 6.3.4 + +Configuration: OOTB + +``` +goos: linux +goarch: amd64 +pkg: github.com/sot-tech/mochi/storage/keydb +cpu: 12th Gen Intel(R) Core(TM) i5-12500H +BenchmarkStorage/BenchmarkNop-16 1000000000 0.1873 ns/op 0 B/op 0 allocs/op +BenchmarkStorage/BenchmarkPut-16 141714 8824 ns/op 553 B/op 21 allocs/op +BenchmarkStorage/BenchmarkPut1k-16 141138 9215 ns/op 566 B/op 21 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash-16 113070 8939 ns/op 546 B/op 21 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 134258 8578 ns/op 558 B/op 21 allocs/op +BenchmarkStorage/BenchmarkPutDelete-16 31476 37899 ns/op 856 B/op 33 allocs/op +BenchmarkStorage/BenchmarkPutDelete1k-16 34111 35877 ns/op 880 B/op 33 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 31716 35344 ns/op 840 B/op 33 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 33234 37156 ns/op 864 B/op 33 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist-16 292579 4302 ns/op 320 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1k-16 289604 4401 ns/op 334 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 272372 4297 ns/op 312 B/op 13 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 284304 4165 ns/op 326 B/op 13 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete-16 23744 49850 ns/op 1304 B/op 48 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1k-16 23378 53921 ns/op 1344 B/op 48 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 25268 49024 ns/op 1272 B/op 48 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 21583 50219 ns/op 1312 B/op 48 allocs/op +BenchmarkStorage/BenchmarkGradNonexist-16 133311 8960 ns/op 669 B/op 24 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1k-16 134439 8884 ns/op 683 B/op 24 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 130093 9005 ns/op 653 B/op 24 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 137900 8774 ns/op 667 B/op 24 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher-16 75067 16103 ns/op 15276 B/op 83 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 72286 16315 ns/op 15261 B/op 83 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder-16 165186 7705 ns/op 6814 B/op 42 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 151609 7799 ns/op 6806 B/op 42 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm-16 54744 21244 ns/op 1120 B/op 41 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 54865 21736 ns/op 1088 B/op 41 allocs/op +PASS +ok github.com/sot-tech/mochi/storage/keydb 44.466s +``` + +## PostgreSQL + +Version: 14.12 + +Configuration: OOTB + +``` +goos: linux +goarch: amd64 +pkg: github.com/sot-tech/mochi/storage/pg +cpu: 12th Gen Intel(R) Core(TM) i5-12500H +BenchmarkStorage/BenchmarkNop-16 1000000000 0.1687 ns/op 0 B/op 0 allocs/op +BenchmarkStorage/BenchmarkPut-16 58521 19857 ns/op 2213 B/op 44 allocs/op +BenchmarkStorage/BenchmarkPut1k-16 153538 7259 ns/op 2208 B/op 44 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash-16 151515 7319 ns/op 2204 B/op 44 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 165950 7452 ns/op 2211 B/op 44 allocs/op +BenchmarkStorage/BenchmarkPutDelete-16 17130 61564 ns/op 4274 B/op 81 allocs/op +BenchmarkStorage/BenchmarkPutDelete1k-16 19200 60632 ns/op 4285 B/op 81 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 19923 59570 ns/op 4273 B/op 81 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 20510 61910 ns/op 4285 B/op 80 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist-16 184810 5485 ns/op 2098 B/op 37 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1k-16 187735 5514 ns/op 2108 B/op 37 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 217992 5621 ns/op 2099 B/op 37 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 215710 5569 ns/op 2109 B/op 37 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete-16 8280 139091 ns/op 7306 B/op 143 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1k-16 9010 133127 ns/op 7320 B/op 143 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 8030 133810 ns/op 7305 B/op 143 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 8217 136543 ns/op 7321 B/op 143 allocs/op +BenchmarkStorage/BenchmarkGradNonexist-16 20883 56024 ns/op 3101 B/op 62 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1k-16 20932 55863 ns/op 3106 B/op 62 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 77368 15365 ns/op 3067 B/op 62 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 65448 15568 ns/op 3081 B/op 62 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher-16 55521 21614 ns/op 22565 B/op 380 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 54933 21813 ns/op 22571 B/op 380 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder-16 117950 10443 ns/op 9239 B/op 190 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 113031 10490 ns/op 9242 B/op 190 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm-16 103208 11945 ns/op 2951 B/op 46 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 101088 12110 ns/op 2952 B/op 46 allocs/op +PASS +ok github.com/sot-tech/mochi/storage/pg 64.953s +``` + +## LMDB + +Version: 0.9.31 + +``` +goos: linux +goarch: amd64 +pkg: github.com/sot-tech/mochi/storage/mdb +cpu: 12th Gen Intel(R) Core(TM) i5-12500H +BenchmarkStorage/BenchmarkNop-16 1000000000 0.1618 ns/op 0 B/op 0 allocs/op +BenchmarkStorage/BenchmarkPut-16 356353 3095 ns/op 264 B/op 5 allocs/op +BenchmarkStorage/BenchmarkPut1k-16 402933 3055 ns/op 264 B/op 5 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash-16 381813 2966 ns/op 256 B/op 5 allocs/op +BenchmarkStorage/BenchmarkPut1kInfoHash1k-16 376526 3083 ns/op 256 B/op 5 allocs/op +BenchmarkStorage/BenchmarkPutDelete-16 423085 2595 ns/op 528 B/op 10 allocs/op +BenchmarkStorage/BenchmarkPutDelete1k-16 443782 2656 ns/op 528 B/op 10 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash-16 434588 2738 ns/op 512 B/op 10 allocs/op +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 393522 2770 ns/op 512 B/op 10 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist-16 887683 1353 ns/op 300 B/op 7 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1k-16 914343 1323 ns/op 300 B/op 7 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash-16 921487 1328 ns/op 292 B/op 7 allocs/op +BenchmarkStorage/BenchmarkDeleteNonexist1kInfoHash1k-16 911662 1322 ns/op 292 B/op 7 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete-16 210553 5038 ns/op 796 B/op 16 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1k-16 205284 5106 ns/op 796 B/op 16 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash-16 193164 5751 ns/op 772 B/op 16 allocs/op +BenchmarkStorage/BenchmarkPutGradDelete1kInfoHash1k-16 199144 5867 ns/op 772 B/op 16 allocs/op +BenchmarkStorage/BenchmarkGradNonexist-16 264282 4037 ns/op 304 B/op 8 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1k-16 274296 3930 ns/op 304 B/op 8 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash-16 268438 4886 ns/op 296 B/op 8 allocs/op +BenchmarkStorage/BenchmarkGradNonexist1kInfoHash1k-16 191442 5425 ns/op 296 B/op 8 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher-16 647808 1658 ns/op 3648 B/op 15 allocs/op +BenchmarkStorage/BenchmarkAnnounceLeecher1kInfoHash-16 684120 1813 ns/op 3636 B/op 15 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder-16 877350 1282 ns/op 3416 B/op 10 allocs/op +BenchmarkStorage/BenchmarkAnnounceSeeder1kInfoHash-16 847063 1307 ns/op 3404 B/op 10 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm-16 614961 1767 ns/op 712 B/op 20 allocs/op +BenchmarkStorage/BenchmarkScrapeSwarm1kInfoHash-16 659012 1870 ns/op 700 B/op 20 allocs/op +PASS +ok github.com/sot-tech/mochi/storage/mdb 42.032s +``` diff --git a/docs/storage/lmdb.md b/docs/storage/lmdb.md new file mode 100644 index 0000000..15310fe --- /dev/null +++ b/docs/storage/lmdb.md @@ -0,0 +1,114 @@ +# LMDB Storage + +This storage uses LMDB persistent key-value database to store peer and/or arbitrary key-value data. + +_Note: **requires** CGO-enabled build (`CGO_ENABLED=1 go install...`), +because of native bindings to C LMDB code._ + +# Use case + +LMDB may be used in local installation to read/write data from/to (local or network mounted) +filesystem directly, without network stack. + +It supports multi-thread/process (multi-reader, single-writer) access, and DB size larger than RAM, +so *may* be used within several applications as shared store, e.g. external app can modify allowed/denied +info hashes for `torrentapproval` middleware or mochi cluster with shared DB over NFS. + +As it filesystem database, performance is also depends on filesystem/disk performance, so it *may* +be slower than tuned PostgreSQL or Redis. + +## Configuration and implementation notes + +Basically LDMB environment is a directory with 2 files: data.mdb and lock.mdb, which will be created on start. +Each *environment* may contain several named databases. If name for specific database (for peers and arbitrary data) +is not provided, default (0-th) database is used, which is *NOT* recommended, because is contains internal information. + +Both stored key and value are byte arrays. +Arbitrary DB key format is `_`, value is byte array converted string. + +Peers DB format is: + +1. Key `__`, value - BE-encoded unix timestamp. +Fields: + * `` - `L4`, `L6`, `S4`, `S6` string for leecher with IPv4 or IPv6 address, or seeder with IPv4 or IPv6 address (accordingly) + * `` - 20 or 32 bytes of info hash (V1 or V2 accordingly) + * `` - 20 bytes of peer ID + * `` - 16 bytes of BE-encoded IP address (real IPv6 or IPv4-mapped IPv6 address) + * `` - 2 bytes of BE-encoded port +2. Key `DC__` - downloaded count of specified `` (20 or 32 bytes), value - BE-encoded unsigned 32-bit integer. + +Write speed may be increased with `no_sync_meta` and `async_write` configuration options, +but the risk of DB corruption is also increase. + +With `async_write` option, write speed will be a little faster than locally installed OOTB Redis, +but application will use more RAM (dirty pages). +Without it, you will get dramatically slow write speed: every write transaction will be flushed to disk. + +Option `no_sync_meta` **without** `async_write` will increase write speed up to 3 times +(with potential loss of last transaction if application crashed/killed). + +Both enabled options don't make much sense and also don't affect read performance. + +Benchmarks with combinations of options above: + +``` +async_write=false, no_sync_meta=false +BenchmarkStorage/BenchmarkPutGradDelete-16 123 9000653 ns/op + +async_write=false, no_sync_meta=true +BenchmarkStorage/BenchmarkPutDelete1kInfoHash1k-16 306 4001353 ns/op + +async_write=true, no_sync_meta=false +BenchmarkStorage/BenchmarkPutGradDelete-16 152656 7754 ns/op + +async_write=true, no_sync_meta=true +BenchmarkStorage/BenchmarkPutGradDelete-16 150116 7735 ns/op +``` + +**Sample configuration:** + +```yaml +storage: + name: lmdb + config: + # The frequency which stale peers are removed. + # This balances between + # - collecting garbage more often, potentially using more CPU time, but potentially using less memory (lower value) + # - collecting garbage less frequently, saving CPU time, but keeping old peers long, thus using more memory (higher value). + gc_interval: 3m + + # The amount of time until a peer is considered stale. + # To avoid churn, keep this slightly larger than `announce_interval` + peer_lifetime: 31m + + # Path to LMDB folder. Required. + Path: "/some/path/to/lmdb/directory" + + # File mode of created database files, default is 0o640 + mode: 0640 + + # Name of database to store KV data. If not provided, root DB is used (not recommended) + data_db: "DATA" + + # Name of database to store peers data. If not provided, root DB is used (not recommended) + peers_db: "PEERS" + + # Maximum size of database, default is 1GiB. + # It's better specify enough space, because if environment is full, + # storage will fail to add new records and restart and specifying larger size + # (or online resizing with external tool) will be required. + # See: http://www.lmdb.tech/doc/group__mdb.html#gaa2506ec8dab3d969b0e609cd82e619e5 + max_size: 1073741824 + + # Maximum number of threads/reader slots for the LMDB environment, default is 126. + # See: http://www.lmdb.tech/doc/group__mdb.html#gae687966c24b790630be2a41573fe40e2 + max_readers: 126 + + # Set flags to use asynchronous flushes to disk. + # See: MDB_WRITEMAP and MDB_MAPASYNC description in http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340 + async_write: true + + # Set MDB_NOMETASYNC flag. Omit the metadata flush on commit. + # See: MDB_NOMETASYNC description in http://www.lmdb.tech/doc/group__mdb.html#ga32a193c6bf4d7d5c5d579e71f22e9340 + no_sync_meta: false +``` diff --git a/docs/storage/postgres.md b/docs/storage/postgres.md index 745c008..acc6e33 100644 --- a/docs/storage/postgres.md +++ b/docs/storage/postgres.md @@ -3,7 +3,7 @@ This storage uses PostgreSQL-like database to store peer and arbitrary key-value data. 'PostgreSQL-like' means, that you can use any database which _understand_ PostgreSQL protocol -i.e. _real_ [PostgreSQL](https://www.postgresql.org) or [CockroachDB](https://www.cockroachlabs.com). +e.g. _real_ [PostgreSQL](https://www.postgresql.org) or [CockroachDB](https://www.cockroachlabs.com). _(YugabyteDB is not recommended (at the moment), because of some problems with concurrent inserts while benchmarks.)_ diff --git a/frontend/udp/frontend_test.go b/frontend/udp/frontend_test.go index dda2955..9a1af53 100644 --- a/frontend/udp/frontend_test.go +++ b/frontend/udp/frontend_test.go @@ -16,7 +16,7 @@ func init() { } func TestStartStopRaceIssue437(t *testing.T) { - ps, err := storage.NewStorage(conf.NamedMapConfig{ + ps, err := storage.NewPeerStorage(conf.NamedMapConfig{ Name: "memory", Config: conf.MapConfig{}, }) diff --git a/go.mod b/go.mod index 4cc0de2..7202b4f 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module github.com/sot-tech/mochi go 1.22 require ( - code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718 + code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3 github.com/MicahParks/jwkset v0.5.18 github.com/MicahParks/keyfunc/v3 v3.3.3 - github.com/anacrolix/torrent v1.56.0 + github.com/PowerDNS/lmdb-go v1.9.2 + github.com/anacrolix/torrent v1.56.1 github.com/cespare/xxhash/v2 v2.3.0 github.com/golang-jwt/jwt/v5 v5.2.1 github.com/jackc/pgx/v5 v5.6.0 @@ -14,10 +15,10 @@ require ( github.com/minio/sha256-simd v1.0.1 github.com/mitchellh/mapstructure v1.5.0 github.com/prometheus/client_golang v1.19.1 - github.com/redis/go-redis/v9 v9.5.2 + github.com/redis/go-redis/v9 v9.5.3 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.9.0 - github.com/valyala/fasthttp v1.54.0 + github.com/valyala/fasthttp v1.55.0 gopkg.in/yaml.v3 v3.0.1 ) @@ -34,12 +35,12 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/huandu/xstrings v1.4.0 // indirect + github.com/huandu/xstrings v1.5.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/klauspost/compress v1.17.8 // indirect - github.com/klauspost/cpuid/v2 v2.2.7 // indirect + github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mr-tron/base58 v1.2.0 // indirect @@ -47,16 +48,17 @@ require ( github.com/multiformats/go-varint v0.0.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.53.0 // indirect + github.com/prometheus/common v0.54.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 // indirect + golang.org/x/net v0.26.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect golang.org/x/time v0.5.0 // indirect - google.golang.org/protobuf v1.34.1 // indirect + google.golang.org/protobuf v1.34.2 // indirect lukechampine.com/blake3 v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index db0142f..80c48f8 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718 h1:6wazSuHaJjatGy8pvchSy6L+4M67WPfjgK9yh7cJLMs= -code.cloudfoundry.org/go-diodes v0.0.0-20240515174142-71582f284718/go.mod h1:eVHabU/rcpC5ocoIAXvnaySkdH6+PgORRVlY5l3SDys= +code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3 h1:4WCYwJmqSfV7ChDohsJB8Z0aDVklIE+n8OTBJxpif0c= +code.cloudfoundry.org/go-diodes v0.0.0-20240604201846-c756bfed2ed3/go.mod h1:8O5g1DEzJU9ktEmykKPhY4mZOM/dBENWVHKVInuuch8= crawshaw.io/iox v0.0.0-20181124134642-c51c3df30797/go.mod h1:sXBiorCo8c46JlQV3oXPKINnZ8mcqnye1EkVkqsectk= crawshaw.io/sqlite v0.3.2/go.mod h1:igAO5JulrQ1DbdZdtVq48mnZUBAPOeFzer7VhDWNtW4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -9,6 +9,8 @@ github.com/MicahParks/jwkset v0.5.18 h1:WLdyMngF7rCrnstQxA7mpRoxeaWqGzPM/0z40PJU github.com/MicahParks/jwkset v0.5.18/go.mod h1:q8ptTGn/Z9c4MwbcfeCDssADeVQb3Pk7PnVxrvi+2QY= github.com/MicahParks/keyfunc/v3 v3.3.3 h1:c6j9oSu1YUo0k//KwF1miIQlEMtqNlj7XBFLB8jtEmY= github.com/MicahParks/keyfunc/v3 v3.3.3/go.mod h1:f/UMyXdKfkZzmBeBFUeYk+zu066J1Fcl48f7Wnl5Z48= +github.com/PowerDNS/lmdb-go v1.9.2 h1:Cmgerh9y3ZKBZGz1irxSShhfmFyRUh+Zdk4cZk7ZJvU= +github.com/PowerDNS/lmdb-go v1.9.2/go.mod h1:TE0l+EZK8Z1B4dx070ZxkWTlp8RG1mjN0/+FkFRQMtU= github.com/RoaringBitmap/roaring v0.4.7/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= github.com/RoaringBitmap/roaring v0.4.17/go.mod h1:D3qVegWTmfCaX4Bl5CrBE9hfrSrrXIr8KVNvRsDi1NI= github.com/RoaringBitmap/roaring v0.4.23/go.mod h1:D0gp8kJQgE1A4LQ5wFLggQEyvDi06Mq5mKs52e1TwOo= @@ -47,8 +49,8 @@ github.com/anacrolix/stm v0.2.0/go.mod h1:zoVQRvSiGjGoTmbM0vSLIiaKjWtNPeTvXUSdJQ github.com/anacrolix/tagflag v0.0.0-20180109131632-2146c8d41bf0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.0.0/go.mod h1:1m2U/K6ZT+JZG0+bdMK6qauP49QT4wE5pmhJXOKKCHw= github.com/anacrolix/tagflag v1.1.0/go.mod h1:Scxs9CV10NQatSmbyjqmqmeQNwGzlNe0CMUMIxqHIG8= -github.com/anacrolix/torrent v1.56.0 h1:g/sM0K/BaWUv4Htu2bblLBhIxGdFZ1MUCoD7lcvemlo= -github.com/anacrolix/torrent v1.56.0/go.mod h1:5DMHbeIM1TuC5wTQ99XieKKLiYZYz6iB2lyZpKZEr6w= +github.com/anacrolix/torrent v1.56.1 h1:QeJMOP0NuhpQ5dATsOqEL0vUO85aPMNMGP2FACNt0Eg= +github.com/anacrolix/torrent v1.56.1/go.mod h1:5DMHbeIM1TuC5wTQ99XieKKLiYZYz6iB2lyZpKZEr6w= github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= github.com/apache/thrift v0.12.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -142,12 +144,12 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbcucJdbSo= github.com/huandu/xstrings v1.2.0/go.mod h1:DvyZB1rfVYsBIigL8HwpZgxHwXozlTgGqn63UyNX5k4= github.com/huandu/xstrings v1.3.1/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= -github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= -github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI= +github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9 h1:L0QtFUgDarD7Fpv9jeVMgy/+Ec0mtnmYuImjTz6dtDA= -github.com/jackc/pgservicefile v0.0.0-20231201235250-de7065d80cb9/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY= github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= @@ -158,10 +160,10 @@ github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVY github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= -github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= -github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= -github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= +github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= @@ -200,8 +202,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo/v2 v2.17.3 h1:oJcvKpIb7/8uLpDDtnQuf18xVnwKp8DTD7DQ6gTd/MU= -github.com/onsi/ginkgo/v2 v2.17.3/go.mod h1:nP2DPOQoNsQmsVyv5rDA8JkXQoCs6goXIvr/PRJ1eCc= +github.com/onsi/ginkgo/v2 v2.19.0 h1:9Cnnf7UHo57Hy3k6/m5k3dRfGTMXGvxhHFvkDTCTpvA= +github.com/onsi/ginkgo/v2 v2.19.0/go.mod h1:rlwLi9PilAFJ8jCg9UE1QP6VBpd6/xj3SRC0d6TU0To= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.33.1 h1:dsYjIxxSR755MDmKVsaFQTE22ChNBcuuTWgkUDSubOk= github.com/onsi/gomega v1.33.1/go.mod h1:U4R44UsT+9eLIaYRB2a5qajjtQYn0hauxvRm16AVYg0= @@ -228,8 +230,8 @@ github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQy github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= -github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= +github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= +github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -238,8 +240,8 @@ github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4 github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= -github.com/redis/go-redis/v9 v9.5.2 h1:L0L3fcSNReTRGyZ6AqAEN0K56wYeYAwapBIhkvh0f3E= -github.com/redis/go-redis/v9 v9.5.2/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/redis/go-redis/v9 v9.5.3 h1:fOAp1/uJG+ZtcITgZOfYFmTKPE7n4Vclj1wZFgRciUU= +github.com/redis/go-redis/v9 v9.5.3/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= @@ -268,8 +270,8 @@ github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDW github.com/tinylib/msgp v1.1.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.54.0 h1:cCL+ZZR3z3HPLMVfEYVUMtJqVaui0+gu7Lx63unHwS0= -github.com/valyala/fasthttp v1.54.0/go.mod h1:6dt4/8olwq9QARP/TDuPmWyWcl4byhpvTJ4AAtcz+QM= +github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= +github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.10/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -277,11 +279,11 @@ go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc h1:O9NuF4s+E/PvMIy+9IUZB9znFwUIXEWSstNjek6VpVg= -golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= @@ -296,8 +298,8 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -322,12 +324,12 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -336,8 +338,8 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= -golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.22.0 h1:gqSGLZqv+AI9lIQzniJ0nZDRG5GBPsSi+DRNHWNz6yA= +golang.org/x/tools v0.22.0/go.mod h1:aCwcsjqvq7Yqt6TNyX7QMU2enbQ/Gt0bo6krSeEri+c= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.3.1/go.mod h1:6wY9I6uQWHQ8EM57III9mq/AjF+i8G65rmVagqKMtkk= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= @@ -353,8 +355,8 @@ google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= -google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= -google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/middleware/hooks.go b/middleware/hooks.go index 0d25686..f307110 100644 --- a/middleware/hooks.go +++ b/middleware/hooks.go @@ -22,7 +22,7 @@ type Hook interface { // to check if it is operational. Used in frontend.Logic. // // It may be useful in cases when Hook performs foreign requests to -// some external resources (i.e. storage) and `ping` request should +// some external resources (e.g. storage) and `ping` request should // also check resource availability. type Pinger interface { Ping(ctx context.Context) error diff --git a/middleware/torrentapproval/torrentapproval.go b/middleware/torrentapproval/torrentapproval.go index c873967..80962b1 100644 --- a/middleware/torrentapproval/torrentapproval.go +++ b/middleware/torrentapproval/torrentapproval.go @@ -4,6 +4,7 @@ package torrentapproval import ( "context" + "errors" "fmt" "io" @@ -11,19 +12,20 @@ import ( "github.com/sot-tech/mochi/middleware" "github.com/sot-tech/mochi/middleware/torrentapproval/container" "github.com/sot-tech/mochi/pkg/conf" - "github.com/sot-tech/mochi/storage/memory" + "github.com/sot-tech/mochi/storage" // import directory watcher to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/directory" // import static list to enable appropriate support _ "github.com/sot-tech/mochi/middleware/torrentapproval/container/list" - "github.com/sot-tech/mochi/storage" ) // Name is the name by which this middleware is registered with Conf. const Name = "torrent approval" +const internalStore = "internal" + func init() { middleware.RegisterBuilder(Name, build) } @@ -31,9 +33,10 @@ func init() { type baseConfig struct { // Source - name of container for initial values Source string `cfg:"initial_source"` - // Preserve - if true, container will receive real registered storage if it is NOT `memory` - // if false - temporary in-memory storage will be used or created + // Deprecated: use Storage parameter Preserve bool + // Storage where to hold provided data by Source + Storage conf.NamedMapConfig // Configuration depends on used container Configuration conf.MapConfig } @@ -52,14 +55,22 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er return nil, fmt.Errorf("invalid config for middleware %s: config not provided", Name) } - var ds storage.DataStorage = st - if !cfg.Preserve && ds.Preservable() { - ds = memory.NewDataStorage() + if cfg.Preserve { + return nil, errors.New("preserve option is deprecated, use store parameter") + } + + var ds, dsc storage.DataStorage + if len(cfg.Storage.Name) == 0 || cfg.Storage.Name == internalStore { + ds = st + } else if ds, err = storage.NewDataStorage(cfg.Storage); err == nil { + dsc = ds + } else { + return } var c container.Container if c, err = container.GetContainer(cfg.Source, cfg.Configuration, ds); err == nil { - h = &hook{c} + h = &hook{c, dsc} } return h, err } @@ -68,7 +79,8 @@ func build(config conf.MapConfig, st storage.PeerStorage) (h middleware.Hook, er var ErrTorrentUnapproved = bittorrent.ClientError("torrent not allowed by mochi") type hook struct { - hashContainer container.Container + hashContainer container.Container + providedStorage storage.DataStorage } func (h *hook) HandleAnnounce(ctx context.Context, req *bittorrent.AnnounceRequest, _ *bittorrent.AnnounceResponse) (context.Context, error) { @@ -90,5 +102,10 @@ func (h *hook) Close() (err error) { if cl, isOk := h.hashContainer.(io.Closer); isOk { err = cl.Close() } + if h.providedStorage != nil { + if stErr := h.providedStorage.Close(); stErr != nil { + err = errors.Join(err, stErr) + } + } return err } diff --git a/middleware/torrentapproval/torrentapproval_test.go b/middleware/torrentapproval/torrentapproval_test.go index a6ca2b9..daaf84f 100644 --- a/middleware/torrentapproval/torrentapproval_test.go +++ b/middleware/torrentapproval/torrentapproval_test.go @@ -71,8 +71,7 @@ var cases = []struct { } func TestHandleAnnounce(t *testing.T) { - config := memory.Config{}.Validate() - storage, err := memory.NewPeerStorage(config) + storage, err := memory.Builder{}.NewPeerStorage(make(conf.MapConfig)) require.Nil(t, err) for _, tt := range cases { t.Run(fmt.Sprintf("testing hash %s", tt.ih), func(t *testing.T) { @@ -84,10 +83,10 @@ func TestHandleAnnounce(t *testing.T) { req := &bittorrent.AnnounceRequest{} resp := &bittorrent.AnnounceResponse{} - hashinfo, err := bittorrent.NewInfoHashString(tt.ih) + ih, err := bittorrent.NewInfoHashString(tt.ih) require.Nil(t, err) - req.InfoHash = hashinfo + req.InfoHash = ih nctx, err := h.HandleAnnounce(ctx, req, resp) require.Equal(t, ctx, nctx) diff --git a/storage/keydb/storage.go b/storage/keydb/storage.go index 7c3facf..d5fddd0 100644 --- a/storage/keydb/storage.go +++ b/storage/keydb/storage.go @@ -33,10 +33,12 @@ var ( func init() { // Register the storage driver. - storage.RegisterDriver("keydb", builder) + storage.RegisterDriver("keydb", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { +type builder struct{} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { var cfg r.Config var err error @@ -47,6 +49,16 @@ func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { return newStore(cfg) } +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + var cfg r.Config + + if err := icfg.Unmarshal(&cfg); err != nil { + return nil, err + } + + return r.NewStore(cfg) +} + func newStore(cfg r.Config) (*store, error) { var err error if cfg, err = cfg.Validate(); err != nil { diff --git a/storage/mdb/storage.go b/storage/mdb/storage.go new file mode 100644 index 0000000..b8b406c --- /dev/null +++ b/storage/mdb/storage.go @@ -0,0 +1,598 @@ +//go:build cgo + +// Package mdb implements LMDB data and peer storage +package mdb + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "net/netip" + "os" + "sync" + "time" + + "github.com/PowerDNS/lmdb-go/exp/lmdbsync" + "github.com/PowerDNS/lmdb-go/lmdb" + "github.com/PowerDNS/lmdb-go/lmdbscan" + + "github.com/sot-tech/mochi/bittorrent" + "github.com/sot-tech/mochi/pkg/conf" + "github.com/sot-tech/mochi/pkg/log" + "github.com/sot-tech/mochi/pkg/timecache" + "github.com/sot-tech/mochi/storage" +) + +const ( + // Name - registered name of the storage + Name = "lmdb" + defaultMode = 0o640 + defaultMapSize = 1 << 30 + defaultMaxReaders = 126 +) + +var logger = log.NewLogger("storage/lmdb") + +func init() { + // Register the storage driver. + storage.RegisterDriver(Name, builder{}) +} + +type builder struct{} + +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + return b.NewPeerStorage(icfg) +} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config + if err := icfg.Unmarshal(&cfg); err != nil { + return nil, err + } + return newStorage(cfg) +} + +type config struct { + Path string + Mode uint32 + DataDBName string `cfg:"data_db"` + PeersDBName string `cfg:"peers_db"` + // MaxSize - size of the memory map to use for lmdb environment. + // The size should be a multiple of the OS page size. + // Mochi's default is 1GiB. + MaxSize int64 `cfg:"max_size"` + // MaxReaders - maximum number of threads/reader slots for the LMDB environment. + // LMDB library's default is 126. + MaxReaders int `cfg:"max_readers"` + // AsyncWrite sets MDB_WRITEMAP and MDB_MAPASYNC flags to use asynchronous flushes to disk. + AsyncWrite bool `cfg:"async_write"` + // NoMetaSync sets MDB_NOMETASYNC flag, omit the metadata flush. + NoMetaSync bool `cfg:"no_sync_meta"` +} + +var ( + errPathNotProvided = errors.New("lmdb path not provided") + errPathNotDirectory = errors.New("lmdb path is not directory") +) + +func (cfg config) validate() (config, error) { + validCfg := cfg + if len(cfg.Path) == 0 { + return cfg, errPathNotProvided + } else if stat, err := os.Stat(cfg.Path); err != nil { + return cfg, err + } else if !stat.IsDir() { + return cfg, errPathNotDirectory + } + if cfg.Mode == 0 { + validCfg.Mode = defaultMode + logger.Warn(). + Str("name", "mode"). + Stringer("provided", os.FileMode(cfg.Mode)). + Stringer("default", os.FileMode(validCfg.Mode)). + Msg("falling back to default configuration") + } + if cfg.MaxSize <= 0 { + validCfg.MaxSize = defaultMapSize + logger.Warn(). + Str("name", "max_size"). + Int64("provided", cfg.MaxSize). + Int64("default", validCfg.MaxSize). + Msg("falling back to default configuration") + } + if cfg.MaxReaders <= 0 { + validCfg.MaxReaders = defaultMaxReaders + logger.Warn(). + Str("name", "max_readers"). + Int("provided", cfg.MaxReaders). + Int("default", 126). + Msg("falling back to default configuration") + } + return validCfg, nil +} + +type lmdbEnv interface { + SetMaxDBs(int) error + SetMapSize(int64) error + SetMaxReaders(int) error + Open(string, uint, os.FileMode) error + View(lmdb.TxnOp) error + Update(lmdb.TxnOp) error + Close() error + Sync(bool) error + Info() (*lmdb.EnvInfo, error) +} + +type mdb struct { + lmdbEnv + dataDB, peersDB lmdb.DBI + onceCloser sync.Once + closed chan any + wg sync.WaitGroup +} + +func newStorage(cfg config) (*mdb, error) { + var err error + if cfg, err = cfg.validate(); err != nil { + return nil, err + } + lmEnv, err := lmdb.NewEnv() + if err != nil { + return nil, err + } + var env lmdbEnv + + if env, err = lmdbsync.NewEnv(lmEnv, + lmdbsync.MapResizedHandler(lmdbsync.MapResizedDefaultRetry, lmdbsync.MapResizedDefaultDelay), + ); err != nil { + return nil, err + } + + if err = env.SetMaxDBs(2); err != nil { + return nil, err + } + if err = env.SetMapSize(cfg.MaxSize); err != nil { + return nil, err + } + if cfg.MaxReaders > 0 { + if err = env.SetMaxReaders(cfg.MaxReaders); err != nil { + return nil, err + } + } + + var flags uint + if cfg.AsyncWrite { + flags |= lmdb.WriteMap | lmdb.MapAsync + } + if cfg.NoMetaSync { + flags |= lmdb.NoMetaSync + } + + if err = env.Open(cfg.Path, flags, os.FileMode(cfg.Mode)); err != nil { + return nil, err + } + + var dataDB, peersDB lmdb.DBI + if err = env.Update(func(txn *lmdb.Txn) (err error) { + if len(cfg.DataDBName) > 0 { + dataDB, err = txn.CreateDBI(cfg.DataDBName) + } else { + dataDB, err = txn.OpenRoot(0) + } + if err != nil { + return + } + if len(cfg.PeersDBName) > 0 { + peersDB, err = txn.CreateDBI(cfg.PeersDBName) + } else { + peersDB, err = txn.OpenRoot(0) + } + return + }); err != nil { + _ = env.Close() + return nil, err + } + + return &mdb{ + lmdbEnv: env, + dataDB: dataDB, + peersDB: peersDB, + closed: make(chan any), + }, nil +} + +func (*mdb) Preservable() bool { + return true +} + +func (m *mdb) Close() (err error) { + m.onceCloser.Do(func() { + if m.lmdbEnv != nil { + close(m.closed) + m.wg.Wait() + logger.Info().Msg("LMDB exiting. Flushing databases to disk") + _ = m.Sync(true) + err = m.lmdbEnv.Close() + } + }) + return +} + +const keySeparator = '_' + +func ignoreNotFound(err error) error { + if lmdb.IsNotFound(err) { + err = nil + } + return err +} + +func ignoreNotFoundData(data []byte, err error) ([]byte, error) { + if lmdb.IsNotFound(err) { + err = nil + } + return data, err +} + +func composeKey(ctx, key string) []byte { + ctxLen := len(ctx) + res := make([]byte, ctxLen+len(key)+1) + copy(res, ctx) + res[ctxLen] = keySeparator + copy(res[ctxLen+1:], key) + return res +} + +func (m *mdb) Put(_ context.Context, storeCtx string, values ...storage.Entry) (err error) { + if len(values) > 0 { + err = m.Update(func(txn *lmdb.Txn) (err error) { + var data []byte + for _, kv := range values { + vl := len(kv.Value) + if data, err = txn.PutReserve(m.dataDB, composeKey(storeCtx, kv.Key), vl, 0); err == nil { + copy(data, kv.Value) + } else { + break + } + } + return + }) + } + return +} + +func (m *mdb) Contains(_ context.Context, storeCtx string, key string) (contains bool, err error) { + err = m.View(func(txn *lmdb.Txn) (err error) { + _, err = txn.Get(m.dataDB, composeKey(storeCtx, key)) + return + }) + if err == nil { + contains = true + } else if lmdb.IsNotFound(err) { + err = nil + } + return +} + +func (m *mdb) Load(_ context.Context, storeCtx string, key string) (v []byte, err error) { + err = m.View(func(txn *lmdb.Txn) (err error) { + v, err = ignoreNotFoundData(txn.Get(m.dataDB, composeKey(storeCtx, key))) + return + }) + return +} + +func (m *mdb) Delete(_ context.Context, storeCtx string, keys ...string) (err error) { + if len(keys) > 0 { + err = m.Update(func(txn *lmdb.Txn) (err error) { + for _, k := range keys { + if err = ignoreNotFound(txn.Del(m.dataDB, composeKey(storeCtx, k), nil)); err != nil { + break + } + } + return + }) + } + return +} + +const ( + ipLen = 16 + packedPeerLen = bittorrent.PeerIDLen + ipLen + 2 // peer_id + ipv6 + port + seederPrefix = 'S' + leecherPrefix = 'L' + ipv4Prefix = '4' + ipv6Prefix = '6' + countPrefix = 'C' + downloadedPrefix = 'D' +) + +func packPeer(peer bittorrent.Peer, out []byte) { + _ = out[packedPeerLen-1] + copy(out, peer.ID.Bytes()) + a := peer.Addr().As16() + copy(out[bittorrent.PeerIDLen:], a[:]) + binary.BigEndian.PutUint16(out[bittorrent.PeerIDLen+ipLen:], peer.Port()) +} + +func unpackPeer(arr []byte) (peer bittorrent.Peer) { + _ = arr[packedPeerLen-1] + peerID, _ := bittorrent.NewPeerID(arr[:bittorrent.PeerIDLen]) + peer = bittorrent.Peer{ + ID: peerID, + AddrPort: netip.AddrPortFrom(netip.AddrFrom16([ipLen]byte(arr[bittorrent.PeerIDLen:])).Unmap(), + binary.BigEndian.Uint16(arr[bittorrent.PeerIDLen+ipLen:])), + } + return +} + +func composeIHKeyPrefix(ih []byte, seeder bool, v6 bool, suffixLen int) (ihKey []byte, suffixStart int) { + ihLen := len(ih) + ihKey = make([]byte, ihLen+4+suffixLen) // prefix{L/S} + prefix{4/6} + separator + infoHash + separator + if seeder { + ihKey[0] = seederPrefix + } else { + ihKey[0] = leecherPrefix + } + if v6 { + ihKey[1] = ipv6Prefix + } else { + ihKey[1] = ipv4Prefix + } + ihKey[2], ihKey[ihLen+3] = keySeparator, keySeparator + copy(ihKey[3:], ih) + suffixStart = len(ihKey) - suffixLen + return +} + +func composeIHKey(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) (ihKey []byte) { + ihKey, start := composeIHKeyPrefix(ih.Bytes(), seeder, peer.Addr().Is6(), packedPeerLen) + packPeer(peer, ihKey[start:]) + return +} + +func (m *mdb) putPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { + ihKey := composeIHKey(ih, peer, seeder) + return m.Update(func(txn *lmdb.Txn) (err error) { + var b []byte + if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err == nil { + binary.BigEndian.PutUint64(b, uint64(timecache.NowUnix())) + } + return + }) +} + +func (m *mdb) delPeer(ih bittorrent.InfoHash, peer bittorrent.Peer, seeder bool) error { + ihKey := composeIHKey(ih, peer, seeder) + return m.Update(func(txn *lmdb.Txn) error { + return ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)) + }) +} + +func (m *mdb) PutSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return m.putPeer(ih, peer, true) +} + +func (m *mdb) DeleteSeeder(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return m.delPeer(ih, peer, true) +} + +func (m *mdb) PutLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return m.putPeer(ih, peer, false) +} + +func (m *mdb) DeleteLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + return m.delPeer(ih, peer, false) +} + +func (m *mdb) GraduateLeecher(_ context.Context, ih bittorrent.InfoHash, peer bittorrent.Peer) error { + ihKey := composeIHKey(ih, peer, false) + return m.Update(func(txn *lmdb.Txn) (err error) { + if err = ignoreNotFound(txn.Del(m.peersDB, ihKey, nil)); err != nil { + return + } + ihKey[0] = seederPrefix + var b []byte + if b, err = txn.PutReserve(m.peersDB, ihKey, 8, 0); err != nil { + return + } + binary.BigEndian.PutUint64(b, uint64(timecache.NowUnixNano())) + + ihPrefix := ihKey[:len(ihKey)-packedPeerLen] + ihPrefix[0], ihPrefix[1] = downloadedPrefix, countPrefix + var v int + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, ihPrefix)); err != nil { + return + } + if len(b) >= 4 { + v = int(binary.BigEndian.Uint32(b)) + } + v++ + if b, err = txn.PutReserve(m.peersDB, ihPrefix, 4, 0); err == nil { + binary.BigEndian.PutUint32(b, uint32(v)) + } + return + }) +} + +func (m *mdb) scanPeers(ctx context.Context, prefix []byte, readRaw bool, fn func(k, v []byte) bool) (err error) { + m.wg.Add(1) + prefixLen := len(prefix) + err = m.View(func(txn *lmdb.Txn) (err error) { + txn.RawRead = readRaw + scanner := lmdbscan.New(txn, m.peersDB) + var op uint = lmdb.SetRange + if prefixLen == 0 { + op = lmdb.First + } + if scanner.SetNext(prefix, nil, op, lmdb.Next) { + loop: + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + k := scanner.Key() + if !bytes.HasPrefix(k, prefix) { + break loop + } + if prefixLen == 0 || len(k) == prefixLen+packedPeerLen { + if !fn(k, scanner.Val()) { + break loop + } + } else { + logger.Warn().Int("expected", prefixLen+packedPeerLen).Int("got", len(k)). + Msg("Invalid key length") + } + } + } + err = scanner.Err() + } + scanner.Close() + return + }) + m.wg.Done() + + return +} + +func (m *mdb) AnnouncePeers(ctx context.Context, ih bittorrent.InfoHash, forSeeder bool, numWant int, v6 bool) (peers []bittorrent.Peer, err error) { + peers = make([]bittorrent.Peer, 0, numWant) + prefix, prefixLen := composeIHKeyPrefix(ih.Bytes(), false, v6, 0) + appendFn := func(k, _ []byte) bool { + peers = append(peers, unpackPeer(k[prefixLen:])) + numWant-- + return numWant > 0 + } + if forSeeder { + err = m.scanPeers(ctx, prefix, true, appendFn) + } else { + prefix[0] = seederPrefix + if err = m.scanPeers(ctx, prefix, true, appendFn); err == nil && numWant > 0 { + prefix[0] = leecherPrefix + err = m.scanPeers(ctx, prefix, true, appendFn) + } + } + return +} + +func (m *mdb) countPeers(ctx context.Context, scanPrefix []byte) (cnt uint32, err error) { + m.wg.Add(1) + err = m.View(func(txn *lmdb.Txn) (err error) { + txn.RawRead = true + scanner := lmdbscan.New(txn, m.peersDB) + if scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) { + var prevKey []byte + loop: + for scanner.Scan() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + k := scanner.Key() + if len(k) == len(scanPrefix)+packedPeerLen && bytes.HasPrefix(scanPrefix, k[:len(k)-packedPeerLen]) { + if !bytes.Equal(k, prevKey) { + cnt++ + prevKey = k + } + } else if scanPrefix[1] == ipv4Prefix { + scanPrefix[1] = ipv6Prefix + if !scanner.SetNext(scanPrefix, nil, lmdb.SetRange, lmdb.Next) { + break loop + } + } else { + break loop + } + } + } + } + err = scanner.Err() + scanner.Close() + return + }) + m.wg.Done() + + return +} + +func (m *mdb) ScrapeSwarm(ctx context.Context, ih bittorrent.InfoHash) (leechers uint32, seeders uint32, snatched uint32, err error) { + scanPrefix, _ := composeIHKeyPrefix(ih.Bytes(), false, false, 0) + if leechers, err = m.countPeers(ctx, scanPrefix); err != nil { + return + } + scanPrefix[0], scanPrefix[1] = seederPrefix, ipv4Prefix + if seeders, err = m.countPeers(ctx, scanPrefix); err != nil { + return + } + + scanPrefix[0], scanPrefix[1] = downloadedPrefix, countPrefix + err = m.View(func(txn *lmdb.Txn) (err error) { + var b []byte + if b, err = ignoreNotFoundData(txn.Get(m.peersDB, scanPrefix)); err != nil { + return + } + if len(b) >= 4 { + snatched = binary.BigEndian.Uint32(b) + } + return + }) + return +} + +const ( + v1IHKeyLen = bittorrent.InfoHashV1Len + 4 + packedPeerLen + v2IHKeyPen = bittorrent.InfoHashV2Len + 4 + packedPeerLen +) + +func (m *mdb) gc(cutoff time.Time) { + toDel := make([][]byte, 0, 50) + cutoffUnix := cutoff.Unix() + err := m.scanPeers(context.Background(), nil, false, func(k, v []byte) bool { + if l := len(k); (l == v1IHKeyLen || l == v2IHKeyPen) && + (k[0] == seederPrefix || k[0] == leecherPrefix) && + (k[1] == ipv4Prefix || k[1] == ipv6Prefix) && + k[2] == keySeparator && len(v) >= 8 && cutoffUnix >= int64(binary.BigEndian.Uint64(v)) { + toDel = append(toDel, k) + } + return true + }) + if err == nil { + err = m.Update(func(txn *lmdb.Txn) (err error) { + for _, k := range toDel { + if err = txn.Del(m.peersDB, k, nil); err != nil { + break + } + } + return + }) + } + if err == nil { + _ = m.Sync(true) + } else { + logger.Err(err).Msg("Error occurred while GC") + } +} + +func (m *mdb) ScheduleGC(gcInterval, peerLifeTime time.Duration) { + m.wg.Add(1) + go func() { + defer m.wg.Done() + t := time.NewTimer(gcInterval) + defer t.Stop() + for { + select { + case <-m.closed: + return + case <-t.C: + m.gc(time.Now().Add(-peerLifeTime)) + t.Reset(gcInterval) + } + } + }() +} + +func (m *mdb) Ping(_ context.Context) error { + _, err := m.Info() + return err +} diff --git a/storage/mdb/storage_disabled.go b/storage/mdb/storage_disabled.go new file mode 100644 index 0000000..7346ef5 --- /dev/null +++ b/storage/mdb/storage_disabled.go @@ -0,0 +1,3 @@ +//go:build !cgo + +package mdb diff --git a/storage/mdb/storage_test.go b/storage/mdb/storage_test.go new file mode 100644 index 0000000..1fc8485 --- /dev/null +++ b/storage/mdb/storage_test.go @@ -0,0 +1,59 @@ +//go:build cgo + +package mdb + +import ( + "fmt" + "os" + "testing" + + s "github.com/sot-tech/mochi/storage" + "github.com/sot-tech/mochi/storage/test" +) + +const tmpPath = "" + +var cfg = config{ + Path: "", + Mode: defaultMode, + DataDBName: "KV", + PeersDBName: "PEERS", + MaxSize: defaultMapSize, + MaxReaders: defaultMaxReaders, + AsyncWrite: true, + NoMetaSync: false, +} + +func createNew() s.PeerStorage { + var ps s.PeerStorage + var err error + ps, err = newStorage(cfg) + if err != nil { + panic(fmt.Sprint("Unable to open/create LMDB: ", err)) + } + return ps +} + +func TestStorage(t *testing.T) { + tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*") + if err != nil { + t.Error(err) + } + t.Cleanup(func() { + _ = os.RemoveAll(tmpDir) + }) + cfg.Path = tmpDir + test.RunTests(t, createNew()) +} + +func BenchmarkStorage(b *testing.B) { + tmpDir, err := os.MkdirTemp(tmpPath, "lmdb*") + if err != nil { + b.Error(err) + } + b.Cleanup(func() { + _ = os.RemoveAll(tmpDir) + }) + cfg.Path = tmpDir + test.RunBenchmarks(b, createNew) +} diff --git a/storage/memory/storage.go b/storage/memory/storage.go index 70ebd46..d5f3d93 100644 --- a/storage/memory/storage.go +++ b/storage/memory/storage.go @@ -32,27 +32,31 @@ var logger = log.NewLogger("storage/memory") func init() { // Register the storage driver. - storage.RegisterDriver(Name, builder) + storage.RegisterDriver(Name, Builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - var cfg Config +// Builder is structure to create new in-memory peer or data storage +type Builder struct{} + +// NewDataStorage creates new in-memory KV storage. Does not need configuration +func (Builder) NewDataStorage(conf.MapConfig) (storage.DataStorage, error) { + return dataStorage(), nil +} + +// NewPeerStorage creates new in-memory peer storage +func (Builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config if err := icfg.Unmarshal(&cfg); err != nil { return nil, err } - return NewPeerStorage(cfg) + return peerStorage(cfg) } -// Config holds the configuration of a memory PeerStorage. -type Config struct { +type config struct { ShardCount int `cfg:"shard_count"` } -// Validate sanity checks values set in a config and returns a new config with -// default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. -func (cfg Config) Validate() Config { +func (cfg config) validate() config { validcfg := cfg if cfg.ShardCount <= 0 || cfg.ShardCount > (math.MaxInt/2) { @@ -67,12 +71,11 @@ func (cfg Config) Validate() Config { return validcfg } -// NewPeerStorage creates a new PeerStorage backed by memory. -func NewPeerStorage(provided Config) (storage.PeerStorage, error) { - cfg := provided.Validate() +func peerStorage(provided config) (storage.PeerStorage, error) { + cfg := provided.validate() ps := &peerStore{ shards: make([]*peerShard, cfg.ShardCount*2), - DataStorage: NewDataStorage(), + DataStorage: dataStorage(), closed: make(chan any), } @@ -453,8 +456,7 @@ func (ps *peerStore) ScrapeSwarm(_ context.Context, ih bittorrent.InfoHash) (lee return } -// NewDataStorage creates new in-memory data store -func NewDataStorage() storage.DataStorage { +func dataStorage() storage.DataStorage { return new(dataStore) } diff --git a/storage/memory/storage_test.go b/storage/memory/storage_test.go index c8e3b7f..6c59053 100644 --- a/storage/memory/storage_test.go +++ b/storage/memory/storage_test.go @@ -8,7 +8,7 @@ import ( ) func createNew() storage.PeerStorage { - ps, err := NewPeerStorage(Config{ShardCount: 1024}) + ps, err := peerStorage(config{ShardCount: 1024}) if err != nil { panic(err) } diff --git a/storage/pg/storage.go b/storage/pg/storage.go index b7873b0..3332b62 100644 --- a/storage/pg/storage.go +++ b/storage/pg/storage.go @@ -51,13 +51,37 @@ var ( func init() { // Register the storage builder. - storage.RegisterDriver("pg", builder) + storage.RegisterDriver("pg", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - var cfg Config +type builder struct{} - if err := icfg.Unmarshal(&cfg); err != nil { +func (builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + var cfg config + + err := icfg.Unmarshal(&cfg) + if err != nil { + return nil, err + } + + cfg, err = cfg.validateDataStore() + if err != nil { + return nil, err + } + + return newStore(cfg) +} + +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg config + + err := icfg.Unmarshal(&cfg) + if err != nil { + return nil, err + } + + cfg, err = cfg.validateFull() + if err != nil { return nil, err } @@ -73,19 +97,14 @@ func noResultErr(err error) error { return err } -func newStore(cfg Config) (storage.PeerStorage, error) { - cfg, err := cfg.Validate() - if err != nil { - return nil, err - } - +func newStore(cfg config) (storage.PeerStorage, error) { con, err := pgxpool.New(context.Background(), cfg.ConnectionString) if err != nil { return nil, err } return &store{ - Config: cfg, + config: cfg, Pool: con, wg: sync.WaitGroup{}, closed: make(chan any), @@ -121,8 +140,14 @@ type downloadQueryConf struct { IncrementQuery string `cfg:"inc_query"` } -// Config holds the configuration of a redis PeerStorage. -type Config struct { +func checkParameter(p *string, name string) (err error) { + if *p = strings.TrimSpace(*p); len(*p) == 0 { + err = fmt.Errorf(errRequiredParameterNotSetMsg, name) + } + return +} + +type config struct { ConnectionString string `cfg:"connection_string"` PingQuery string `cfg:"ping_query"` Peer peerQueryConf @@ -133,11 +158,7 @@ type Config struct { InfoHashCountQuery string `cfg:"info_hash_count_query"` } -// Validate sanity checks values set in a config and returns a new config with -// default values replacing anything that is invalid. -// -// This function warns to the logger when a value is changed. -func (cfg Config) Validate() (Config, error) { +func (cfg config) validateDataStore() (config, error) { validCfg := cfg validCfg.ConnectionString = strings.TrimSpace(validCfg.ConnectionString) if len(validCfg.ConnectionString) == 0 { @@ -153,66 +174,68 @@ func (cfg Config) Validate() (Config, error) { Msg("falling back to default configuration") } - fn := func(p *string, name string) (err error) { - if *p = strings.TrimSpace(*p); len(*p) == 0 { - err = fmt.Errorf(errRequiredParameterNotSetMsg, name) - } - return - } - - if err := fn(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil { + if err := checkParameter(&validCfg.Data.AddQuery, "data.addQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil { + if err := checkParameter(&validCfg.Data.GetQuery, "data.getQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil { + if err := checkParameter(&validCfg.Data.DelQuery, "data.delQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil { + return validCfg, nil +} + +func (cfg config) validateFull() (config, error) { + validCfg, err := cfg.validateDataStore() + if err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.AddQuery, "peer.addQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.DelQuery, "peer.delQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil { + if err = checkParameter(&validCfg.Peer.GraduateQuery, "peer.graduateQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.Query, "announce.query"); err != nil { + if err = checkParameter(&validCfg.Peer.CountQuery, "peer.countQuery"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.CountSeedersColumn, "peer.countSeedersColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.CountLeechersColumn, "peer.countLeechersColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil { + if err = checkParameter(&validCfg.Peer.ByInfoHashClause, "peer.byInfoHashClause"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.AddQuery, "data.addQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.Query, "announce.query"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.GetQuery, "data.getQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.PeerIDColumn, "announce.peerIDColumn"); err != nil { return cfg, err } - if err := fn(&validCfg.Data.DelQuery, "data.delQuery"); err != nil { + if err = checkParameter(&validCfg.Announce.AddressColumn, "announce.addressColumn"); err != nil { + return cfg, err + } + + if err = checkParameter(&validCfg.Announce.PortColumn, "announce.portColumn"); err != nil { return cfg, err } @@ -227,7 +250,7 @@ func (cfg Config) Validate() (Config, error) { } type store struct { - Config + config *pgxpool.Pool wg sync.WaitGroup closed chan any diff --git a/storage/pg/storage_test.go b/storage/pg/storage_test.go index bab82de..c058a6a 100644 --- a/storage/pg/storage_test.go +++ b/storage/pg/storage_test.go @@ -12,7 +12,7 @@ import ( const ( createTablesQuery = ` DROP TABLE IF EXISTS mo_peers; -CREATE TABLE mo_peers ( +CREATE UNLOGGED TABLE mo_peers ( info_hash bytea NOT NULL, peer_id bytea NOT NULL, address inet NOT NULL, @@ -27,7 +27,7 @@ CREATE INDEX mo_peers_created_idx ON mo_peers(created); CREATE INDEX mo_peers_announce_idx ON mo_peers(info_hash, is_seeder, is_v6); DROP TABLE IF EXISTS mo_downloads; -CREATE TABLE mo_downloads ( +CREATE UNLOGGED TABLE mo_downloads ( info_hash bytea PRIMARY KEY NOT NULL, downloads int NOT NULL DEFAULT 1 ); @@ -42,7 +42,7 @@ CREATE TABLE mo_kv ( ` ) -var cfg = Config{ +var cfg = config{ ConnectionString: "host=127.0.0.1 database=test user=postgres pool_max_conns=50", PingQuery: "SELECT 1", Peer: peerQueryConf{ @@ -76,6 +76,10 @@ var cfg = Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error + cfg, err = cfg.validateFull() + if err != nil { + panic(fmt.Sprint("invalid configuration: ", err)) + } ps, err = newStore(cfg) if err != nil { panic(fmt.Sprint("Unable to create PostgreSQL connection: ", err, "\nThis driver needs real PostgreSQL instance")) diff --git a/storage/redis/storage.go b/storage/redis/storage.go index 8cf717a..3f6454f 100644 --- a/storage/redis/storage.go +++ b/storage/redis/storage.go @@ -35,6 +35,7 @@ import ( "time" "github.com/redis/go-redis/v9" + "github.com/sot-tech/mochi/pkg/str2bytes" "github.com/sot-tech/mochi/bittorrent" @@ -79,21 +80,28 @@ var ( func init() { // Register the storage builder. - storage.RegisterDriver("redis", builder) + storage.RegisterDriver("redis", builder{}) } -func builder(icfg conf.MapConfig) (storage.PeerStorage, error) { - // Unmarshal the bytes into the proper config type. - var cfg Config +type builder struct{} - if err := icfg.Unmarshal(&cfg); err != nil { +func (builder) NewPeerStorage(icfg conf.MapConfig) (storage.PeerStorage, error) { + var cfg Config + var err error + + if err = icfg.Unmarshal(&cfg); err != nil { return nil, err } - return newStore(cfg) + return NewStore(cfg) } -func newStore(cfg Config) (*store, error) { +func (b builder) NewDataStorage(icfg conf.MapConfig) (storage.DataStorage, error) { + return b.NewPeerStorage(icfg) +} + +// NewStore creates new redis peer storage with provided configuration structure +func NewStore(cfg Config) (storage.PeerStorage, error) { cfg, err := cfg.Validate() if err != nil { return nil, err @@ -456,31 +464,29 @@ func (ps *store) GraduateLeecher(ctx context.Context, ih bittorrent.InfoHash, pe // peerMinimumLen is the least allowed length of string serialized Peer const peerMinimumLen = bittorrent.PeerIDLen + 2 + net.IPv4len -var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (InfoHash + Port + IPv4))", peerMinimumLen) +var errInvalidPeerDataSize = fmt.Errorf("invalid peer data (must be at least %d bytes (PeerID + Port + IPv4))", peerMinimumLen) // UnpackPeer constructs Peer from serialized by Peer.PackPeer data: PeerID[20by]Port[2by]net.IP[4/16by] -func UnpackPeer(data string) (bittorrent.Peer, error) { - var peer bittorrent.Peer +func UnpackPeer(data string) (peer bittorrent.Peer, err error) { if len(data) < peerMinimumLen { - return peer, errInvalidPeerDataSize + err = errInvalidPeerDataSize + return } b := str2bytes.StringToBytes(data) - peerID, err := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen]) - if err == nil { - if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk { - peer = bittorrent.Peer{ - ID: peerID, - AddrPort: netip.AddrPortFrom( - addr.Unmap(), - binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]), - ), - } - } else { - err = bittorrent.ErrInvalidIP + peerID, _ := bittorrent.NewPeerID(b[:bittorrent.PeerIDLen]) + if addr, isOk := netip.AddrFromSlice(b[bittorrent.PeerIDLen+2:]); isOk { + peer = bittorrent.Peer{ + ID: peerID, + AddrPort: netip.AddrPortFrom( + addr.Unmap(), + binary.BigEndian.Uint16(b[bittorrent.PeerIDLen:bittorrent.PeerIDLen+2]), + ), } + } else { + err = bittorrent.ErrInvalidIP } - return peer, err + return } func (ps *Connection) parsePeersList(peersResult *redis.StringSliceCmd) (peers []bittorrent.Peer, err error) { diff --git a/storage/redis/storage_test.go b/storage/redis/storage_test.go index a8ea916..2be47bb 100644 --- a/storage/redis/storage_test.go +++ b/storage/redis/storage_test.go @@ -19,7 +19,7 @@ var cfg = Config{ func createNew() s.PeerStorage { var ps s.PeerStorage var err error - ps, err = newStore(cfg) + ps, err = NewStore(cfg) if err != nil { panic(fmt.Sprint("Unable to create Redis connection: ", err, "\nThis driver needs real Redis instance")) } diff --git a/storage/storage.go b/storage/storage.go index b38c4d7..a2d0279 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -82,9 +82,16 @@ type Entry struct { Value []byte } -// Driver is the function used to initialize a new PeerStorage +// Driver is the interface used to initialize a new DataStorage or PeerStorage // with provided configuration. -type Driver func(conf.MapConfig) (PeerStorage, error) +type Driver interface { + // NewDataStorage function prototype for creating new instance of data (KV) storage + // with provided configuration + NewDataStorage(cfg conf.MapConfig) (DataStorage, error) + // NewPeerStorage function prototype for creating new instance of peer storage + // with provided configuration + NewPeerStorage(cfg conf.MapConfig) (PeerStorage, error) +} // ErrResourceDoesNotExist is the error returned by all delete methods and the // AnnouncePeers method of the PeerStorage interface if the requested resource @@ -231,15 +238,31 @@ func RegisterDriver(name string, d Driver) { drivers[name] = d } -// NewStorage attempts to initialize a new PeerStorage instance from +// NewDataStorage attempts to initialize a new DataStorage instance from // the list of registered drivers. -func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { +func NewDataStorage(cfg conf.NamedMapConfig) (DataStorage, error) { driversMU.RLock() defer driversMU.RUnlock() - logger.Debug().Object("config", cfg).Msg("staring storage") + logger.Debug().Object("config", cfg).Msg("starting data storage") - var b Driver - b, ok := drivers[cfg.Name] + var d Driver + d, ok := drivers[cfg.Name] + if !ok { + return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name) + } + + return d.NewPeerStorage(cfg.Config) +} + +// NewPeerStorage attempts to initialize a new PeerStorage instance from +// the list of registered drivers. +func NewPeerStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { + driversMU.RLock() + defer driversMU.RUnlock() + logger.Debug().Object("config", cfg).Msg("starting peer storage") + + var d Driver + d, ok := drivers[cfg.Name] if !ok { return nil, fmt.Errorf("storage with name '%s' does not exists", cfg.Name) } @@ -249,7 +272,7 @@ func NewStorage(cfg conf.NamedMapConfig) (ps PeerStorage, err error) { return } - if ps, err = b(cfg.Config); err != nil { + if ps, err = d.NewPeerStorage(cfg.Config); err != nil { return }