Hi All,
We recently switched from faust 1.10.4 to faust-streaming 0.6.9. Since the switch, our applications have been crashing with the exception below. The application has multiple layers, with aggregation and filtering of data at each stage. At each stage, the processor sends its messages to a Kafka topic, and the corresponding faust app agent consumes them. We have kept the partition count of the Kafka topics the same at every layer.
- Cluster Size = 12
- Topic & Table Partition count = 36
- faust-streaming version = 0.6.9
- kafka-python version = 2.0.2
[2021-07-29 10:05:23,761] [18808] [ERROR] [^---Fetcher]: Crashed reason=AssertionError('Partition is not assigned')
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/mode/services.py", line 802, in _execute_task
    await task
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await consumer._drain_messages(self)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1104, in _drain_messages
    async for tp, message in ait:
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 714, in getmany
    highwater_mark = self.highwater(tp)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/consumer.py", line 1367, in highwater
    return self._thread.highwater(tp)
  File "/usr/local/lib/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 923, in highwater
    return self._ensure_consumer().highwater(tp)
  File "/usr/local/lib/python3.8/site-packages/aiokafka/consumer/consumer.py", line 673, in highwater
    assert self._subscription.is_assigned(partition), \
AssertionError: Partition is not assigned
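
To make the failure mode concrete, here is a minimal stand-alone sketch of what the last frames appear to be doing: a highwater mark is requested for a partition that is no longer in the consumer's assignment. `FakeConsumer`, `safe_highwater`, and the topic name are hypothetical stand-ins written for illustration, not aiokafka or faust code; only the assertion message mirrors the traceback. That a rebalance between the fetch and the highwater lookup is the actual trigger is my assumption, not something I have confirmed.

```python
from typing import Dict, NamedTuple, Optional, Set


class TopicPartition(NamedTuple):
    topic: str
    partition: int


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer; NOT the aiokafka API."""

    def __init__(self) -> None:
        self._assigned: Set[TopicPartition] = set()
        self._highwater: Dict[TopicPartition, int] = {}

    def assign(self, partitions: Set[TopicPartition]) -> None:
        # A rebalance replaces the whole assignment in one step.
        self._assigned = set(partitions)

    def record_highwater(self, tp: TopicPartition, offset: int) -> None:
        self._highwater[tp] = offset

    def highwater(self, tp: TopicPartition) -> int:
        # Same kind of guard as the last frame of the traceback.
        assert tp in self._assigned, "Partition is not assigned"
        return self._highwater.get(tp, 0)


def safe_highwater(consumer: FakeConsumer, tp: TopicPartition) -> Optional[int]:
    """Return the highwater mark, or None if tp was revoked in the meantime."""
    try:
        return consumer.highwater(tp)
    except AssertionError:
        return None


tp = TopicPartition("stage-1-output", 7)  # hypothetical topic name
consumer = FakeConsumer()
consumer.assign({tp})
consumer.record_highwater(tp, 120)
before_rebalance = consumer.highwater(tp)

consumer.assign(set())  # simulated rebalance revokes the partition
after_rebalance = safe_highwater(consumer, tp)
```

In this sketch the unguarded `consumer.highwater(tp)` call raises after the simulated rebalance, while the guarded helper returns `None`. I am not suggesting this is how faust-streaming should handle it, only showing the condition I suspect we are hitting.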