package kernel.modeller.workers.streamFinder.generic
import akka.stream.{Shape, Outlet, Inlet}
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
object FanShape {
sealed trait Init[_] {
def inlets: immutable.Seq[Inlet[_]]
def outlets: immutable.Seq[Outlet[_]]
def name: String
}
final case class Name[_](override val name: String) extends Init[Any] {
override def inlets: immutable.Seq[Inlet[_]] = Nil
override def outlets: immutable.Seq[Outlet[_]] = Nil
}
final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]], override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
override def name: String = "FanShape"
}
}
abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out: Iterator[Outlet[_]], _name: String) extends Shape {
import FanShape._
def this(init: FanShape.Init[_]) = this(init.inlets.iterator, init.outlets.iterator, init.name)
final override def outlets: immutable.Seq[Outlet[_]] = _outlets
final override def inlets: immutable.Seq[Inlet[_]] = _inlets
private var _outlets: Vector[Outlet[_]] = Vector.empty
private var _inlets: Vector[Inlet[_]] = Vector.empty
protected def newOutlet[T](name: String): Outlet[T] = {
val p = if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]] else Outlet[T](s"${_name}.$name")
_outlets :+= p
p
}
protected def newInlet[T](name: String): Inlet[T] = {
val p = if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]] else Inlet[T](s"${_name}.$name")
_inlets :+= p
p
}
protected def construct(init: Init[_]): FanShape[_]
def deepCopy(): FanShape[_] = construct(Ports(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())))
final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): FanShape[_] = {
require(outlets.size == _outlets.size, s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
require(inlets.size == _inlets.size, s"proposed inlects [${inlets.mkString(", ")}] do not fit FanShape")
construct(Ports(inlets, outlets))
}
}
object UniformFanShape {
def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*): UniformFanShape[I, O] =
new UniformFanShape(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
}
class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_]) extends FanShape(_init) {
def this(n: Int, m: Int) = this (n, m, FanShape.Name("UniformFan"))
def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) = this(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
override protected def construct(init: FanShape.Init[_]): FanShape[_] = new UniformFanShape(n, m, init)
override def deepCopy(): UniformFanShape[I, O] = super.deepCopy().asInstanceOf[UniformFanShape[I, O]]
val inArray: Array[Inlet[I @uncheckedVariance]] = Array.tabulate(n)(i ⇒ newInlet[I](s"in$i"))
def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)
val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(m)(j ⇒ newOutlet[O](s"out$j"))
def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
}
The idea of multi input/output shape was to redirect messages to a right output based on the message data.
I just learn streams, so maybe you can suggest a better solution?
Thanks!
Cheers,
Sergey
It's not clear to me, what are you trying to accomplish. It looks like you are trying to implement AmorphousShape (ie. arbitrary number of open inlets and outlets) on your own, and then a specific variant of it, that has all inlets sharing the same type, and all outlets sharing another type. The "Fan" fragment in the names you used is a bit misleading, since in Akka Stream's own usage of it names like FanIn / FanOut shape mean that such grap has many inlets and single outlet / single inlet many outlets. The analogy is to a Chinese-style hand held fan, rather than ceiling fan with many blades :) I am wondering what use case you have in mind for your AmorphousShape because the graphs that can be materialized and executed must ultimately have a ClosedShape. You could use such multi-outlet graphs for reusing pieces of functionality, but anything more complex than a BidiShape seems rather unwieldy to me.
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+...@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
private CompletionStage<List<Response>> buildAndRunGraph(ArrayList<Data> sourceList) { //First function
Source<Data, NotUsed> source = Source.from(sourceList);
Materializer materializer = ActorMaterializer.create(context());
System.out.println("Running flow.");
return source.map(i -> {System.out.println("Message_1!"); return i;}).via(balancer(buildFinderFlow(), sourceList.size(), false)).
map(i -> {System.out.println("Message_2!"); return i;}).runWith(Sink.seq(), materializer);
}
/**
* This procedure build main processing flow.
* Flow design is following:
*
* +-------------------------------------------+
* | +-----------------(0)--+----+ |
* | | +-----------+ | FL |-(1)-------|---->
* | +-->| |-(0)->+----+ |
* >---|------>|Management | +-----+ |
* | +-->| flow |-(1)->| S |----+ |
* | |+->| | +-----+ | |
* | || +-----------+-(2)->+-------+ | |
* | |+---------------------|C | | |
* | | +-------+ | |
* | +---------------------------------+ |
* +-------------------------------------------+
*
* @return main processing flow.
*/
private static Flow<Data, Response, NotUsed> buildFinderFlow() {
return Flow.fromGraph(GraphDSL.create(builder -> {
Graph<UniformFanShape<Data, Data>, NotUsed> managementFlow = new ManagementFlow(); //Extends my new UniformFanShape. Code listed below
UniformFanShape<Data, Data> managementShape = builder.add(managementFlow);
Graph<FanOutShape2<Data, Data, Response>, NotUsed> fl = new FLShape(); //FLShape extends FanOutShape2<Data, Data, Response>
FanOutShape2<Data, Data, Response> flShape = builder.add(fl);
FlowShape<Data, Data> s = builder.add(new SDShape()); //SDShape extends FlowShape<Data, Data>
FlowShape<Data, Data> c = builder.add(new CDShape()); //CDShape extends FlowShape<Data, Data>
builder.from(managementShape.out(0)).toInlet(flShape.in());
builder.from(managementShape.out(1)).toInlet(s.in());
builder.from(managementShape.out(2)).toInlet(c.in());
builder.from(flShape.out0()).toInlet(managementShape.in(0));
builder.from(s.out()).toInlet(managementShape.in(2));
builder.from(c.out()).toInlet(managementShape.in(3));
return new FlowShape<>(managementShape.in(1), flShape.out1());
})
);
}
/**
* This procedure returns Flow which contains set of sub Flows to be run asynchronously.
*
* @param worker Flow which contains processing logic and to be run asynchronously
* @param workerCount amount of asycnhronous processes
* @param <In> Type of input
* @param <Out> Type of output
* @return Flow which contains set of asynchronous processes
*/
private static <In, Out> Flow<In, Out, NotUsed> balancer(
Flow<In, Out, NotUsed> worker, int workerCount, boolean waitForAllDownstreams) {
return Flow.fromGraph(GraphDSL.create(b -> {
final UniformFanOutShape<In, In> balance =
b.add(Balance.<In>create(workerCount, waitForAllDownstreams));
final UniformFanInShape<Out, Out> merge =
b.add(Merge.<Out>create(workerCount));
for (int i = 0; i < workerCount; i++) {
b.from(balance.out(i)).via(b.add(worker.async())).toInlet(merge.in(i));
}
return FlowShape.of(balance.in(), merge.out());
}));
}
package kernel.modeller.workers.streamFinder.subPathFinderShapes;
import akka.stream.Attributes;import akka.stream.Inlet;import akka.stream.Outlet;import akka.stream.stage.AbstractInHandler;import akka.stream.stage.AbstractOutHandler;import akka.stream.stage.GraphStage;import akka.stream.stage.GraphStageLogic;import kernel.modeller.data.Data;import kernel.modeller.workers.streamFinder.generic.UniformFanShape;
public final class ManagementFlow extends GraphStage<UniformFanShape<Data,Data>> { //Inlets public final Inlet<Data> startIn = Inlet.create("Start.in"); public final Inlet<Data> flIn = Inlet.create("FL.in"); public final Inlet<Data> sIn = Inlet.create("sDir.in"); public final Inlet<Data> cIn = Inlet.create("cDir.in"); //Outlets public final Outlet<Data> flOut = Outlet.create("FL.out"); public final Outlet<Data> sOut = Outlet.create("sDir.out"); public final Outlet<Data> cOut = Outlet.create("cDir.out");
private Inlet[] inlets = { flIn, startIn, sIn, cIn}; private Outlet[] outlets = {flOut, sOut, cOut};
private byte inletNumberToPullFrom = -1;
//Shape private final UniformFanShape<Data, Data> shape = new UniformFanShape((Inlet<Data>[])inlets, (Outlet<Data>[])outlets);
@Override public UniformFanShape<Data, Data> shape() { return shape; }
@Override public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { //Handler for Start.in Inlet { setHandler(startIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(startIn); inletNumberToPullFrom = 0; push(findOutlet(elem), elem); } }); } //Handler for FirstLast.in Inlet { setHandler(flIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(flIn); inletNumberToPullFrom = 1; push(findOutlet(elem), elem); } }); } //Handler for sDir.in Inlet { setHandler(sIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(sIn); inletNumberToPullFrom = 2; push(findOutlet(elem), elem); } }); } //Handler for cDir.in Inlet { setHandler(cIn, new AbstractInHandler() { @Override public void onPush() throws Exception { System.out.println("We are in ManagementFlow abstract in handler."); Data elem = grab(cIn); inletNumberToPullFrom = 3; push(findOutlet(elem), elem); } }); }
//Handler for FirstLast.out outlet { setHandler(flOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); } //Handler for sDir.out outlet { setHandler(sOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); } //Handler for cDir.out outlet { setHandler(cOut, new AbstractOutHandler() { @Override public void onPull() throws Exception { pull(findInlet()); } }); }
}; }
private Outlet<Data> findOutlet(Data elem) { if(elem.isFirst() || elem.isLast()) { return flOut; } else if(!elem.getSomeFlag()) { return sOut; } return cOut; }
private Inlet<Data> findInlet() { switch (inletNumberToPullFrom) { case 0: return startIn; case 1: return flIn; case 2: return sIn; case 3: return cIn; default: throw new IllegalStateException("ManagementFlow Error: It is impossible to define inlet to pull from. Current value of flag is: " + inletNumberToPullFrom); } }
}
Rafał,- If I understand onPulI() function correctly, it is allowed only for outlets, so in order to pull something you need to push it in this outlet firstly.
- I don't think that testing will help me, because based on the first statement I would see something in the log. So, messages do not go inside the shape and stuck somewhere between flow and managementFlow.
- sink requests data from you- OutletHandler.onPull is invoked on the outlet where the sink is connected to- you propagate demand outstream by calling pull on any (or all) of your Inlets, depending on your logic- eventually data becomes available upstream- InletHandler.onPush is invoked on the inlet you pulled previously, with the incoming element
OK, but the actual number of workers should not be greater than the number of available CPUs, because otherwise Akka will interleave their execution anyway. Spawning 1000s of worker flows will only waste memory. Of course I understand that the input of fixed collection of data is artif
I usually prefer to debug single problem at a time than a number of possibly interrelated problems at once.