Actors behaving unexpectedly with Circuit breaker


Jasper

Aug 6, 2014, 6:25:28 AM
to akka...@googlegroups.com
Hi,

I have a problem in my current project and I cannot pinpoint the exact cause. I implemented an ETL in which a controller actor dispatches files to a router (SmallestMailboxPool, one file per message), which dispatches them to file processors. In the end, the processors notify the controller of the outcome. My datasource (HikariCP) is held within an actor attached to the guardian so it restarts when necessary.
My controller and processor actors run on a pinned dispatcher because they use JDBC to insert file data into a DB. What I'm trying to do is handle database failures in the file processors. I used a Circuit Breaker like so:

File processors (simplified and without logging) :

    override def preStart(): Unit = {
      breaker = new CircuitBreaker(context.system.scheduler,
        maxFailures = 1,
        callTimeout = 30 seconds,
        resetTimeout = 10 seconds)(context.dispatcher)
    }

    [...]

    case Forwarded(file) =>
      try {
        breaker.withSyncCircuitBreaker {
          val file = doParsingAndValidation()
          file match {
            case Success(parsedFile) =>
              using((connectionFactory ? GetConnection).mapTo[Connection].waitForResult) { conn =>
                conn.setAutoCommit(false)

                doJDBCStuff()

                conn.commit()
                controller ! Processed(parsedFile)
              }
            case Failure(err) =>
              controller ! Malformed(file, err)
          }
        }
      } catch {
        case ex: Exception => context.system.scheduler.scheduleOnce(5 seconds, self, Forwarded(file))(context.dispatcher)
      }


So basically when the database is down the breaker is gonna switch to Open at first exception (therefore resending message to self). After 10 seconds it's gonna be in HalfOpen state and allow only one file through (while all others will trigger the CircuitBreakerOpenException and get resent). If the allowed one works it's all good, otherwise the breaker is restored to Open state again.
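
The state cycle described above can be written down as a small transition table. This is only a toy model under the assumption `maxFailures = 1`; the names `BreakerModel`, `Event` and `next` are made up for illustration and this is not how Akka's `CircuitBreaker` is implemented.

```scala
object BreakerModel {
  sealed trait State
  case object Closed extends State
  case object Open extends State
  case object HalfOpen extends State

  sealed trait Event
  case object CallSucceeded extends Event
  case object CallFailed extends Event
  case object ResetTimeoutElapsed extends Event

  // Transition table for a breaker that trips on the first failure (maxFailures = 1).
  def next(state: State, event: Event): State = (state, event) match {
    case (Closed, CallFailed)        => Open     // first exception trips the breaker
    case (Open, ResetTimeoutElapsed) => HalfOpen // after resetTimeout, one trial call is allowed
    case (HalfOpen, CallSucceeded)   => Closed   // trial call worked, resume normal operation
    case (HalfOpen, CallFailed)      => Open     // trial call failed, back to Open
    case (other, _)                  => other    // every other combination leaves the state unchanged
  }

  // Replays a sequence of events starting from Closed.
  def run(events: Seq[Event]): State =
    events.foldLeft(Closed: State)(next)
}
```

Note that in this model a processor only ever leaves Closed when a call actually fails; a call that never returns produces no event at all.
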
Anyway, the logic works in itself : When I stop the database it loops through messages every 5 seconds to fail each time and resends them. My real problem is that some files aren't processed !

I looked at my logs and ran YourKit to find out that processors process files normally until a crash occurs, at which point only some of them (seemingly at random) switch into Open state. Eventually these ones switch to Closed and resume their processing, but the ones that didn't go into Open state just keep waiting! What I get from this is that some actors aren't crashing (because they never go into Open state and I don't have any logs for them) and are keeping files in their mailbox without doing anything. Here's a sample log file (here only one processor switched state)

So I guess my logic is flawed and I missed something ? Thanks for any help.


Jasper

Aug 6, 2014, 10:04:13 AM
to akka...@googlegroups.com
Just noticed something interesting with YourKit... Here's an example of an actor behaving properly (when all files are processed) :

Sys-akka.actor.pinned-dispatcher-7 [RUNNABLE, IN_NATIVE]
    java.net.SocketInputStream.read(byte[], int, int) SocketInputStream.java:122
    org.postgresql.core.VisibleBufferedInputStream.readMore(int) VisibleBufferedInputStream.java:143
    org.postgresql.core.VisibleBufferedInputStream.ensureBytes(int) VisibleBufferedInputStream.java:112
    org.postgresql.core.VisibleBufferedInputStream.read() VisibleBufferedInputStream.java:71
    org.postgresql.core.PGStream.ReceiveChar() PGStream.java:269
    org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(CopyOperationImpl, boolean) QueryExecutorImpl.java:930
    org.postgresql.core.v3.QueryExecutorImpl.endCopy(CopyInImpl) QueryExecutorImpl.java:828
    org.postgresql.core.v3.CopyInImpl.endCopy() CopyInImpl.java:59
    org.postgresql.copy.CopyManager.copyIn(String, Reader, int) CopyManager.java:145
    org.postgresql.copy.CopyManager.copyIn(String, Reader) CopyManager.java:124
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8.apply(Seq) FileProcessor.scala:98
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7$$anonfun$apply$8.apply(Object) FileProcessor.scala:84
    scala.collection.Iterator$class.foreach(Iterator, Function1) Iterator.scala:743
    scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1174
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7.apply(PushbackReader) FileProcessor.scala:84
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1$$anonfun$apply$7.apply(Object) FileProcessor.scala:82
    xx.xxxx.util.Cleaning$.using(Object, Function1) Cleaning.scala:12
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(Connection) FileProcessor.scala:82
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1$$anonfun$apply$mcV$sp$1.apply(Object) FileProcessor.scala:75
    xx.xxxx.util.Cleaning$.using(Object, Function1) Cleaning.scala:12
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp() FileProcessor.scala:75
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply() FileProcessor.scala:56
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply() FileProcessor.scala:56
    akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
    akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
    akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, Function0) CircuitBreaker.scala:296
    akka.pattern.CircuitBreaker$Closed$.callThrough(Function0) CircuitBreaker.scala:345
    akka.pattern.CircuitBreaker$Closed$.invoke(Function0) CircuitBreaker.scala:354
    akka.pattern.CircuitBreaker.withCircuitBreaker(Function0) CircuitBreaker.scala:113
    akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0) CircuitBreaker.scala:135
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1) FileProcessor.scala:55
    akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object) Actor.scala:465
    xx.xxxx.actors.FileProcessor.aroundReceive(PartialFunction, Object) FileProcessor.scala:27
    akka.actor.ActorCell.receiveMessage(Object) ActorCell.scala:516
    akka.actor.ActorCell.invoke(Envelope) ActorCell.scala:487
    akka.dispatch.Mailbox.processMailbox(int, long) Mailbox.scala:238
    akka.dispatch.Mailbox.run() Mailbox.scala:220
    java.lang.Thread.run() Thread.java:744

Now one who does not :

Sys-akka.actor.pinned-dispatcher-6 [WAITING]
    java.lang.Object.wait() Object.java:503
    org.postgresql.core.v3.QueryExecutorImpl.waitOnLock() QueryExecutorImpl.java:91
    org.postgresql.core.v3.QueryExecutorImpl.execute(Query, ParameterList, ResultHandler, int, int, int) QueryExecutorImpl.java:228
    org.postgresql.jdbc2.AbstractJdbc2Connection.executeTransactionCommand(Query) AbstractJdbc2Connection.java:808
    org.postgresql.jdbc2.AbstractJdbc2Connection.rollback() AbstractJdbc2Connection.java:861
    com.zaxxer.hikari.proxy.ConnectionProxy.resetConnectionState() ConnectionProxy.java:192
    com.zaxxer.hikari.proxy.ConnectionProxy.close() ConnectionProxy.java:305
    java.lang.reflect.Method.invoke(Object, Object[]) Method.java:606
    xx.xxxx.util.Cleaning$.using(Object, Function1) Cleaning.scala:14
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply$mcV$sp() FileProcessor.scala:75
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply() FileProcessor.scala:56
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1$$anonfun$applyOrElse$1.apply() FileProcessor.scala:56
    akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
    akka.pattern.CircuitBreaker$$anonfun$withSyncCircuitBreaker$1.apply() CircuitBreaker.scala:135
    akka.pattern.CircuitBreaker$State$class.callThrough(CircuitBreaker$State, Function0) CircuitBreaker.scala:296
    akka.pattern.CircuitBreaker$Closed$.callThrough(Function0) CircuitBreaker.scala:345
    akka.pattern.CircuitBreaker$Closed$.invoke(Function0) CircuitBreaker.scala:354
    akka.pattern.CircuitBreaker.withCircuitBreaker(Function0) CircuitBreaker.scala:113
    akka.pattern.CircuitBreaker.withSyncCircuitBreaker(Function0) CircuitBreaker.scala:135
    xx.xxxx.actors.FileProcessor$$anonfun$receive$1.applyOrElse(Object, Function1) FileProcessor.scala:55
    akka.actor.Actor$class.aroundReceive(Actor, PartialFunction, Object) Actor.scala:465
    xx.xxxx.actors.FileProcessor.aroundReceive(PartialFunction, Object) FileProcessor.scala:27
    akka.actor.ActorCell.receiveMessage(Object) ActorCell.scala:516
    akka.actor.ActorCell.invoke(Envelope) ActorCell.scala:487
    akka.dispatch.Mailbox.processMailbox(int, long) Mailbox.scala:238
    akka.dispatch.Mailbox.run() Mailbox.scala:220
    java.lang.Thread.run() Thread.java:744


Apparently it's blocking because of a lock on the database while closing the connection (closeable.close() is at line 14). Which is weird because I know their connections aren't closed thanks to Hikari log stats.


 
def using[C <: {def close()} , B](closeable: C)(getB: C => B): B =
   
try {
      getB
(closeable)
   
} finally {
      closeable
.close()
   
}
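
As a side note on the helper itself: the `java.lang.reflect.Method.invoke` frame at `Cleaning.scala:14` is consistent with the structural type `{def close()}`, which Scala calls through reflection. A possible variant, sketched here under the assumption that everything passed in implements `java.lang.AutoCloseable` (as `java.sql.Connection` does since Java 7), avoids the reflective call. The names `SafeCleaning` and `usingCloseable` are hypothetical. It does not change the blocking behaviour: if `close()` waits on a lock, this version waits too.

```scala
object SafeCleaning {
  // Same contract as `using`, but bounded by AutoCloseable instead of a structural type,
  // so close() is an ordinary interface call rather than a reflective one.
  def usingCloseable[C <: AutoCloseable, B](closeable: C)(getB: C => B): B =
    try {
      getB(closeable)
    } finally {
      closeable.close() // still runs when getB throws, and still blocks if close() blocks
    }
}
```
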

Konrad Malawski

Aug 6, 2014, 11:13:24 AM
to Akka User List
Hi Jasper,
as far as I can see this is not really a problem with Akka, but with your threads getting stuck on that blocking `close` call.
The circuit breaker (well, in general the JVM and threads) can't "kill" such an operation, so you'll get stuck when using such blocking calls.

Not sure how we can help with this specific problem here.


--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--
Cheers,
Konrad 'ktoso' Malawski
hAkker @ Typesafe


Måns Schultz

Aug 6, 2014, 5:02:00 PM
to akka...@googlegroups.com
Hi Jasper

I faced a similar problem just before my vacation (a naughty database that might block inside circuit breakers). I'm just an Akka newbie, but for what it's worth, here's what I did. I found that using withSyncCircuitBreaker in these cases was a bad idea. I solved my problem by making the circuit breaker return a future with a timeout instead, and then piped that future further down the line. That, along with a transaction timeout in the driver, solved my issues. In your case I see that you want the actor to block, but could you not simply await the future (since it has a timeout, it won't block forever) and, if you get a timeout, send back Malformed?
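
The "await the future, but only for so long" part can be sketched with plain `scala.concurrent` futures. This is a simplified illustration without Akka's circuit breaker; `BoundedCall` and `callWithTimeout` are hypothetical names, and the executor passed in is assumed to be separate from the thread that is waiting.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

object BoundedCall {
  // Runs `body` on `ec` and waits at most `timeout` for its result.
  // A timeout shows up as Failure(java.util.concurrent.TimeoutException).
  // The body itself keeps running on its own thread: the JVM cannot forcibly stop it,
  // which is why a driver-level transaction timeout is still needed.
  def callWithTimeout[A](timeout: FiniteDuration)(body: => A)(implicit ec: ExecutionContext): Try[A] =
    Try(Await.result(Future(body), timeout))
}
```

The caller can then match on the result and, on `Failure`, either reschedule the message or report the file as failed instead of waiting indefinitely.
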

BR

Måns

Brett Wooldridge

Aug 7, 2014, 1:27:04 AM
to akka...@googlegroups.com
It appears that you are using the PostgreSQL CopyManager, correct?  Looking at QueryExecutorImpl it appears that rollback() is trying to obtain a lock that was not released by the CopyManager.  I recommend using the CopyManager.copyIn() method that returns a CopyIn object, rather than using the convenience method that takes a reader.  Use the writeToCopy() to pump the data in, and be sure to catch SQLException.  If you get an SQLException, call cancelCopy() and retry or whatever your recovery scenario is, otherwise call endCopy().  I would have expected PostgreSQL to handle the severing of a Connection in the middle of a bulk copy better, but that is probably a question for the PostgreSQL group.

Just my armchair diagnosis.


Jasper

Aug 7, 2014, 5:10:30 AM
to akka...@googlegroups.com
I tried it and it looks very promising, since all processors now go into Open state. However, without the reader I'm deep in encoding hell because my files are in us-ascii and my DB is in UTF-8:

invalid byte sequence for encoding "UTF8": 0x00
And I can't just sanitize the files beforehand... Anyway, I'm aware this isn't really the place for this, so unless anyone has the solution, thanks for your help!

√iktor Ҡlang

Aug 7, 2014, 5:11:59 AM
to Akka User List
:( encoding hell



Brett Wooldridge

Aug 7, 2014, 6:02:48 AM
to akka...@googlegroups.com
That error indicates that one of your "us-ascii" files has a NUL byte (0x00) in it somewhere; PostgreSQL does not accept this in text data and reports it as an encoding error. You have two options: figure out why there is a NUL character in the file, or sanitize the bytes on the fly as you read chunks from the file, either removing the NUL bytes completely or replacing them with, for example, a space (0x20).
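
Both on-the-fly options can be sketched as small functions applied to each chunk before it is written to the copy stream. The names `NulSanitizer`, `dropNul` and `replaceNul` are hypothetical, and the sketch assumes the chunks are single-byte encoded (us-ascii), so a 0x00 byte can only be a NUL character.

```scala
object NulSanitizer {
  private val Nul: Byte = 0x00
  private val Space: Byte = 0x20

  // Option 1: remove NUL bytes completely (the chunk gets shorter).
  def dropNul(chunk: Array[Byte]): Array[Byte] =
    chunk.filter(_ != Nul)

  // Option 2: keep the length and substitute each NUL, by default with a space.
  def replaceNul(chunk: Array[Byte], replacement: Byte = Space): Array[Byte] =
    chunk.map(b => if (b == Nul) replacement else b)
}
```

Replacing keeps column widths intact for fixed-width data, while dropping is usually the better fit for CSV, where a stray space could end up inside a field.
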

Jasper

Aug 7, 2014, 10:02:21 AM
to akka...@googlegroups.com
Alright I fixed it, it was stupid. But actually it didn't solve anything... Here's how I did it :

doJDBCStuff() :

    val cpManager = conn.unwrap(classOf[PGConnection]).getCopyAPI
    val stringBytes: Array[Byte] = batchStrings.toString().map(_.toByte).toArray
    val copy = cpManager.copyIn(s"COPY tableName FROM STDIN WITH CSV")
    try {
      copy.writeToCopy(stringBytes, 0, stringBytes.length)
      copy.endCopy()
    } finally {
      if (copy.isActive) {
        copy.cancelCopy()
      }
    }
    conn.commit()

The potential exception should be caught higher in the first snippet I showed (to resend the message). By the way, I noticed that If I turn on autoCommit, absolutely all processors are blocking...

Jasper

Aug 11, 2014, 9:44:04 AM
to akka...@googlegroups.com
Spent some time today on this... Actually when I turn on autocommit and crash the DB, processors do block but they ALL go into Open state, which is good. But for some reason my actor holding the datasource isn't responding anymore and I end up with this when asking for a connection :

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://catalinaSys/user/connectionFactory#-1136872742]] after [10000 ms]

It isn't blowing up either because I don't have my hook log on restart (supervisor is the guardian here). What bugs me is HikariCP log :

After cleanup pool stats HikariPool-0 (total=19, inUse=1, avail=18, waiting=0)

So there are connections available but I get timeouts instead :( Here's how I do it :

case GetConnection => sender() ! datasource.map(ds => ds.getConnection).get

datasource is an Option[HikariDataSource], and if it isn't there then it should just blow up and restart.

√iktor Ҡlang

Aug 11, 2014, 9:47:58 AM
to Akka User List
Unrelated but why the complication of


sender() ! datasource.map(ds => ds.getConnection).get

instead of

sender() ! datasource.get.getConnection



Jasper

Aug 11, 2014, 9:51:54 AM
to akka...@googlegroups.com
Quick and dirty modification but indeed, that was dumb.

√iktor Ҡlang

Aug 11, 2014, 9:55:19 AM
to Akka User List
Hi Jasper,

It wasn't dumb at all.
So are you sure that it doesn't blow up, because that would mean that the sender() of GetConnection never gets his/her connection. Right?

Jasper

Aug 11, 2014, 10:22:54 AM
to akka...@googlegroups.com
Well, given what I said before, pretty sure, yes. And yes, the sender() doesn't receive anything, so it times out and the circuit breaker keeps cycling between Open and HalfOpen. Here are some sample logs

This is annoying, I don't know if I'm doing something wrong with Akka, Hikari or PostgreSQL.

√iktor Ҡlang

Aug 11, 2014, 10:26:49 AM
to Akka User List
Did you enable lifecycle logging?

Leonard Meyer

Aug 11, 2014, 10:42:34 AM
to akka...@googlegroups.com
Yes, all I have is at the beginning where it's basically saying who's supervising who. And I have :

[DEBUG] [08/11/2014 16:36:41.977] [akka.actor.default-dispatcher-2] [akka://xxxx/user] now supervising Actor[akka://xxxxx/user/connectionFactory#608280561]

connectionFactory is the name of my datasource actor.



Patrik Nordwall

Aug 11, 2014, 11:34:32 AM
to akka...@googlegroups.com
What dispatcher do you use for what?
You mentioned pinned dispatcher, but you also have context.dispatcher for the circuit breaker. Pinned dispatcher there is probably not a good idea since the actor is blocking (the thread is busy).
You also have futures, e.g. mapTo.
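
Why a busy thread matters here can be shown with a single-thread executor standing in for one pinned thread. This is a simplified model, not Akka's dispatcher; `BusyThreadDemo` is a hypothetical name. A task queued behind a blocking call (for example a timer callback or a future continuation) cannot run until the blocking call returns.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object BusyThreadDemo {
  // Returns (ranWhileBlocked, ranAfterRelease) for a task queued behind a blocking one.
  def run(): (Boolean, Boolean) = {
    val pool = Executors.newSingleThreadExecutor() // stand-in for a single pinned thread
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val release = new CountDownLatch(1)
    try {
      Future { release.await() }     // blocking call that occupies the only thread
      val queued = Future { "done" } // work that needs the same thread
      val ranWhileBlocked = Try(Await.result(queued, 200.millis)).isSuccess
      release.countDown()            // the blocking call returns
      val ranAfterRelease = Try(Await.result(queued, 5.seconds)).isSuccess
      (ranWhileBlocked, ranAfterRelease)
    } finally {
      pool.shutdownNow()
    }
  }
}
```

Moving the callbacks and futures to a different executor is the usual way to rule this effect out.
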

/Patrik

Patrik Nordwall
Typesafe Reactive apps on the JVM
Twitter: @patriknw

Leonard Meyer

Aug 11, 2014, 11:56:42 AM
to akka...@googlegroups.com
Anything blocking I put on a pinned dispatcher. So each FileProcessor and the Controller (which sometimes does JDBC).  Each FileProcessor also has a CircuitBreaker which uses the same pinned dispatcher as said FileProcessor.

Is there something wrong with this ?

Patrik Nordwall

Aug 11, 2014, 12:41:05 PM
to akka...@googlegroups.com
Does it change anything if you use another (default) dispatcher for the circuit breaker and the futures?

/Patrik

Jasper

Aug 12, 2014, 4:13:17 AM
to akka...@googlegroups.com
Nope, doesn't change anything.