> The following code leaks memory whenever I connect to poster(), which pushes
> memory to all clients of pusher().
> It doesn't collect garbage even when I connect to gc(), which runs
> runtime.GC().
>
> If I remove the case time.After(), no memory leaks.
> Anybody knows why?
Each time you leave a time.After event waiting to fire, a small
amount of heap stays allocated until it does fire.
That means you'll end up with 300 * events/second
items allocated in your serve loop (300 being the five-minute
timeout expressed in seconds), which could be
very large if you're accepting a high rate of connections.
I recommend using time.NewTimer and stopping the
timer when the timer does not time out.
Another problem your code has is that you're
concurrently sharing access to the servers map.
Here's an alternate phrasing of the functionality in
your code that is perhaps a little more idiomatic.
package main
import (
"flag"
"log"
"net"
"time"
)
// main starts the pusher and poster listeners and then runs the single
// event loop that owns the set of connected pusher clients.
//
// Flags: -p selects the pusher port (default 6666) and -a selects the
// poster port (default 6667). Each poster connection causes "haha" to be
// pushed to every registered pusher client, after which the poster
// connection is closed. The loop returns as soon as either listener's
// channel is closed.
func main() {
	pusherPort := flag.String("p", "6666", "pusher port")
	posterPort := flag.String("a", "6667", "poster port")
	flag.Parse()

	pushConns, err := accepter(*pusherPort)
	if err != nil {
		log.Fatal(err)
	}
	postConns, err := accepter(*posterPort)
	if err != nil {
		log.Fatal(err)
	}

	// The client set is only ever read or written from this goroutine, so
	// it needs no locking; serve goroutines report back over finished.
	finished := make(chan *server)
	clients := make(map[*server]struct{})
	for {
		select {
		case c, open := <-pushConns:
			if !open {
				return
			}
			srv := newServer(c, finished)
			clients[srv] = struct{}{}
			go srv.serve()

		case c, open := <-postConns:
			if !open {
				return
			}
			// Broadcast first, then drop the poster connection.
			for srv := range clients {
				srv.push("haha")
			}
			c.Close()

		case srv := <-finished:
			// A serve goroutine has returned: forget the client and
			// close its connection.
			delete(clients, srv)
			srv.close()
		}
	}
}
// accepter listens for TCP connections on the given port (all interfaces)
// and returns a channel on which every accepted connection is delivered.
//
// If the listener cannot be created, the error is returned together with a
// nil channel. Otherwise a background goroutine accepts connections until
// Accept fails; it then logs the failure, closes the returned channel and
// closes the listener.
func accepter(port string) (<-chan net.Conn, error) {
	ln, err := net.Listen("tcp", ":"+port)
	if err != nil {
		return nil, err
	}

	out := make(chan net.Conn)
	go func() {
		// Deferred calls run last-in first-out: the channel is closed
		// before the listener is.
		defer ln.Close()
		defer close(out)
		for {
			c, acceptErr := ln.Accept()
			if acceptErr != nil {
				log.Printf("accept %v: %q", port, acceptErr)
				return
			}
			out <- c
		}
	}()
	return out, nil
}
// server is one connected client: strings passed to push are queued and
// written to conn by the serve goroutine.
type server struct {
	conn  net.Conn       // connection that queued messages are written to
	queue chan string    // messages waiting to be written by serve
	done  chan<- *server // serve sends the server itself here when it returns
}

// newServer returns a server that writes to conn and reports on done when
// its serve loop ends. The message queue is buffered to hold 4 entries.
func newServer(conn net.Conn, done chan<- *server) *server {
	return &server{
		conn:  conn,
		queue: make(chan string, 4),
		done:  done,
	}
}

// serve writes queued messages to the connection until a write fails or no
// message arrives for five minutes. On return it sends srv on srv.done;
// that send blocks until the receiver takes it (or the channel has room).
func (srv *server) serve() {
	defer func() {
		srv.done <- srv
	}()
	for {
		// One explicit timer per wait. Selecting on t.C (rather than a
		// fresh time.After) means the timer can be stopped as soon as a
		// message arrives, instead of lingering for the full five minutes.
		t := time.NewTimer(5 * time.Minute)
		select {
		case s := <-srv.queue:
			t.Stop()
			if _, err := srv.conn.Write([]byte(s)); err != nil {
				return
			}
		case <-t.C:
			// Idle timeout: the timer has already fired, nothing to stop.
			return
		}
	}
}

// push queues s for delivery by serve. It blocks while the queue is full.
func (srv *server) push(s string) {
	srv.queue <- s
}

// close closes the underlying connection and returns the error from Close.
func (srv *server) close() error {
	return srv.conn.Close()
}