diff --git a/nostr.go b/nostr.go index 0ce442b..8c25292 100644 --- a/nostr.go +++ b/nostr.go @@ -5,9 +5,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "errors" "fmt" - "net" "net/http" "strings" "sync" @@ -34,21 +32,6 @@ var nip57Receipt nostr.Event var zapEventSerializedStr string var nip57ReceiptRelays []string -// Relay connections -type RelayConnection struct { - URL string - relay *nostr.Relay - lastUsed time.Time - closeChan chan bool -} - -var relayConnections = make(map[string]*RelayConnection) -var relayConnectionsMutex sync.Mutex -var connectionTimeout = 30 * time.Minute -var ignoreRelayDuration = 5 * time.Minute -var ignoredRelays = make(map[string]time.Time) -var ignoredRelaysMutex sync.Mutex - func Nip57DescriptionHash(zapEventSerialized string) string { hash := sha256.Sum256([]byte(zapEventSerialized)) hashString := hex.EncodeToString(hash[:]) @@ -212,90 +195,6 @@ func GetNostrProfileMetaData(npub string) (nostr.ProfileMetadata, error) { } -func ignoreRelay(url string) { - ignoredRelaysMutex.Lock() - defer ignoredRelaysMutex.Unlock() - ignoredRelays[url] = time.Now() -} - -func isRelayIgnored(url string) bool { - ignoredRelaysMutex.Lock() - defer ignoredRelaysMutex.Unlock() - - if t, ok := ignoredRelays[url]; ok { - if time.Since(t) < ignoreRelayDuration { - return true - } - delete(ignoredRelays, url) - } - return false -} - -func isBrokenPipeError(err error) bool { - var netErr net.Error - if errors.As(err, &netErr) { - if strings.Contains(netErr.Error(), "write: broken pipe") { - return true - } - } - return false -} - -func getRelayConnection(url string) (*nostr.Relay, error) { - if isRelayIgnored(url) { - return nil, fmt.Errorf("relay %s is being ignored", url) - } - - relayConnectionsMutex.Lock() - defer relayConnectionsMutex.Unlock() - - if relayConn, ok := relayConnections[url]; ok { - relayConn.lastUsed = time.Now() - return relayConn.relay, nil - } - - ctx := context.WithValue(context.Background(), "url", url) - relay, err := nostr.RelayConnect(ctx, url) - if err != nil { - ignoreRelay(url) - return nil, err - } - - relayConn := &RelayConnection{ - URL: url, - relay: relay, - lastUsed: time.Now(), - closeChan: make(chan bool), - } - relayConnections[url] = relayConn - - go func() { - select { - case <-time.After(connectionTimeout): - relayConnectionsMutex.Lock() - if time.Since(relayConn.lastUsed) >= connectionTimeout { - relay.Close() - delete(relayConnections, url) - } - relayConnectionsMutex.Unlock() - case <-relayConn.closeChan: - } - }() - - return relay, nil -} - -func closeRelayConnection(url string) { - relayConnectionsMutex.Lock() - defer relayConnectionsMutex.Unlock() - - if relayConn, ok := relayConnections[url]; ok { - relayConn.closeChan <- true - relayConn.relay.Close() - delete(relayConnections, url) - } -} - func publishNostrEvent(ev nostr.Event, relays []string) { // Add more relays, remove trailing slashes, and ensure unique relays relays = uniqueSlice(cleanUrls(append(relays, Relays...))) @@ -305,40 +204,53 @@ func publishNostrEvent(ev nostr.Event, relays []string) { var wg sync.WaitGroup wg.Add(len(relays)) + // Create a buffered channel to control the number of active goroutines + concurrencyLimit := 20 + goroutines := make(chan struct{}, concurrencyLimit) + // Publish the event to relays for _, url := range relays { + goroutines <- struct{}{} go func(url string) { - defer wg.Done() + defer func() { + <-goroutines + wg.Done() + }() var err error - var relay *nostr.Relay + var conn *nostr.Relay var status nostr.Status maxRetries := 3 + retryDelay := 1 * time.Second for i := 0; i < maxRetries; i++ { - relay, err = getRelayConnection(url) + // Set a timeout for connecting to the relay + connCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + conn, err = nostr.RelayConnect(connCtx, url) + cancel() + if err != nil { log.Printf("Error connecting to relay %s: %v", url, err) - return + time.Sleep(retryDelay) + retryDelay *= 2 + continue } + defer conn.Close() - time.Sleep(3 * time.Second) + // Set a timeout for publishing to the relay + pubCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + status, err = conn.Publish(pubCtx, ev) + cancel() - ctx := context.WithValue(context.Background(), "url", url) - status, err = relay.Publish(ctx, ev) if err != nil { log.Printf("Error publishing to relay %s: %v", url, err) - - if isBrokenPipeError(err) { - closeRelayConnection(url) // Close the broken connection - continue // Retry connection and publish - } + time.Sleep(retryDelay) + retryDelay *= 2 + continue } else { - log.Printf("[NOSTR] published to %s: %s", url, status.String()) // Convert the nostr.Status value to a string + log.Printf("[NOSTR] published to %s: %s", url, status.String()) break } - - time.Sleep(3 * time.Second) } }(url) }