Cannot cancel a streaming query


Mark Goldenstein

Jan 1, 2017, 4:30:08 PM
to Slick / ScalaQuery
Hi guys!

I use Slick together with akka-http to stream rows from a MySQL database as CSV over HTTP.

Streaming works as expected unless I close the HTTP connection prematurely. In that case I would expect the SQL query to stop executing, since there is no downstream demand anymore. However, the MySQL process list shows that the query is still running. It only stops when I shut down the web server (or when the query has finished fetching all the rows).

With debug logging turned on, I repeatedly see the following while streaming:

22:05:17.738 [db-8] DEBUG slick.basic.BasicBackend.stream - Suspending streaming action with continuation (more data available)
22:05:17.743 [default-akka.actor.default-dispatcher-8] DEBUG slick.basic.BasicBackend.stream - Scheduling stream continuation after transition from demand = 0
22:05:17.743 [db-9] DEBUG slick.basic.BasicBackend.stream - Restarting  streaming action, realDemand = 8

Then, once I close the connection and there is no downstream demand anymore, I see:

22:05:17.744 [db-9] DEBUG slick.basic.BasicBackend.stream - Suspending streaming action with continuation (more data available)
22:05:17.749 [default-akka.actor.default-dispatcher-7] DEBUG akka.io.TcpIncomingConnection - Closing connection due to IO error java.io.IOException: Broken pipe
22:05:17.774 [default-akka.actor.default-dispatcher-16] DEBUG slick.basic.BasicBackend.stream - Scheduling stream continuation after transition from demand = 0
22:05:17.774 [db-10] DEBUG slick.basic.BasicBackend.stream - Restarting  streaming action, realDemand = oo

What is going on here? Why is realDemand oo?

My code looks something like this:

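import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.marshalling.Marshaller
import akka.http.scaladsl.model.ContentTypes
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.util.ByteString
import slick.jdbc.MySQLProfile.api._

// Assumed shape of the CSVLine type, which is not shown in the post
final case class CSVLine(line: String)

object Main extends App {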
  implicit val system = ActorSystem()
  implicit val executor = system.dispatcher
  implicit val materializer = ActorMaterializer()

  implicit val csvMarshaller =
    Marshaller.withFixedContentType[CSVLine, ByteString](ContentTypes.`text/csv(UTF-8)`) {
      case CSVLine(line) => ByteString(line)
    }

  implicit val csvStreamingSupport = EntityStreamingSupport.csv()
    .withParallelMarshalling(parallelism = 8, unordered = false)

  val query = ??? // a Slick query

  val publisher = DB.get.stream(
    query.result.withStatementParameters(statementInit = DB.enableStream))

  val routes = {
    logRequestResult("poloniex-webservice") {
      encodeResponse {
        pathPrefix("csv") {
          pathSingleSlash {
            get {
              complete {
                Source.fromPublisher(publisher)
              }
            }
          }
        }
      }
    }
  }

  Http().bindAndHandle(routes, "127.0.0.1", 9000)
}

object DB {
  private val db = Database.forConfig("db")

  def get = db

  def enableStream(statement: java.sql.Statement): Unit = {
    statement match {
      case s if s.isWrapperFor(classOf[com.mysql.jdbc.StatementImpl]) =>
        s.unwrap(classOf[com.mysql.jdbc.StatementImpl]).enableStreamingResults()
      case _ => // do nothing
    }
  }
}

Any thoughts on this?

Mark

André Cardoso

Mar 6, 2018, 1:04:16 PM
to Slick / ScalaQuery
Hi there,

I'm having much the same problem.

I'm using the recommended configuration for proper streaming:
action.withStatementParameters(
  rsType = ResultSetType.ForwardOnly,
  rsConcurrency = ResultSetConcurrency.ReadOnly,
  fetchSize = 1234).transactionally

which I found in https://github.com/slick/slick/issues/1305

Although termination is signaled correctly through Reactive Streams, the underlying SQL statement is not terminated.
From what I've seen, Slick only cancels the statement between fetches.
So the problem arises when, for whatever reason, the streaming query takes a long time before it even starts sending data to the client.
If the stream is cancelled on the Slick side, I can see the query still running on the DB until the first batch of data is sent over the (still open) JDBC connection.
Afterwards, all resources seem to be closed.
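The timing problem described above can be made concrete with a toy model. The sketch below is a hypothetical `BetweenFetchesReader`, not Slick's actual implementation. It only mimics the observed behavior: the cancel flag is checked between fetches, so a cancel that arrives during a long first fetch takes effect only after that fetch returns. The `fetch` function stands in for one blocking JDBC round trip and is an assumption of the model.

```scala
import java.util.concurrent.atomic.AtomicBoolean

/** Toy model, NOT Slick's implementation: a reader that only checks its
  * cancel flag between fetches, as described in the post. `fetch` stands in
  * for one blocking JDBC round trip (None = no more rows). */
final class BetweenFetchesReader[A](fetch: () => Option[Seq[A]]) {
  private val cancelled = new AtomicBoolean(false)

  /** Called from the downstream side (e.g. when the HTTP client goes away). */
  def cancel(): Unit = cancelled.set(true)

  /** Runs until the source is exhausted or a cancel is seen between fetches.
    * Returns how many batches were fetched. */
  def run(emit: A => Unit): Int = {
    var fetched = 0
    var exhausted = false
    while (!exhausted && !cancelled.get()) {
      fetch() match { // a slow fetch runs to completion; cancel() cannot interrupt it
        case Some(batch) =>
          fetched += 1
          if (!cancelled.get()) batch.foreach(emit) // drop rows that arrive after cancellation
        case None =>
          exhausted = true
      }
    }
    fetched
  }
}
```

In this model, a fetch that is still running when `cancel()` is called is still counted as fetched: the database work has already been done. The loop stops only before the next fetch, which matches the observation that the query keeps running until its first batch arrives.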

In fact, with PostgreSQL, suppose we don't configure the action correctly for streaming and don't run it in a pinned session. We then end up requesting the full data set and may exhaust the application's memory even after cancelling the stream, because the JDBC layer keeps consuming the whole data set.

To work around this, I'm thinking about bringing the actual JDBC statement used by Slick into application scope by means of the "statementInit" parameter:

def withStatementParameters(rsType: ResultSetType = null,
                            rsConcurrency: ResultSetConcurrency = null,
                            rsHoldability: ResultSetHoldability = null,
                            statementInit: Statement => Unit = null,
                            fetchSize: Int = 0)

The "statementInit" parameter exposes the statement, which I can save in some wrapper and then use in my source like so:
theAkkaSource.watchTermination() { (_, f) =>
  f.onComplete { _ =>
    statementWrapper.cancelStatement()
  }
  NotUsed
}
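The `statementWrapper` in the snippet above is not defined in the post. Here is a minimal sketch of what it might look like. It assumes one wrapper per stream, since each subscription to the publisher runs its own statement. The `StatementWrapper` class and its `capture`/`cancelStatement` members are hypothetical names. The sketch relies only on `java.sql.Statement.cancel()`, which JDBC documents as usable from one thread to cancel a statement executing in another. Whether the server-side work actually stops is up to the driver and the database.

```scala
import java.sql.{SQLException, Statement}
import java.util.concurrent.atomic.AtomicReference

/** Hypothetical holder for the JDBC Statement that Slick passes to statementInit. */
final class StatementWrapper {
  // The most recently captured statement, if any (thread-safe hand-off).
  private val current = new AtomicReference[Option[Statement]](None)

  /** Suitable as (part of) a statementInit callback: it only remembers the statement. */
  val capture: Statement => Unit = st => current.set(Some(st))

  /** Best-effort cancel, safe to call from another thread and more than once.
    * Returns true if a captured statement was cancelled without an SQLException. */
  def cancelStatement(): Boolean =
    current.getAndSet(None) match {
      case Some(st) =>
        try { st.cancel(); true }
        catch { case _: SQLException => false } // e.g. the statement was already closed
      case None => false
    }
}
```

For the MySQL setup in the first post, the capture could be chained with the existing hook, for example `statementInit = st => { DB.enableStream(st); wrapper.capture(st) }`. The `watchTermination` callback would then call `wrapper.cancelStatement()`.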

I think this might work.
However, I don't understand why Slick doesn't deal with this itself.

Any thoughts or help?