Merge pull request #17 from fishcakeday/fix-connection-reuse

Fix connection reuse, by removing that code.
This commit is contained in:
believethehype
2023-04-25 22:53:45 +02:00
committed by GitHub
+28 -116
View File
@@ -5,9 +5,7 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"strings"
"sync"
@@ -34,21 +32,6 @@ var nip57Receipt nostr.Event
var zapEventSerializedStr string
var nip57ReceiptRelays []string
// Relay connections
type RelayConnection struct {
URL string
relay *nostr.Relay
lastUsed time.Time
closeChan chan bool
}
var relayConnections = make(map[string]*RelayConnection)
var relayConnectionsMutex sync.Mutex
var connectionTimeout = 30 * time.Minute
var ignoreRelayDuration = 5 * time.Minute
var ignoredRelays = make(map[string]time.Time)
var ignoredRelaysMutex sync.Mutex
func Nip57DescriptionHash(zapEventSerialized string) string {
hash := sha256.Sum256([]byte(zapEventSerialized))
hashString := hex.EncodeToString(hash[:])
@@ -212,90 +195,6 @@ func GetNostrProfileMetaData(npub string) (nostr.ProfileMetadata, error) {
}
func ignoreRelay(url string) {
ignoredRelaysMutex.Lock()
defer ignoredRelaysMutex.Unlock()
ignoredRelays[url] = time.Now()
}
func isRelayIgnored(url string) bool {
ignoredRelaysMutex.Lock()
defer ignoredRelaysMutex.Unlock()
if t, ok := ignoredRelays[url]; ok {
if time.Since(t) < ignoreRelayDuration {
return true
}
delete(ignoredRelays, url)
}
return false
}
func isBrokenPipeError(err error) bool {
var netErr net.Error
if errors.As(err, &netErr) {
if strings.Contains(netErr.Error(), "write: broken pipe") {
return true
}
}
return false
}
func getRelayConnection(url string) (*nostr.Relay, error) {
if isRelayIgnored(url) {
return nil, fmt.Errorf("relay %s is being ignored", url)
}
relayConnectionsMutex.Lock()
defer relayConnectionsMutex.Unlock()
if relayConn, ok := relayConnections[url]; ok {
relayConn.lastUsed = time.Now()
return relayConn.relay, nil
}
ctx := context.WithValue(context.Background(), "url", url)
relay, err := nostr.RelayConnect(ctx, url)
if err != nil {
ignoreRelay(url)
return nil, err
}
relayConn := &RelayConnection{
URL: url,
relay: relay,
lastUsed: time.Now(),
closeChan: make(chan bool),
}
relayConnections[url] = relayConn
go func() {
select {
case <-time.After(connectionTimeout):
relayConnectionsMutex.Lock()
if time.Since(relayConn.lastUsed) >= connectionTimeout {
relay.Close()
delete(relayConnections, url)
}
relayConnectionsMutex.Unlock()
case <-relayConn.closeChan:
}
}()
return relay, nil
}
func closeRelayConnection(url string) {
relayConnectionsMutex.Lock()
defer relayConnectionsMutex.Unlock()
if relayConn, ok := relayConnections[url]; ok {
relayConn.closeChan <- true
relayConn.relay.Close()
delete(relayConnections, url)
}
}
func publishNostrEvent(ev nostr.Event, relays []string) {
// Add more relays, remove trailing slashes, and ensure unique relays
relays = uniqueSlice(cleanUrls(append(relays, Relays...)))
@@ -305,40 +204,53 @@ func publishNostrEvent(ev nostr.Event, relays []string) {
var wg sync.WaitGroup
wg.Add(len(relays))
// Create a buffered channel to control the number of active goroutines
concurrencyLimit := 20
goroutines := make(chan struct{}, concurrencyLimit)
// Publish the event to relays
for _, url := range relays {
goroutines <- struct{}{}
go func(url string) {
defer wg.Done()
defer func() {
<-goroutines
wg.Done()
}()
var err error
var relay *nostr.Relay
var conn *nostr.Relay
var status nostr.Status
maxRetries := 3
retryDelay := 1 * time.Second
for i := 0; i < maxRetries; i++ {
relay, err = getRelayConnection(url)
// Set a timeout for connecting to the relay
connCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
conn, err = nostr.RelayConnect(connCtx, url)
cancel()
if err != nil {
log.Printf("Error connecting to relay %s: %v", url, err)
return
time.Sleep(retryDelay)
retryDelay *= 2
continue
}
defer conn.Close()
time.Sleep(3 * time.Second)
// Set a timeout for publishing to the relay
pubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
status, err = conn.Publish(pubCtx, ev)
cancel()
ctx := context.WithValue(context.Background(), "url", url)
status, err = relay.Publish(ctx, ev)
if err != nil {
log.Printf("Error publishing to relay %s: %v", url, err)
if isBrokenPipeError(err) {
closeRelayConnection(url) // Close the broken connection
continue // Retry connection and publish
}
time.Sleep(retryDelay)
retryDelay *= 2
continue
} else {
log.Printf("[NOSTR] published to %s: %s", url, status.String()) // Convert the nostr.Status value to a string
log.Printf("[NOSTR] published to %s: %s", url, status.String())
break
}
time.Sleep(3 * time.Second)
}
}(url)
}