I have a Go project that uses pull replication, and syncing works fine. But when I register a database observer to get changed documents, it seems to deadlock with replication. What I'm doing is:
cCallbackPtr := (C.C4DatabaseObserverCallback)(unsafe.Pointer(C.onDatabaseChanged_cgo))
cObserverPtr := C.c4dbobs_create(db, cCallbackPtr, nil)
observerPtr := &DatabaseObserver{cblObserverPtr: cObserverPtr}
where onDatabaseChanged_cgo is a proxy to:
//export onDatabaseChanged
func onDatabaseChanged(cObserverPtr *C.C4DatabaseObserver, cCallbackcontextPtr unsafe.Pointer) {
	observerPtr, ok := observerLookup[unsafe.Pointer(cObserverPtr)]
	if !ok {
		log.Info("external observer, skipping")
		return
	}
	if observerPtr.db.IsInTransaction() {
		log.Info("db is in transaction, unable to process changes")
		return
	}
	log.Info("processing onDatabaseChanged event")
	lock := observerPtr.db.lock
	lock.Lock()
	defer lock.Unlock()
	log.Info("onDatabaseChanged acquired db lock")
	maxChanges := 1
	docsBuffer := 1000
	external := false
	changes := make([]C.C4DatabaseChange, maxChanges)
	docIDs := make([]string, docsBuffer)
	for ok := true; ok; {
		var newExternal C.bool
		log.Info("getting changes")
		nChanges := uint32(C.c4dbobs_getChanges(cObserverPtr, &changes[0], C.uint32_t(maxChanges), &newExternal))
		log.Info(fmt.Sprintf("%v changes received", nChanges))
		...
	}
c4dbobs_getChanges never returns, and the replicator hangs in the busy state.
Here is the tail of the log:
15:47:18.444370| [Sync]: {Pull#3} Received 'changes' REQ#2 (0 pending revs)
15:47:18.445723| [Sync]: {Pull#3} Handling 'changes' message REQ#2
15:47:18.446267| [Sync]: {Pull#3} now busy
15:47:18.446846| [Sync]: {DBWorker#2} Received 1 changes (seq '30093'..'30093')
15:47:18.447493| [Sync]: {Repl#1} now busy
15:47:18.448790| I0525 15:47:18.448948 12485 log.go:47] [Handling replication state change to 'Busy'. Units total: 0, units completed: 0, documents count: 0]
[Sync]: {DBWorker#2} Responding w/request for 1 revs
15:47:18.450050| [Sync]: {Pull#3} Now waiting for 1 'rev' messages; 1 known sequences pending
15:47:18.451818| [Sync]: {Pull#3} progress +0/+1, 0 docs -- now 0 / 1, 0 docs
15:47:18.465930| [Sync]: {Repl#1} progress +0/+1, 0 docs -- now 0 / 1, 0 docs
15:47:18.467848| [Sync]: {IncomingRev#4}==> litecore::repl::IncomingRev ->ws:localhost:4984/test/_blipsync
15:47:18.468681| [Sync]: {IncomingRev#4} Received revision 'smth.json_2231' #93-001ab230d77aebf4195661a535aa0a94 (seq '30093')
15:47:18.522840| [Sync]: {DBWorker#2} Inserting 1 revs:
15:47:18.524495| [DB]: DataFile: begin transaction
15:47:18.526073| [Sync]: {DBWorker#2} {'smth.json_2231' #93-001ab230d77aebf4195661a535aa0a94}
15:47:18.527063| [DB]: KeyStore(default) update smth.json_2231
15:47:18.530792| [DB]: DataFile: commit transaction
I0525 15:47:18.532023 12485 log.go:47] [processing onDatabaseChanged event]
I0525 15:47:18.649747 12485 log.go:47] [Handling replication state change to 'Busy'. Units total: 1, units completed: 0, documents count: 0]
Looking at the log, DBWorker commits the transaction but never reports "Inserted %zu revs in %.2fms (%.0f/sec)" (I do see that record when the observer is commented out), so it seems to block on rev->onInserted(transactionErr);
I peeked into the C# binding to see how DbObserverCallback is implemented, but couldn't spot any synchronization mechanism other than the db mutex, and I don't think the replicator acquires that one. What am I missing?
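For context, one variation I could try is to make no LiteCore calls inside the callback at all and only wake up a goroutine that reads the changes later. This is just a sketch in plain Go without cgo, under the assumption that the callback is not meant to call back into LiteCore; changeNotifier, notify and close are hypothetical names of mine, and the spot where c4dbobs_getChanges would go is marked with a comment.

```go
package main

import (
	"fmt"
	"sync"
)

// changeNotifier is a hypothetical helper, not part of LiteCore.
// The exported cgo callback would only call notify(); a separate
// goroutine runs handle(), which is where the db lock would be taken
// and c4dbobs_getChanges would be called.
type changeNotifier struct {
	signal chan struct{} // capacity 1, so bursts of callbacks coalesce
	done   chan struct{}
	wg     sync.WaitGroup
}

func newChangeNotifier(handle func()) *changeNotifier {
	n := &changeNotifier{
		signal: make(chan struct{}, 1),
		done:   make(chan struct{}),
	}
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		for {
			select {
			case <-n.signal:
				handle()
			case <-n.done:
				return
			}
		}
	}()
	return n
}

// notify never blocks, so the thread that invoked the callback
// can return immediately.
func (n *changeNotifier) notify() {
	select {
	case n.signal <- struct{}{}:
	default: // a wake-up is already pending
	}
}

// close stops the worker goroutine and waits for it to exit.
func (n *changeNotifier) close() {
	close(n.done)
	n.wg.Wait()
}

func main() {
	handled := make(chan struct{}, 1)
	n := newChangeNotifier(func() {
		// Real code (assumption): lock the db and loop on
		// c4dbobs_getChanges until it returns 0 changes.
		handled <- struct{}{}
	})
	n.notify() // what onDatabaseChanged would do
	<-handled
	n.close()
	fmt.Println("change handled outside the callback")
}
```

With this shape onDatabaseChanged would shrink to the observerLookup check plus a notify() call, and the locking and getChanges loop would move into the handler.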
Thanks.
Dmitry