Play framework, Akka Persistence, PersistenceQuery, unable to convert Source to Future Object for Action.async

49 views
Skip to first unread message

Srinivas

unread,
Dec 4, 2016, 11:23:49 AM12/4/16
to Akka User List

I am trying to get list of persistent events and send it as response(Action.async). But I am not able to convert PersistenceQuery results to Future object. Here is the code

val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
      e.event match {
        case l: String =>
           Future(Json.parse(l))
      }
   }
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
      println(a)
      a :+ b
    })
finalResult

I am able to see prints within runFold, but finalResult was never returned. I even tried to Await, even after waiting for minutes, it never returned. This finalResult shows all the user activity wanted to send it as response as Action.async. Please let me know what is the way to convert Source to Future object

Justin du coeur

unread,
Dec 4, 2016, 11:55:34 AM12/4/16
to akka...@googlegroups.com
Hmm.  You're intentionally not using currentEventsByPersistenceId()?  And your println() is showing 10 results?

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Srinivas

unread,
Dec 4, 2016, 12:51:11 PM12/4/16
to Akka User List
I am not aware of currentEventsByPersistenceId, so started using eventsByPersistenceId. println to check if execution is reaching to end or not, yes it is printing all the events but `finalResult` is not being returned. 


On Sunday, December 4, 2016 at 10:25:34 PM UTC+5:30, Justin du coeur wrote:
Hmm.  You're intentionally not using currentEventsByPersistenceId()?  And your println() is showing 10 results?
On Sun, Dec 4, 2016 at 11:23 AM, Srinivas <chu...@gmail.com> wrote:

I am trying to get list of persistent events and send it as response(Action.async). But I am not able to convert PersistenceQuery results to Future object. Here is the code

val queries = PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
val source: Source[EventEnvelope, NotUsed] = queries.eventsByPersistenceId("MYID", 0, Long.MaxValue)
val mappedSource: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
      e.event match {
        case l: String =>
           Future(Json.parse(l))
      }
   }
val finalResult: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
      println(a)
      a :+ b
    })
finalResult

I am able to see prints within runFold, but finalResult was never returned. I even tried to Await, even after waiting for minutes, it never returned. This finalResult shows all the user activity wanted to send it as response as Action.async. Please let me know what is the way to convert Source to Future object

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

Justin du coeur

unread,
Dec 4, 2016, 5:04:17 PM12/4/16
to akka...@googlegroups.com
Hmm.  I'm not the deepest expert in this, but my understanding is that eventsByPersistenceId() intentionally doesn't terminate: that's what you use to create an always-open stream to listen for new events.  By contrast, currentEventsByPersistenceId() fetches only stuff that's already there, and *does* terminate.

Like I said, I'm a bit surprised that the take(10) doesn't give you a terminated Future, but you may want to try using currentEventsByPersistenceId() instead, and see if that fixes your problem.

And I should check: what do you mean by "finalResult is not being returned"?  How are you checking for that?  Are you mapping over it?

To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

Srinivas

unread,
Dec 4, 2016, 8:51:20 PM12/4/16
to Akka User List
Thanks Justin,

Here is what I am doing

class UserController extends Controller {
   
def getUserActions(userId: String) = AuthenticatedAction.async {
     
UserService.getUserEvents(userId) map (Ok)
   
}
}


Object UserService {
 
def getUserEvents(userId: String): Future[List[JsValue]] = {

    val queries
= PersistenceQuery(actorSystem).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    val source
: Source[EventEnvelope, NotUsed] = queries.currentEventsByPersistenceId(leadId, 0, Long.MaxValue)


    val mappedSource
: Source[JsValue, NotUsed] = source.mapAsync(1) { e =>
       
e.event match {
        case l: String =>
           Future(Json.parse(l))
       }
   
}
    val finalResult
: Future[List[JsValue]] = mappedSource.take(10).runFold(List[JsValue]())((a, b) => {
       println
(a)
       a
:+ b
    
})
    finalResult
 
}
}


Controller is not sending response at all, I even tried with Await in controller 5 mins, it is timing out but never sent response. I thought `take` would terminate stream, looks like no. Just by replacing `eventsByPersistenceId ` with `currentEventsByPersistenceId`, response is being sent.


Justin du coeur

unread,
Dec 4, 2016, 9:58:12 PM12/4/16
to akka...@googlegroups.com
Okay, good -- glad that helped...

Patrik Nordwall

unread,
Dec 5, 2016, 1:15:48 AM12/5/16
to akka...@googlegroups.com
The original take(10) should also work if you have 10 or more events.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.

Srinivas

unread,
Dec 5, 2016, 4:01:16 AM12/5/16
to Akka User List
Hi Patrik,

   Does it mean, it will wait to accumulate at least 10 events?

Srinivas

unread,
Dec 5, 2016, 4:01:44 AM12/5/16
to Akka User List
In my case there were less than 10 events

Patrik Nordwall

unread,
Dec 5, 2016, 4:12:33 AM12/5/16
to akka...@googlegroups.com
Yes, it will keep the stream open until it can take 10 elements.
That explains it, and you can use currentEventsByPersistenceId.


On Mon, Dec 5, 2016 at 10:01 AM, Srinivas <chu...@gmail.com> wrote:
In my case there were less than 10 events

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.

To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.



--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw

Srinivas

unread,
Dec 5, 2016, 5:52:19 AM12/5/16
to Akka User List
Thanks Patrik
Reply all
Reply to author
Forward
0 new messages