API for Flink - need for a casting feature


mathieu...@diginext.fr

Sep 11, 2020, 5:36:46 AM
to Nussknacker
Hi everyone,

I'm currently working with NK 0.1.2 (I'm planning to upgrade, but I would like to enhance my API first - let me know if upgrading could solve my issue in any way :) )

I'm working with input streams that produce objects of miscellaneous types. At some point in my processes, I need to filter for objects that are instances of a certain type, and then cast my output object to that type.

Is there a way to do so?
Is there a way to "filter and cast" a value of my context?

To achieve this goal, I just tried to build a FlinkCustomStreamTransformation (by composing a Flink filter and a map stream operation), but the signature of the transform method seems to force the output object to be cast as... an Object (and thus, in the following steps of the process, I can't use any code completion in the SpEL expressions in the UI):

DataStream<ValueWithContext<Object>> transform(final DataStream<Context> start, final FlinkCustomNodeContext context);

Thanks a lot for your help and your work on the project.

Best regards
--

Arek Burdach

Sep 11, 2020, 6:13:31 AM
to mathieu...@diginext.fr, Nussknacker
Hi,

Upgrading to a more recent version shouldn't be needed.

Indeed, we currently don't provide any option to cast values.
Will your "certain type" be static for your model, or do you want the user to provide it as a parameter?
If it is static, the option is, as you already mentioned, to write your own FlinkCustomStreamTransformation. Don't worry about the Object in DataStream<ValueWithContext<Object>> - it is there because of some variance problems, and you can still put an object of any type there.

To make completion work correctly, you need to declare the returned type in:
@MethodToInvoke(returnType = classOf[CertainType])

Then you just need to make sure that your implementation produces the same type as the one you declared as the return type, e.g. by:
start.flatMap(ctx => ctx.get[AnyRef]("foo").filter(_.isInstanceOf[CertainType]).map(ValueWithContext(_, ctx)))
or by using a LazyParameter if you want to let the user specify exactly which object from the context should be cast. You can check PreviousValueTransformer for an example.
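
For illustration, a rough and untested Java sketch of that LazyParameter variant with a statically known target type could look like the following (CastToCertainTypeTransformer, CertainType and the "toCast" parameter name are just placeholders):

public class CastToCertainTypeTransformer extends CustomStreamTransformer {

    // declares the type of the produced variable, so SpEL completion knows its fields
    @MethodToInvoke(returnType = CertainType.class)
    public FlinkCustomStreamTransformation transform(@ParamName("toCast") LazyParameter<Object> objectToCastParam) {

        return FlinkCustomStreamTransformation.apply((start, nodeCtx) ->
                // evaluate the "toCast" expression for each context...
                start.map(nodeCtx.lazyParameterHelper().lazyMapFunction(objectToCastParam),
                        TypeInformation.of(new TypeHint<ValueWithContext<Object>>() {}))
                        // ...and keep only the values that really are instances of CertainType
                        .filter(value -> value.value() instanceof CertainType),
                // mirrors the return type declared in the annotation above
                typing.Typed$.MODULE$.apply(CertainType.class));
    }
}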

Did my answer help you?

Cheers,
Arek

mathieu...@diginext.fr

Sep 11, 2020, 8:15:58 AM
to Nussknacker
Yes, thank you, that's great. This field of @MethodToInvoke was exactly what I was looking for.

To answer your first question: as a first step, the types will be static, and I will instantiate as many custom transformations as needed (hopefully not too many). But it would be great to let the user choose the type for filtering the stream and casting the result.

Best regards,

Arek Burdach

Sep 11, 2020, 8:27:54 AM
to mathieu...@diginext.fr, Nussknacker
If you want to make it dynamically chosen, that should also be possible. There is an overloaded version of FlinkCustomStreamTransformation that takes a returnType: TypingResult.
You can define a node parameter that takes the name of the type; based on it, you can load the class using Class.forName, create a Typed.typedClass from it, and also use this class in the filtering logic.
You can also use FIXED_VALUES_EDITOR to make the list of types selectable from a selectbox. Take a look at SimpleSlidingAggregateTransformer for an example.
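
A rough, untested Java sketch of that dynamic variant could look like the following ("typeName", the class name and the other parameter names are only illustrative, and error handling is simplified):

public class CastToChosenTypeTransformer extends CustomStreamTransformer {

    @MethodToInvoke
    public FlinkCustomStreamTransformation transform(@ParamName("toCast") LazyParameter<Object> objectToCastParam,
                                                     @ParamName("typeName") String typeName) {
        // resolve the class chosen by the user; FIXED_VALUES_EDITOR could restrict this parameter to a known list of class names
        final Class<?> clazz;
        try {
            clazz = Class.forName(typeName);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type: " + typeName, e);
        }

        return FlinkCustomStreamTransformation.apply((start, nodeCtx) ->
                // evaluate the "toCast" expression for each context...
                start.map(nodeCtx.lazyParameterHelper().lazyMapFunction(objectToCastParam),
                        TypeInformation.of(new TypeHint<ValueWithContext<Object>>() {}))
                        // ...and keep only the values that are instances of the chosen class
                        .filter(value -> clazz.isInstance(value.value())),
                // declare the resolved class as the node's return type
                typing.Typed$.MODULE$.apply(clazz));
    }
}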

mathieu...@diginext.fr

Sep 11, 2020, 10:19:47 AM
to Nussknacker
I just gave the overloaded version of FlinkCustomStreamTransformation taking returnType: TypingResult a try.

Unfortunately, even though the produced variable is displayed as an instance of the correct type (see below)...
cast_ok.PNG
... I don't get any code completion when I try to navigate through the fields of the cast object (also see below):
cast_ko.PNG


Here is the source of my transformer (the workaround using the dummy object is odd, but I don't really see how to proceed otherwise). Do you have any clue to help me fix this issue?

public class FilterAndCastStreamTransformer<T> extends CustomStreamTransformer {

    private static final Logger LOGGER = LoggerFactory.getLogger(FilterAndCastStreamTransformer.class);

    private final Class<T> clazz;

    public FilterAndCastStreamTransformer(Class<T> clazz) {
        this.clazz = clazz;
    }

    @MethodToInvoke()
    public FlinkCustomStreamTransformation transform(@ParamName("toCast") LazyParameter<Object> objectToCastParam) {

        return FlinkCustomStreamTransformation.apply((start, ctx) -> {
            // dummy instance used only to obtain a Class<ValueWithContext<Object>> token for Flink's TypeInformation
            ValueWithContext<Object> dummy = new ValueWithContext<Object>(null, null);
            // evaluate the "toCast" expression for each incoming context
            return start.map(ctx.lazyParameterHelper().lazyMapFunction(objectToCastParam), TypeInformation.of((Class<ValueWithContext<Object>>) dummy.getClass()))
                    // keep only the values that are instances of the target class
                    .filter(new FilterFunction<ValueWithContext<Object>>() {
                        @Override
                        public boolean filter(ValueWithContext<Object> value) throws Exception {
                            return clazz.isInstance(value.value());
                        }
                    })
                    // identity map, re-wrapping the value so the declared output type information is attached
                    .map(new MapFunction<ValueWithContext<Object>, ValueWithContext<Object>>() {
                        @Override
                        public ValueWithContext<Object> map(ValueWithContext<Object> value) throws Exception {
                            return new ValueWithContext<>(value.value(), value.context());
                        }
                    }, TypeInformation.of((Class<ValueWithContext<Object>>) dummy.getClass()));
        }, typing.Typed$.MODULE$.apply(clazz));

    }
}

Thanks a lot.

Best regards,

Arek Burdach

Sep 11, 2020, 11:29:01 AM
to mathieu...@diginext.fr, Nussknacker


On 11.09.2020 16:19, 'mathieu...@diginext.fr' via Nussknacker wrote:
> I just gave the overloaded version of FlinkCustomStreamTransformation taking returnType: TypingResult a try.
>
> Unfortunately, even though the produced variable is displayed as an instance of the correct type (see below)...
> cast_ok.PNG
> ... I don't get any code completion when I try to navigate through the fields of the cast object (also see below):
> cast_ko.PNG
It might be because, for completion, we collect all available types by discovering the objects created by the ConfigCreator. See ProcessDefinitionExtractor.extractTypes for details:
TypesInformation.extract(definition.services.values ++
  definition.sourceFactories.values ++
  definition.customStreamTransformers.values.map(_._1) ++
  definition.signalsWithTransformers.values.map(_._1) ++
  definition.expressionConfig.globalVariables.values
)(definition.settings)
So one possible workaround is to declare @MethodToInvoke(returnType = ClassThatContainsSignaturesWithAllPossibleTypes.class).
It can look like:
interface ClassThatContainsSignaturesWithAllPossibleTypes {
  FooClass getFoo();
  BarClass getBar();
  // ...
}
I don't know if it will work, but you can give it a try.

We are working on code-less models, e.g. in 0.2.x we provided Avro sources that have automatically regenerated types based on the types defined in the Avro schema stored in the schema registry.
This is based on TypedObjectTypingResult. You could use it in your example, but then you would need to somehow list all available properties of the selected class. Also, it doesn't work with methods.





> Here is the source of my transformer (the workaround using the dummy object is odd, but I don't really see how to proceed otherwise). Do you have any clue to help me fix this issue?
TypeInformation.of(new TypeHint<ValueWithContext<Object>>() {})
should work for you :)
