How to process a stream of DBIO

Aldo Stracquadanio

May 15, 2015, 8:37:51 AM
to scala...@googlegroups.com
Hello,

I am trying to adapt a project of mine that already uses Akka Streams to Slick 3.0. My problem is that my current solution creates a stream of sequences of functions, each of which causes a side effect on the database:
 
Flow[Seq[Session => Unit]].map { msg =>
  Try {
    db.withSession { session =>
      msg.foreach(_(session))
    }
  }
}

I have already replaced the code that generates the flow of `Session => Unit` so that it generates a flow of `DBIO[Int]` instead. Ideally, I would now like to use `DBIO.sequence` to fuse each `Seq[DBIO[Int]]` into a `DBIO[Seq[Int]]`, run those sequentially, and end up with a `Flow[Seq[Int]]`.

Does anyone have advice on how to achieve this?

Many thanks!

Stefan Zeiger

May 18, 2015, 4:26:20 AM
to scala...@googlegroups.com
There's no useful way to run a lazy stream of DBIO at the moment. The only lazy combinator is flatMap, so you could fold a stream with it, but it won't get you very far because the interpreter doesn't do trampolining and will quickly blow the stack.

But your old code doesn't run the whole stream in a single session, either. If you don't need transaction control across the whole stream, it should be as simple as .mapAsync(1)(db.run _).
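To make the suggestion concrete for the `Seq[DBIO[Int]]` case from the question, here is a minimal sketch. It assumes a recent Akka Streams version (where the materialized type of `Flow[T]` is `NotUsed`) and a Slick 3.x JDBC `Database`; the names `BatchFlow` and `sequential` are hypothetical and not part of either library.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import slick.dbio.DBIO
import slick.jdbc.JdbcBackend.Database

object BatchFlow {
  // Hypothetical helper: fuses each incoming batch into a single action
  // with DBIO.sequence and hands it to db.run. Parallelism 1 means at most
  // one Future returned by db.run is in flight at any time.
  def sequential(db: Database): Flow[Seq[DBIO[Int]], Seq[Int], NotUsed] =
    Flow[Seq[DBIO[Int]]].mapAsync(1) { batch =>
      db.run(DBIO.sequence(batch))
    }
}
```

Each batch runs as its own `db.run` call, so nothing here spans a transaction across the whole stream. If the per-element `Try` from the original code matters, running `DBIO.sequence(batch).asTry` instead should yield a `Try[Seq[Int]]` per batch rather than failing the stream.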

--
Stefan Zeiger
Slick Tech Lead
Typesafe - Build Reactive Apps!
Twitter: @StefanZeiger

Aldo Stracquadanio

May 18, 2015, 6:34:39 AM
to scala...@googlegroups.com
Thanks for the answer, Stefan; I will give it a try. One thing I don't fully understand is whether mapAsync guarantees that the actions are executed in order at the database level, or whether I have to worry about this myself, since some of my actions may depend on each other. Could you advise on this, please?

Many thanks again,

Aldo.

Stefan Zeiger

May 18, 2015, 7:21:29 AM
to scala...@googlegroups.com
I was wondering the same thing and had to reread the docs for mapAsync a few times (and look for alternative methods) but I think it should be fine. From the scaladoc: "The number of Futures that shall run in parallel is given as the first argument to ``mapAsync``". My understanding of this is that a factor of 1 guarantees sequential execution.
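One way to check this reading empirically, rather than relying on the scaladoc alone, is to count how many futures are in flight at once. The sketch below is only an illustration: it assumes Akka 2.6 or later (where an implicit `ActorSystem` provides the materializer), uses a sleeping `Future` as a stand-in for `db.run`, and the names `MapAsyncOrderCheck` and `maxConcurrency` are made up for this example.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrderCheck {
  // Runs `elements` fake actions through mapAsync(parallelism) and returns
  // the highest number of actions that were observed running at once.
  def maxConcurrency(parallelism: Int, elements: Int): Int = {
    implicit val system: ActorSystem = ActorSystem("order-check")
    import system.dispatcher
    val inFlight = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    try {
      val done = Source(1 to elements).mapAsync(parallelism) { i =>
        Future {
          val now = inFlight.incrementAndGet()
          peak.accumulateAndGet(now, (a, b) => math.max(a, b))
          Thread.sleep(20) // stand-in for the database round trip
          inFlight.decrementAndGet()
          i
        }
      }.runWith(Sink.seq)
      Await.result(done, 10.seconds)
      peak.get()
    } finally {
      system.terminate()
    }
  }
}
```

With a parallelism of 1, `MapAsyncOrderCheck.maxConcurrency(1, 5)` should report a peak of 1, which matches the interpretation above that the next action is not started until the previous future has completed.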