/** Base type for the sample OAuth tokens.
  *
  * @param prefix   leading marker of the token code
  * @param sequence number appended to the prefix to form the code
  */
abstract class Token(prefix: String, sequence: Long) {
  /** The token code: the prefix immediately followed by the sequence number. */
  def code: String = s"$prefix$sequence"
}

/** Refresh token; its code is `R` followed by the sequence number. */
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  /** The refresh token whose sequence number is one higher than this one. */
  def next = RefreshToken(sequence + 1)
}

/** Access token; its code is `A` followed by the sequence number.
  *
  * The primary constructor is private; instances are derived from a refresh
  * token through the secondary constructor, which reuses its sequence number.
  */
case class AccessToken private (sequence: Long, expiresIn: Duration) extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: Duration) = this(token.sequence, expiresIn)
}

/** Result of a refresh request: a new access token and, optionally, a new refresh token. */
case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])
// The basic OAuth refresh token request/response flow (section 6 of the OAuth 2.0 spec).
// This flow is the fundamental source of access and refresh tokens
// and would be a real HTTP flow in a working example.
/** Builds the (mocked) refresh request flow.
  *
  * Every incoming refresh token is answered with an already-completed future
  * holding an access token derived from it (valid for 10 minutes) and the
  * next refresh token in sequence.
  *
  * @param endpoint     refresh endpoint; not used by this mock
  * @param clientId     client identifier; not used by this mock
  * @param clientSecret client secret; not used by this mock
  * @return a flow from refresh tokens to futures of refresh responses
  */
def refreshRequest(endpoint: URL, clientId: String, clientSecret: String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
  // Lifetime given to every mocked access token.
  val expiresIn = 10.minutes
  // This would be the real flow that makes the request to the OAuth refresh endpoint
  // using Http().singleRequest(...), for example.
  Flow[RefreshToken].map { refresh =>
    Future.successful(RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next)))
  }
}
// Executes a rolling series of refresh requests with appropriate delay to serve as the
// basis for a continuous stream of Access Tokens.
// The pair of futures emitted represent the current and next Access Tokens, respectively.
// The current Access Token should be used until the future for the next Access Token completes.
//
// NOTE(review): the declared result type has no materialized-value type parameter, unlike
// the other signatures in this file; it is kept as written — confirm against the Akka
// Streams version in use.
// NOTE(review): `scheduler` and the implicit ExecutionContext needed by `response.map` are
// not defined in the visible code; presumably they come from the enclosing scope.
def refreshFlow(initial: RefreshToken, request: Flow[RefreshToken, Future[RefreshResponse], Unit]): Source[(Future[AccessToken], Future[AccessToken])] = {
  Source(
    // Seed element: the initial refresh token paired with the promise of the first access token.
    Source.single((initial, Promise[AccessToken]())),
    request,
    Merge[(RefreshToken, Promise[AccessToken])](2),
    Unzip[RefreshToken, Promise[AccessToken]],
    // `request` emits futures, so the zip pairs a Future[RefreshResponse] with the promise.
    Zip[Future[RefreshResponse], Promise[AccessToken]],
    Broadcast[(Future[RefreshResponse], Promise[AccessToken])](2)
  )((mat, _, _, _, _, _) => mat) { implicit b => (seed, refresher, merge, unzip, zip, bcast) =>

    // Complete current promise and create next promise
    val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, current) =>
        current.completeWith(response.map(_.access))
        (response, Promise[AccessToken]())
    })

    // Feeds back the refresh token after delay to initiate the next access token refresh.
    // Uses the Akka after pattern to schedule the Future about 30 seconds prior to expiration.
    // Both nesting levels are flattened so the merge receives plain (token, promise) pairs.
    val feedback = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, next) =>
        Source(response).collect {
          case RefreshResponse(access, Some(refresh)) =>
            // NOTE(review): `expiresIn` is declared as Duration here while `after` is
            // normally given a FiniteDuration — confirm the intended type.
            val delay = access.expiresIn.minus(30.seconds)
            // `after` takes its future by name, so the value is passed directly.
            Source(after(delay, scheduler)(Future.successful((refresh, next))))
        }.flatten(FlattenStrategy.concat)
    }.flatten(FlattenStrategy.concat))

    // Output the current/next token future pair
    val output = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, next) => (response.map(_.access), next.future)
    })

    // The rolling request flow, initiated with the initial refresh token
    // NOTE(review): the unzip ports are named left/right here but out0/out1 elsewhere in
    // this file — confirm which names the targeted Akka Streams version provides.
    seed ~> merge ~> unzip.in
    unzip.left ~> refresher ~> zip.in0
    unzip.right ~> zip.in1
    zip.out ~> promise ~> bcast ~> feedback
    merge <~ feedback
    bcast ~> output

    output.outlet
  }
}
// Converts a source of (current, next) Access Token future pairs into
// a continuous stream of Access Tokens, switching to the next token, when available.
//
// For each pair, the current token (once its future completes) is repeated for as long
// as the future of the next token has not completed; the per-pair streams are then
// concatenated in order.
//
// NOTE(review): the parameter type has no materialized-value type parameter, unlike the
// result type; it is kept as written — confirm against the Akka Streams version in use.
def accessTokenSource(source: Source[(Future[AccessToken], Future[AccessToken])]): Source[AccessToken, Unit] = {
  source.map { case (current, next) =>
    Source(current)
      // takeWhile needs a predicate over the element, not a Boolean evaluated once.
      .map(token => Source.repeat(token).takeWhile(_ => !next.isCompleted))
      // Flatten the per-token repeat source so this stage yields AccessToken elements.
      .flatten(FlattenStrategy.concat)
  }.flatten(FlattenStrategy.concat)
}
// Decorate requests with the current access token.
//
// Each request is paired with the next element of `tokens` and receives an
// `Authorization: Bearer <code>` header built from that token's code. Other headers on
// the request are kept; an Authorization header already present is replaced.
//
// `protected` is a reserved word in Scala, so the name must be written in backticks.
//
// NOTE(review): the parameter and result types have no materialized-value type parameter,
// unlike the other signatures in this file; they are kept as written — confirm against
// the Akka Streams version in use.
def `protected`(tokens: Source[AccessToken]): Flow[HttpRequest, HttpRequest] = {
  Flow(Flow[HttpRequest], tokens, Zip[HttpRequest, AccessToken])((mat, _, _) => mat) {
    implicit b => (requests, tokenSource, zip) =>

      val addHeader = b.add(Flow[(HttpRequest, AccessToken)].map { case (request, access) =>
        val bearer = new Authorization(new OAuth2BearerToken(access.code))
        request.mapHeaders { existing =>
          // Drop any stale Authorization header, keep everything else.
          val others = existing.filter {
            case _: Authorization => false
            case _                => true
          }
          others :+ bearer
        }
      })

      requests ~> zip.in0
      tokenSource ~> zip.in1
      zip.out ~> addHeader

      (requests.inlet, addHeader.outlet)
  }
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to a topic in the Google Groups "Akka User List" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/akka-user/qh1ktrdbjbE/unsubscribe.
To unsubscribe from this group and all its topics, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
// A simple implementation of the Akka Cancellable trait
/** Cancellable backed by an `AtomicBoolean` flag that starts out `false`. */
class AtomicCancellable extends Cancellable {
  private val cancelled = new AtomicBoolean()

  /** Sets the flag; returns `true` only for the call that actually flipped it. */
  override def cancel(): Boolean = cancelled.compareAndSet(false, true)

  /** Returns the current value of the flag. */
  override def isCancelled(): Boolean = cancelled.get
}

object AtomicCancellable {
  /** Creates a new, not yet cancelled instance. */
  def apply(): AtomicCancellable = new AtomicCancellable
}
// Sample OAuth refresh token/response related classes

/** Base type for the sample OAuth tokens.
  *
  * @param prefix   leading marker of the token code
  * @param sequence number appended to the prefix to form the code
  */
abstract class Token(prefix: String, sequence: Long) {
  /** The token code: the prefix immediately followed by the sequence number. */
  val code: String = prefix + sequence
}

/** Refresh token; its code is `R` followed by the sequence number. */
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  /** Creates the refresh token with sequence number 0. */
  def this() = this(0)

  /** The refresh token whose sequence number is one higher than this one. */
  def next: RefreshToken = this.copy(sequence = sequence + 1)
}

/** Access token; its code is `A` followed by the sequence number.
  *
  * The primary constructor is private; instances are derived from a refresh
  * token through the secondary constructor, which reuses its sequence number.
  */
case class AccessToken private (sequence: Long, expiresIn: FiniteDuration) extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: FiniteDuration) = this(token.sequence, expiresIn)
}

/** Result of a refresh request: a new access token and, optionally, a new refresh token. */
case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])
// Creates a sink that materializes an Agent for obtaining the current Access Token.
// The sink accepts a single element, the initial Refresh Token, and
// initiates an internal flow that automatically requests new Access Tokens
// and updates the materialized Agent accordingly.
// The paired Cancellable can be used to stop the automatic update process.
// Callers should provide a valid persist sink, typically backed by a database
// or other persistent store, to save the latest refresh token value.
// The lead argument is the lead time to request the next Access Token prior to
// the current token's expiration.
def autoRefresh(
    request: Flow[RefreshToken, Future[RefreshResponse], _],
    persist: Sink[RefreshToken, _],
    lead: FiniteDuration = 30.seconds
  )(implicit materializer: Materializer, system: ActorSystem)
  : Sink[RefreshToken, (Agent[Future[AccessToken]], Cancellable)] = {

  Sink.head[RefreshToken].mapMaterializedValue { futureInitial =>

    implicit val executionContext = materializer.executionContext
    val first = Promise[AccessToken]()
    val agent = Agent(first.future)
    val cancellable = AtomicCancellable()

    // If no initial refresh token ever arrives, fail the first promise so that
    // readers of the agent do not wait on a future that can never complete.
    futureInitial.failed.foreach(cause => first.tryFailure(cause))

    // Create the auto refresh flow that will run independently
    // to periodically update the agent with the current Access Token
    val auto = Sink(
      Flow[(RefreshToken, Promise[AccessToken])],
      Merge[(RefreshToken, Promise[AccessToken])](2),
      request,
      Unzip[RefreshToken, Promise[AccessToken]],
      Zip[Future[RefreshResponse], Promise[AccessToken]],
      Broadcast[(RefreshResponse, Promise[AccessToken])](3)
    )((mat, _, _, _, _, _) => mat) { implicit b => (initial, merge, request, unzip, zip, bcast) =>

      // Detect and handle cancellation.
      // The splitWhen diverts the flow of elements upon cancellation,
      // ending the refresh process and allowing us to handle cleanup
      // so we don't leave a dangling, uncompleted promise.
      val cancel = b.add(Flow[(RefreshToken, Promise[AccessToken])]
        .splitWhen(_ => cancellable.isCancelled)
        .prefixAndTail(1)
        .map { case (prefix, tail) =>
          // The diverted substreams must actually be run for the cleanup to happen;
          // merely mapping over `tail` builds a description that is never executed.
          tail.runForeach(_.runForeach { case (_, pending) =>
            pending.tryFailure(new Exception("auto refresh cancelled"))
          })
          prefix.head
        }
        .flatten(FlattenStrategy.concat))

      // Complete current promise and create next promise
      val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
        .map { case (fresponse, cur) =>
          cur.completeWith(fresponse.map(_.access))

          val next = Promise[AccessToken]()
          // Update agent upon promise completion.
          // Note that this is a side effect, hence the andThen.
          next.future.andThen { case _ => agent.send(next.future) }

          (fresponse, next)
        })

      // Unwrap the completed response future
      val response = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])]
        .map { case (fr, p) => Source(fr.map(r => (r, p))) }
        .flatten(FlattenStrategy.concat))

      // Save the updated refresh token, if supplied
      val save = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
        .collect { case (RefreshResponse(_, Some(refresh)), _) => refresh }
        .to(persist))

      // Send the next future to the agent upon expiration of current to
      // prevent users of the agent from using expired tokens.
      // Uses the Akka after pattern to schedule the send.
      val expiration = b.add(Sink.foreach[(RefreshResponse, Promise[AccessToken])] {
        case (RefreshResponse(access, _), promise) =>
          after(access.expiresIn, system.scheduler)(Future {
            if (!promise.isCompleted) {
              agent.send(promise.future)
            }
          })
      })

      // Feed back the refresh token after delay to initiate the next access token refresh.
      // Uses the Akka after pattern to schedule the Future with configured lead time
      // prior to expiration.
      val refresh = b.add(Flow[(RefreshResponse, Promise[AccessToken])]
        .collect { case (RefreshResponse(access, Some(refresh)), promise) =>
          val delay = access.expiresIn.minus(lead)
          val future = after(delay, system.scheduler)(Future.successful((refresh, promise)))
          Source(future)
        }
        .flatten(FlattenStrategy.concat))

      // The rolling request flow, initiated with the initial refresh token
      initial ~> merge ~> cancel ~> unzip.in
      unzip.out0 ~> request ~> zip.in0
      unzip.out1 ~> zip.in1
      zip.out ~> promise ~> response ~> bcast ~> save
      bcast ~> expiration
      bcast ~> refresh
      merge <~ refresh

      initial.inlet
    }

    // Run the auto refresh flow
    val initial = futureInitial.map(refresh => (refresh, first))
    auto.runWith(Source(initial))

    // Return the (Agent, Cancellable) pair
    (agent, cancellable)
  }
}
// Create an Access Token source that always retrieves the latest value from an agent.
// For every demanded element the agent is read again and the future it currently holds
// is emitted once completed.
def accessTokenSource(agent: Agent[Future[AccessToken]]): Source[AccessToken, Unit] =
  // The unit value is passed explicitly: an empty argument list only compiles through
  // the deprecated "insert ()" argument adaptation.
  Source.repeat(()).map(_ => Source(agent())).flatten(FlattenStrategy.concat)
// Flow that decorates requests with the current access token.
//
// Each request is paired with the next element of `tokens` and receives an
// `Authorization: Bearer <code>` header built from that token's code. Other headers on
// the request are kept; an Authorization header already present is replaced.
def addAccessToken(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, HttpRequest, Unit] =
  Flow(Flow[HttpRequest], tokens, Zip[HttpRequest, AccessToken])((mat, _, _) => mat) {
    implicit b => (requests, tokens, zip) =>

      val addHeader = b.add(Flow[(HttpRequest, AccessToken)].map { case (request, access) =>
        val bearer = new Authorization(new OAuth2BearerToken(access.code))
        // withHeaders would replace the whole header list; mapHeaders edits it instead.
        request.mapHeaders { existing =>
          val others = existing.filter {
            case _: Authorization => false
            case _                => true
          }
          others :+ bearer
        }
      })

      requests ~> zip.in0
      tokens ~> zip.in1
      zip.out ~> addHeader

      (requests.inlet, addHeader.outlet)
  }
/** Checks that the auto refresh sink publishes an access token derived from the initial refresh token. */
class OAuthRefreshSpec extends FlatSpec with AkkaStreamsImplicits with Matchers with ScalaFutures {

  /** Mock response: an access token derived from `refresh` plus the next refresh token. */
  def createMockResponse(refresh: RefreshToken, expiresIn: FiniteDuration): RefreshResponse =
    RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next))

  "Auto refresh" should "generate access token" in {

    val initialRefresh = new RefreshToken()
    val expiresIn = 3.seconds
    val mockRequestFlow = Flow[RefreshToken].map(r => Future.successful(createMockResponse(r, expiresIn)))
    val persist = Sink.foreach[RefreshToken](r => println(s"Saving refresh token: $r"))

    val autoRefreshSink = autoRefresh(mockRequestFlow, persist, 1.second)
    val (agent, cancellable) = Source.single(initialRefresh).runWith(autoRefreshSink)

    // Cancel in a finally block so the refresh process is stopped even when the
    // assertion fails or the future does not complete in time.
    try {
      whenReady(agent.get) { token =>
        val expected = new AccessToken(initialRefresh, expiresIn)
        token shouldBe expected
      }
    } finally {
      cancellable.cancel()
    }

  }

}
val auto = Sink(
Flow[(RefreshToken, Promise[AccessToken])],
Merge[(RefreshToken, Promise[AccessToken])](2),
request,
Unzip[RefreshToken, Promise[AccessToken]],
Zip[Future[RefreshResponse], Promise[AccessToken]],
Broadcast[(RefreshResponse, Promise[AccessToken])](3)
)((mat, _, _, _, _, _) => mat) {
implicit b => (initial, merge, request, unzip, zip, bcast) =>
...
}
Sink<Pair<RefreshToken, F.Promise<OAuth2Info>>, BoxedUnit> auto =
Sink.factory().create6(
Flow.<Pair<RefreshToken, F.Promise<OAuth2Info>>>create(),
Merge.<Pair<RefreshToken, F.Promise<OAuth2Info>>>create(2),
request,
Unzip.<RefreshToken, F.Promise<OAuth2Info>>create(),
Zip.<Future<RefreshResponse>, F.Promise<OAuth2Info>>create(),
Broadcast.<Pair<RefreshResponse, F.Promise<OAuth2Info>>>create(3),
new Function6<BoxedUnit, BoxedUnit, BoxedUnit, BoxedUnit, BoxedUnit, BoxedUnit, BoxedUnit>() {
@Override
public BoxedUnit apply(BoxedUnit m1, BoxedUnit m2, BoxedUnit m3, BoxedUnit m4, BoxedUnit m5, BoxedUnit m6) {
return m1;
}
},
new Function7<FlowGraph.Builder<BoxedUnit>,
FlowShape<Pair<RefreshToken, F.Promise<OAuth2Info>>, Pair<RefreshToken, F.Promise<OAuth2Info>>>,
UniformFanInShape<Pair<RefreshToken, F.Promise<OAuth2Info>>, Pair<RefreshToken, F.Promise<OAuth2Info>>>,
FlowShape<RefreshToken, F.Promise<RefreshResponse>>,
FanOutShape2<Pair<RefreshToken, F.Promise<OAuth2Info>>, RefreshToken, F.Promise<OAuth2Info>>,
FanInShape2<Future<RefreshResponse>, F.Promise<OAuth2Info>, Pair<Future<RefreshResponse>, F.Promise<OAuth2Info>>>,
UniformFanOutShape<Pair<RefreshResponse, F.Promise<OAuth2Info>>, Pair<RefreshResponse, F.Promise<OAuth2Info>>>,
Inlet<Pair<RefreshToken, F.Promise<OAuth2Info>>>>() {
@Override
public Inlet<Pair<RefreshToken, F.Promise<OAuth2Info>>> apply(
FlowGraph.Builder<BoxedUnit> builder,
FlowShape<Pair<RefreshToken, F.Promise<OAuth2Info>>, Pair<RefreshToken, F.Promise<OAuth2Info>>> initial,
UniformFanInShape<Pair<RefreshToken, F.Promise<OAuth2Info>>, Pair<RefreshToken, F.Promise<OAuth2Info>>> merge,
FlowShape<RefreshToken, F.Promise<RefreshResponse>> request,
FanOutShape2<Pair<RefreshToken, F.Promise<OAuth2Info>>, RefreshToken, F.Promise<OAuth2Info>> unzip,
FanInShape2<Future<RefreshResponse>, F.Promise<OAuth2Info>, Pair<Future<RefreshResponse>, F.Promise<OAuth2Info>>> zip,
UniformFanOutShape<Pair<RefreshResponse, F.Promise<OAuth2Info>>, Pair<RefreshResponse, F.Promise<OAuth2Info>>> broadcast) {
...
}
...
--