Metric already exists errors when deploying my process


mathieu...@diginext.fr

Dec 3, 2020, 8:45:20 AM
to Nussknacker
Hi everyone,

I need your help again ;).

I'm still working with NK 0.2.2 and Flink 1.9.1 (Scala 2.12).

When I deploy my process built with Nussknacker, I see a large number of "Metric already exists" warnings
(almost a hundred per metric) in the logs of my task manager.

Here is an example trace for one metric:
2020-12-03 14:26:32,448 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Error while registering metric.
java.lang.IllegalArgumentException: A metric named demo.7a19923b8f7f.taskmanagerTask.18b7f678fa714e0420ef3df979de3abd.trip_tracking_process.Sink: Unnamed.0.numRecordsIn already exists
        at com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
        at org.apache.flink.dropwizard.ScheduledDropwizardReporter.notifyOfAddedMetric(ScheduledDropwizardReporter.java:131)
        at org.apache.flink.runtime.metrics.MetricRegistryImpl.register(MetricRegistryImpl.java:338)
        at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.addMetric(AbstractMetricGroup.java:396)
        at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:329)
        at org.apache.flink.runtime.metrics.groups.AbstractMetricGroup.counter(AbstractMetricGroup.java:319)
        at org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup.<init>(OperatorIOMetricGroup.java:40)
        at org.apache.flink.runtime.metrics.groups.OperatorMetricGroup.<init>(OperatorMetricGroup.java:48)
        at org.apache.flink.runtime.metrics.groups.TaskMetricGroup.getOrAddOperator(TaskMetricGroup.java:146)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setup(AbstractStreamOperator.java:178)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.setup(AbstractUdfStreamOperator.java:82)
        at org.apache.flink.streaming.api.operators.SimpleOperatorFactory.createStreamOperator(SimpleOperatorFactory.java:75)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:418)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:373)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
        at java.lang.Thread.run(Thread.java:748)

On the other hand, there are no such warnings in the jobmanager logs.

Have you ever seen these messages? Could you please help me solve this issue? Let me know if you need any additional information to understand what's going on.

Thanks a lot, and best regards.

--

Piotr Przybylski

Dec 3, 2020, 10:38:29 AM
to Nussknacker
This also happens on regular Flink; you can safely filter it out. If you are using Logback, this can be done by adding Janino [1] to Flink's lib directory and using:

        <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
            <evaluator>
                <expression>
                <![CDATA[
                return level == WARN
                    && logger.equals("org.apache.flink.runtime.metrics.MetricRegistryImpl")
                    && throwable != null
                    && throwable.getMessage() != null
                    && throwable.getMessage().contains("already exists");
                ]]>
                </expression>
            </evaluator>
            <OnMatch>DENY</OnMatch>
            <OnMismatch>NEUTRAL</OnMismatch>
        </filter>

Or an equivalent in your logging framework.
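
For placement, here is a rough sketch of how the filter above could sit inside a complete Logback configuration. The appender name "console", the log pattern, and the root level are placeholders chosen for illustration and are not taken from Flink's shipped config; adapt them to the logback.xml you actually use.

```xml
<!-- Sketch only: appender name, pattern and root level are assumptions, not from this thread. -->
<configuration>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <!-- Drops WARN events from MetricRegistryImpl whose exception message contains "already exists". -->
        <filter class="ch.qos.logback.core.filter.EvaluatorFilter">
            <evaluator>
                <expression>
                <![CDATA[
                return level == WARN
                    && logger.equals("org.apache.flink.runtime.metrics.MetricRegistryImpl")
                    && throwable != null
                    && throwable.getMessage() != null
                    && throwable.getMessage().contains("already exists");
                ]]>
                </expression>
            </evaluator>
            <OnMatch>DENY</OnMatch>
            <OnMismatch>NEUTRAL</OnMismatch>
        </filter>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="console"/>
    </root>
</configuration>
```

Filters of this kind are attached to an appender, so if the task manager logs through several appenders (for example a file and a console appender), each one that should drop the warning needs its own copy of the filter element.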
