I just gave a try to the solution of the overloaded version of FlinkCustomStreamTransformation taking returnType: TypingResult.
Unfortunately, even if the produced variable is displayed as an instance of the correct type (see below)...
Here is the source of my transformer (the workaround using the dummy object is odd, but I don't really see how to proceed otherwise). Do you have any clue to help me fiw this issue ? :
public class FilterAndCastStreamTransformer<T> extends CustomStreamTransformer {
private static final Logger LOGGER = LoggerFactory.getLogger(FilterAndCastStreamTransformer.class);
private final Class<T> clazz;
public FilterAndCastStreamTransformer(Class<T> clazz) {
this.clazz = clazz;
}
@MethodToInvoke()
public FlinkCustomStreamTransformation transform(@ParamName("toCast") LazyParameter<Object> objectToCastParam) {
return FlinkCustomStreamTransformation.apply((start, ctx) -> {
ValueWithContext<Object> dummy = new ValueWithContext<Object>(null, null);
return start.map(ctx.lazyParameterHelper().lazyMapFunction(objectToCastParam), TypeInformation.of((Class<ValueWithContext<Object>>) dummy.getClass()))
.filter(new FilterFunction<ValueWithContext<Object>>() {
@Override
public boolean filter(ValueWithContext<Object> value) throws Exception {
return clazz.isInstance(value.value());
}
})
.map(new MapFunction<ValueWithContext<Object>, ValueWithContext<Object>>() {
@Override
public ValueWithContext<Object> map(ValueWithContext<Object> value) throws Exception {
return new ValueWithContext<>(value.value(), value.context());
}
}, TypeInformation.of((Class<ValueWithContext<Object>>) dummy.getClass()));
}, typing.Typed$.MODULE$.apply(clazz));
}
}