I found a problem, but I'm not sure whether it is a bug.
When I insert a document, it sometimes returns a "closed explicitly" error, even though the insert actually succeeded.
When the code below returns an error, removeServer is called:
// AcquireSocket (excerpt quoted in the report; the rest of the body is elided,
// so no return statement is visible here). The only visible statement calls
// server.Connect with the given timeout and stores the outcome in the named
// results socket and err. The poolLimit parameter and the abended result are
// not used in the quoted portion.
func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
socket, err = server.Connect(timeout)
}
The code below will then execute removeServer:
// AcquireSocket (excerpt quoted in the report). The `continue` statements imply
// an enclosing retry loop, and `server` and `warnedLimit` are declared in parts
// of the function that are not shown here.
func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) {
// Ask the selected server for a socket, passing the pool limit and socketTimeout.
s, abended, err := server.AcquireSocket(poolLimit, socketTimeout)
// Pool limit reached: warn once (guarded by warnedLimit), wait 100ms, and retry.
if err == errPoolLimit {
println("err:" + errPoolLimit.Error())
if !warnedLimit {
warnedLimit = true
log("WARNING: Per-server connection limit reached.")
}
time.Sleep(100 * time.Millisecond)
continue
}
// Any other error: remove the server from the cluster, call syncServers, and
// retry. This is the path that reaches removeServer.
if err != nil {
println("AcquireSocket remove server:err:" + err.Error())
cluster.removeServer(server)
cluster.syncServers()
continue
}
}
and
// removeServer drops server from the cluster's masters and servers lists while
// holding the cluster lock, then closes servers outside the lock. If the
// servers list returned a non-nil entry for the removal, that entry is closed
// and the removal is logged. The server passed in is always closed.
func (cluster *mongoCluster) removeServer(server *mongoServer) {
	cluster.Lock()
	cluster.masters.Remove(server)
	removed := cluster.servers.Remove(server)
	cluster.Unlock()

	// Close calls happen after Unlock, exactly as the lock scope dictates.
	if removed != nil {
		println("other close()")
		removed.Close()
		log("Removed server ", server.Addr, " from cluster.")
	}

	println("server close()")
	server.Close()
}
and
// removeServer drops server from the cluster's masters and servers lists while
// holding the cluster lock, then closes servers outside the lock. If the
// servers list returned a non-nil entry for the removal, that entry is closed
// and the removal is logged. The server passed in is always closed.
func (cluster *mongoCluster) removeServer(server *mongoServer) {
	cluster.Lock()
	cluster.masters.Remove(server)
	removed := cluster.servers.Remove(server)
	cluster.Unlock()

	// Close calls happen after Unlock, exactly as the lock scope dictates.
	if removed != nil {
		println("other close()")
		removed.Close()
		log("Removed server ", server.Addr, " from cluster.")
	}

	println("server close()")
	server.Close()
}
and
// kill marks the socket dead with err, closes its connection, and hands err to
// every reply function that was still registered on the socket. A socket that
// is already dead is left untouched apart from a debug message. When abend is
// true, the server the socket belonged to is told via AbendSocket.
func (socket *mongoSocket) kill(err error, abend bool) {
	socket.Lock()
	if prev := socket.dead; prev != nil {
		debugf("Socket %p to %s: killed again: %s (previously: %s)", socket, socket.addr, err.Error(), prev.Error())
		socket.Unlock()
		return
	}
	logf("Socket %p to %s: closing: %s (abend=%v)", socket, socket.addr, err.Error(), abend)
	socket.dead = err
	socket.conn.Close()
	stats.socketsAlive(-1)
	// Detach the registered reply functions and the owning server while the
	// lock is held, so they can be used after Unlock.
	pending := socket.replyFuncs
	socket.replyFuncs = make(map[uint32]replyFunc)
	owner := socket.server
	socket.server = nil
	socket.gotNonce.Broadcast()
	socket.Unlock()

	// Every detached reply function receives the kill error.
	for _, notify := range pending {
		logf("Socket %p to %s: notifying replyFunc of closed socket: %s", socket, socket.addr, err.Error())
		notify(err, nil, -1, nil)
	}
	if !abend {
		return
	}
	owner.AbendSocket(socket)
}
So the code below will cause my insert to return the error "closed explicitly":
// SimpleQuery sends op on the socket and blocks until its reply function has
// been invoked, returning the document data of the first reply, or the error
// that reply carried. If socket.Query itself fails, that error is returned
// immediately without waiting for a reply.
func (socket *mongoSocket) SimpleQuery(op *queryOp) (data []byte, err error) {
	var (
		done, guard sync.Mutex
		answered    bool
		payload     []byte
		failure     error
	)
	// done starts locked; the reply function unlocks it to release the caller.
	done.Lock()
	op.replyFunc = func(e error, reply *replyOp, docNum int, docData []byte) {
		guard.Lock()
		if !answered {
			// Only the first invocation records a result.
			answered = true
			failure = e
			if e != nil {
				println("in replyFunc err:" + e.Error())
			} else {
				payload = docData
			}
		}
		guard.Unlock()
		done.Unlock()
	}
	if err = socket.Query(op); err != nil {
		println("after replyFunc err:" + err.Error())
		return nil, err
	}
	// Blocks here until the reply function has run.
	done.Lock()
	guard.Lock()
	data, err = payload, failure
	guard.Unlock()
	return data, err
}
Sometimes println("after replyFunc err:" + err.Error()) is executed and sometimes it is not; sometimes err = replyErr is executed instead.
At first I thought it was a bug. After reading the mgo code, I found that it may not be, but I don't think it is a good design: why kill all live sockets? This causes my successful operation to return an error. Worse, mgo doesn't expose this "closed explicitly" error, so I can't treat it as a special case.
My test code:
package main
import (
"fmt"
"sync"
)
// Package-level state for the test program.
var (
	// session is assigned from mgo.Dial in main.
	session *mgo.Session
	// err receives the error result of mgo.Dial in main.
	err error
)
// Data is the document inserted by the test program. Add sets A to the
// goroutine's flag and B to the loop index of the insert.
type Data struct {
	A, B int
}
// main dials the MongoDB instance at localhost:27017, stores the resulting
// session in the package-level session variable, and runs the insert test with
// 250 goroutines performing 1000000 inserts each. It panics if the dial fails.
func main() {
	session, err = mgo.Dial("mongodb://localhost:27017/db1")
	if err != nil {
		// Keep the underlying cause in the message so a failed dial can be
		// diagnosed instead of being reduced to a fixed string.
		panic("dial mongodb://localhost:27017/db1 fail: " + err.Error())
	}
	// Begin returns only after every worker has finished, so closing the
	// session here cannot race with the inserts.
	defer session.Close()
	Begin(250, 1000000)
}
// Begin starts `goroutine` concurrent workers, each running Add with its own
// index and the shared insert count, and returns once all of them have
// signalled completion.
func Begin(goroutine, count int) {
	var workers sync.WaitGroup
	// Register every worker up front, before any of them is started.
	workers.Add(goroutine)
	for id := 0; id < goroutine; id++ {
		go Add(id, count, &workers)
	}
	workers.Wait()
}
// Add inserts count documents into collection "coll1" of database "db1", using
// a fresh copy of the package-level session for every insert. Each document
// carries flag in A and the loop index in B. wg.Done is called when the
// function returns. On the first failed insert it prints the flag, index and
// error, then panics.
func Add(flag, count int, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < count; i++ {
		doc := Data{A: flag, B: i}
		s := session.Copy()
		// The error must be local: writing the package-level err from many
		// goroutines at once is a data race, and one goroutine could end up
		// reporting an error that belongs to another.
		err := s.DB("db1").C("coll1").Insert(doc)
		// Release the copied session on both the success and the error path.
		s.Close()
		if err != nil {
			fmt.Printf("%d,%d, err:%v\r\n", flag, i, err)
			panic("")
		}
	}
}
When I tested in my production environment, I also saw this error returned with only one goroutine, but I can't reproduce it now. Can you tell me whether a single goroutine can really return "closed explicitly"? I don't know why a single goroutine would return this error.
Thanks.