From b2f798cf03ad10f0bb1ba0f1045f2ecf0b0ead69 Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Fri, 13 Apr 2018 15:47:57 -0400 Subject: [PATCH 1/3] eliminate dedicated RunSocketWriter goroutine --- irc/client.go | 1 - irc/socket.go | 97 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 58 insertions(+), 40 deletions(-) diff --git a/irc/client.go b/irc/client.go index d309102a..7e2cdde8 100644 --- a/irc/client.go +++ b/irc/client.go @@ -90,7 +90,6 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client { limits := server.Limits() fullLineLenLimit := limits.LineLen.Tags + limits.LineLen.Rest socket := NewSocket(conn, fullLineLenLimit*2, server.MaxSendQBytes()) - go socket.RunSocketWriter() client := &Client{ atime: now, authorized: server.Password() == nil, diff --git a/irc/socket.go b/irc/socket.go index 0f98a6f9..f4ac6917 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -31,23 +31,26 @@ type Socket struct { maxSendQBytes int - // coordination system for asynchronous writes - buffer []byte - lineToSendExists chan bool + // this is a trylock enforcing that only one goroutine can write to `conn` at a time + writerSlotOpen chan bool + buffer []byte closed bool sendQExceeded bool finalData string // what to send when we die + finalized bool } // NewSocket returns a new Socket. func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket { - return Socket{ - conn: conn, - reader: bufio.NewReaderSize(conn, maxReadQBytes), - maxSendQBytes: maxSendQBytes, - lineToSendExists: make(chan bool, 1), + result := Socket{ + conn: conn, + reader: bufio.NewReaderSize(conn, maxReadQBytes), + maxSendQBytes: maxSendQBytes, + writerSlotOpen: make(chan bool, 1), } + result.writerSlotOpen <- true + return result } // Close stops a Socket from being able to send/receive any more data. @@ -56,7 +59,7 @@ func (socket *Socket) Close() { socket.closed = true socket.Unlock() - socket.wakeWriter() + go socket.send() } // CertFP returns the fingerprint of the certificate provided by the client. @@ -114,7 +117,11 @@ func (socket *Socket) Read() (string, error) { return line, nil } -// Write sends the given string out of Socket. +// Write sends the given string out of Socket. Requirements: +// 1. MUST NOT block for macroscopic amounts of time +// 2. MUST NOT reorder messages +// 3. MUST provide mutual exclusion for socket.conn.Write +// 4. SHOULD NOT tie up additional goroutines, beyond the one blocked on socket.conn.Write func (socket *Socket) Write(data string) (err error) { socket.Lock() if socket.closed { @@ -127,19 +134,10 @@ func (socket *Socket) Write(data string) (err error) { } socket.Unlock() - socket.wakeWriter() + go socket.send() return } -// wakeWriter wakes up the goroutine that actually performs the write, without blocking -func (socket *Socket) wakeWriter() { - // nonblocking send to the channel, no-op if it's full - select { - case socket.lineToSendExists <- true: - default: - } -} - // SetFinalData sets the final data to send when the SocketWriter closes. func (socket *Socket) SetFinalData(data string) { socket.Lock() @@ -154,32 +152,53 @@ func (socket *Socket) IsClosed() bool { return socket.closed } -// RunSocketWriter starts writing messages to the outgoing socket. -func (socket *Socket) RunSocketWriter() { - localBuffer := make([]byte, 0) - shouldStop := false - for !shouldStop { - // wait for new lines +// is there data to write? +func (socket *Socket) readyToWrite() bool { + socket.Lock() + defer socket.Unlock() + // on the first time observing socket.closed, we still have to write socket.finalData + return !socket.finalized && (len(socket.buffer) > 0 || socket.closed || socket.sendQExceeded) +} + +// send actually writes messages to socket.Conn; it may block +func (socket *Socket) send() { + // one of these checks happens-after every call to Write(), so we can't miss writes + for socket.readyToWrite() { select { - case <-socket.lineToSendExists: - // retrieve the buffered data, clear the buffer - socket.Lock() - localBuffer = append(localBuffer, socket.buffer...) - socket.buffer = socket.buffer[:0] - socket.Unlock() - - _, err := socket.conn.Write(localBuffer) - localBuffer = localBuffer[:0] - - socket.Lock() - shouldStop = (err != nil) || socket.closed || socket.sendQExceeded - socket.Unlock() + case <-socket.writerSlotOpen: + // got the trylock: actually do the write + socket.performWrite() + socket.writerSlotOpen <- true + default: + // another goroutine is in progress; exit and wait for them to loop back around + // and observe readyToWrite() again + return } } +} + +// write the contents of the buffer, then see if we need to close +func (socket *Socket) performWrite() { + // retrieve the buffered data, clear the buffer + socket.Lock() + buffer := socket.buffer + socket.buffer = nil + socket.Unlock() + + _, err := socket.conn.Write(buffer) + + socket.Lock() + shouldClose := (err != nil) || socket.closed || socket.sendQExceeded + socket.Unlock() + + if !shouldClose { + return + } // mark the socket closed (if someone hasn't already), then write error lines socket.Lock() socket.closed = true + socket.finalized = true finalData := socket.finalData if socket.sendQExceeded { finalData = "\r\nERROR :SendQ Exceeded\r\n" From 4778e7bcc7024c7784eda17074ee9986dc0e8eaa Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Sun, 15 Apr 2018 01:21:32 -0400 Subject: [PATCH 2/3] fixes * Placate `go vet` * Reorder the `send` loop, clarify things a little --- irc/client.go | 2 +- irc/socket.go | 16 ++++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/irc/client.go b/irc/client.go index 7e2cdde8..3f4e2c8f 100644 --- a/irc/client.go +++ b/irc/client.go @@ -100,7 +100,7 @@ func NewClient(server *Server, conn net.Conn, isTLS bool) *Client { ctime: now, flags: make(map[modes.Mode]bool), server: server, - socket: &socket, + socket: socket, nick: "*", // * is used until actual nick is given nickCasefolded: "*", nickMaskString: "*", // * is used until actual nick is given diff --git a/irc/socket.go b/irc/socket.go index f4ac6917..c78c7d57 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -42,7 +42,7 @@ type Socket struct { } // NewSocket returns a new Socket. -func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket { +func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) *Socket { result := Socket{ conn: conn, reader: bufio.NewReaderSize(conn, maxReadQBytes), @@ -50,7 +50,7 @@ func NewSocket(conn net.Conn, maxReadQBytes int, maxSendQBytes int) Socket { writerSlotOpen: make(chan bool, 1), } result.writerSlotOpen <- true - return result + return &result } // Close stops a Socket from being able to send/receive any more data. @@ -162,16 +162,20 @@ func (socket *Socket) readyToWrite() bool { // send actually writes messages to socket.Conn; it may block func (socket *Socket) send() { - // one of these checks happens-after every call to Write(), so we can't miss writes - for socket.readyToWrite() { + for { select { case <-socket.writerSlotOpen: // got the trylock: actually do the write socket.performWrite() + // surrender the trylock: socket.writerSlotOpen <- true + // check if more data came in while we held the trylock: + if !socket.readyToWrite() { + return + } default: - // another goroutine is in progress; exit and wait for them to loop back around - // and observe readyToWrite() again + // someone else has the trylock; if there's more data to write, + // they'll see if after they release it return } } From f54561171e9d84a2afd57f56cc8c056deb8fae5a Mon Sep 17 00:00:00 2001 From: Shivaram Lingamneni Date: Sun, 15 Apr 2018 19:05:22 -0400 Subject: [PATCH 3/3] try to reduce redundant goroutines --- irc/socket.go | 38 ++++++++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/irc/socket.go b/irc/socket.go index c78c7d57..5b0d2e19 100644 --- a/irc/socket.go +++ b/irc/socket.go @@ -59,7 +59,7 @@ func (socket *Socket) Close() { socket.closed = true socket.Unlock() - go socket.send() + socket.wakeWriter() } // CertFP returns the fingerprint of the certificate provided by the client. @@ -134,10 +134,22 @@ func (socket *Socket) Write(data string) (err error) { } socket.Unlock() - go socket.send() + socket.wakeWriter() return } +// wakeWriter starts the goroutine that actually performs the write, without blocking +func (socket *Socket) wakeWriter() { + // attempt to acquire the trylock + select { + case <-socket.writerSlotOpen: + // acquired the trylock; send() will release it + go socket.send() + default: + // failed to acquire; the holder will check for more data after releasing it + } +} + // SetFinalData sets the final data to send when the SocketWriter closes. func (socket *Socket) SetFinalData(data string) { socket.Lock() @@ -163,19 +175,21 @@ func (socket *Socket) readyToWrite() bool { // send actually writes messages to socket.Conn; it may block func (socket *Socket) send() { for { + // we are holding the trylock: actually do the write + socket.performWrite() + // surrender the trylock, avoiding a race where a write comes in after we've + // checked readyToWrite() and it returned false, but while we still hold the trylock: + socket.writerSlotOpen <- true + // check if more data came in while we held the trylock: + if !socket.readyToWrite() { + return + } select { case <-socket.writerSlotOpen: - // got the trylock: actually do the write - socket.performWrite() - // surrender the trylock: - socket.writerSlotOpen <- true - // check if more data came in while we held the trylock: - if !socket.readyToWrite() { - return - } + // got the trylock, loop back around and write default: - // someone else has the trylock; if there's more data to write, - // they'll see if after they release it + // failed to acquire; exit and wait for the holder to observe readyToWrite() + // after releasing it return } }