Correct way to "wait" for polling elements from a Source in Akka Streams


Drew Kutcharian

Dec 30, 2016, 5:29:55 PM
to Akka User List
Hi,

I have a use case where I poll a data source for items. As long as items are present, I process them as quickly as possible, but when there are none, I would like to poll only every X seconds. Is there a built-in construct in Akka Streams that I can use?

I'm currently using `Source.unfoldAsync` to get the records from the data source. One "bad" option is to just call `Thread.sleep(XXX)` inside `Source.unfoldAsync` when the data source returns no results (as demonstrated below), but I want to see if there's a better option.

Source.unfoldAsync[NotUsed, Seq[Item]](NotUsed) { _ =>
  service.getItems flatMap { items =>
    if (items.isEmpty) {
      //TODO Fix this!!!
      Thread.sleep(30 * 1000) //30 seconds
    }
    Future.successful(Some((NotUsed, items)))
  }
}

I can probably improve on the `Thread.sleep(XXX)` by keeping some "State" object and using smaller waits, but I feel like that's not going to be much better.
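For comparison, one way to avoid blocking a thread is to delay the returned Future instead of sleeping. The following is only a minimal sketch under assumptions: `Item`, `ItemService`, `pollingSource` and `idleDelay` are hypothetical names standing in for the real service, and the delay comes from `akka.pattern.after`, which completes a Future on the scheduler without tying up a thread.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical stand-ins for the real item type and service.
final case class Item(id: Int)
trait ItemService { def getItems: Future[Seq[Item]] }

// Same shape as the unfoldAsync version above, but an empty result is
// handed downstream only after idleDelay, without blocking any thread.
def pollingSource(service: ItemService, idleDelay: FiniteDuration)(
    implicit system: ActorSystem): Source[Seq[Item], NotUsed] = {
  import system.dispatcher // ExecutionContext for flatMap and after
  Source.unfoldAsync[NotUsed, Seq[Item]](NotUsed) { _ =>
    service.getItems.flatMap { items =>
      val next: Option[(NotUsed, Seq[Item])] = Some((NotUsed, items))
      if (items.isEmpty) after(idleDelay, system.scheduler)(Future.successful(next))
      else Future.successful(next)
    }
  }
}
```

As in the original snippet, empty batches are still emitted (just late), so a downstream `filter(_.nonEmpty)` may be wanted. Because `unfoldAsync` does not ask for the next element until the current Future completes, the delay also postpones the next call to `getItems`.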

Any input is appreciated.

Thanks,

Drew

Konrad Malawski

Dec 30, 2016, 5:53:25 PM
to Akka User List
Read the docs about GraphStage and custom stream processing elements. It has all the utilities needed for this. We also explained them on the Akka blog (akka.io/blog), so check that out :)
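As a rough illustration of that suggestion (not code from this thread), a custom source stage can use `TimerGraphStageLogic` to schedule the next poll when the data source comes back empty. Everything named here is an assumption made for the sketch: the `PollingSource` class, its `poll` function and the `idleDelay` parameter are hypothetical.

```scala
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{AsyncCallback, GraphStage, GraphStageLogic, OutHandler, TimerGraphStageLogic}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical stage: emits non-empty batches as fast as downstream asks,
// and waits idleDelay before polling again after an empty batch.
final class PollingSource[T](poll: () => Future[Seq[T]], idleDelay: FiniteDuration)
    extends GraphStage[SourceShape[Seq[T]]] {

  val out: Outlet[Seq[T]] = Outlet("PollingSource.out")
  override val shape: SourceShape[Seq[T]] = SourceShape(out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with OutHandler {
      private var onResult: AsyncCallback[Try[Seq[T]]] = _

      override def preStart(): Unit =
        // The async callback brings the Future's result back into the stage safely.
        onResult = getAsyncCallback[Try[Seq[T]]] {
          case Success(items) if items.nonEmpty => push(out, items)
          case Success(_)                       => scheduleOnce("poll", idleDelay)
          case Failure(ex)                      => failStage(ex)
        }

      private def fetch(): Unit =
        poll().onComplete(onResult.invoke)(materializer.executionContext)

      // Downstream demand triggers a poll; the timer retries after an empty result.
      override def onPull(): Unit = fetch()
      override protected def onTimer(timerKey: Any): Unit = fetch()

      setHandler(out, this)
    }
}
```

It could then be used as `Source.fromGraph(new PollingSource(() => service.getItems, 30.seconds))`. Unlike the `unfoldAsync` version, this sketch never emits empty batches, and it only polls when there is downstream demand.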

--
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Viktor Klang

Dec 30, 2016, 6:11:56 PM
to Akka User List
How about something like this?

Source.repeat(NotUsed).flatMapConcat { _ =>
  val items = service.getItems
  if (items.isEmpty)
    Source.tick(30.seconds, 1.second, Seq.empty[Item]).take(1).drop(1) // <-- more than likely easier ways to do this, but it is late here.
  else
    Source.single(items)
}
--
Cheers,

Drew Kutcharian

Dec 30, 2016, 6:41:40 PM
to akka...@googlegroups.com
Hi Viktor,

I looked at `Source.tick`, but the issue is that `service.getItems` returns a `Future[Seq[Item]]`, which is why I'm using `Source.unfoldAsync`. How would this work with respect to `Future[Seq[Item]]`? BTW, considering it's late where you are, you don't need to respond immediately ;)
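One possible way to adapt the `flatMapConcat` idea to a Future-returning service, sketched here under assumptions rather than taken from the thread, is to lift the Future into a stream with `Source.fromFuture` (available in the Akka versions of that time; newer releases offer `Source.future`) and branch on the result in a nested `flatMapConcat`. `Item`, `ItemService`, `pollingSource` and `idleDelay` are hypothetical names.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical stand-ins for the real item type and service.
final case class Item(id: Int)
trait ItemService { def getItems: Future[Seq[Item]] }

def pollingSource(service: ItemService, idleDelay: FiniteDuration): Source[Seq[Item], NotUsed] =
  Source.repeat(NotUsed).flatMapConcat { _ =>
    // Each repetition performs one poll, lifted from Future into a Source.
    Source.fromFuture(service.getItems).flatMapConcat { items =>
      if (items.isEmpty)
        // Hold the empty batch back for idleDelay, then drop it, so this
        // sub-stream emits nothing and completes only after the delay.
        Source.single(items).initialDelay(idleDelay).filter(_.nonEmpty)
      else
        Source.single(items)
    }
  }
```

Since `flatMapConcat` consumes one sub-stream at a time, the next `getItems` call should not start until the current sub-stream, including any delay, has completed.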

- Drew



Drew Kutcharian

Dec 30, 2016, 8:58:18 PM
to akka...@googlegroups.com
Thanks for the pointer, Konrad. I ended up building a custom GraphStage inspired by the TimedGate example. Do you mind reviewing it?


Thanks,

Drew


