Hi Michael,
I've included our sanitized definitions for this broker. The exchange most impacted is heka.logs. I ran this and found it odd:
[root@heka1-2a ~]# rabbitmq-diagnostics consistent_hash_exchange_ring_state heka.logs
Inspecting consistent hashing ring state for exchange heka.logs in virtual host '/'...
Error:
Exchange does not exist or is of a different type
This is a two-node cluster, and the other node shows the same error output.
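One thing I notice re-reading this: the connection URLs in the configs below all end in /heka, while rabbitmq-diagnostics defaulted to the '/' vhost. I still need to re-check against the heka vhost, with something like this (flag placement from memory, may need adjusting):

```shell
# Confirm which vhost the exchange actually lives in, and its type
rabbitmqctl list_exchanges -p heka name type

# Re-run the ring state inspection against the heka vhost instead of '/'
rabbitmq-diagnostics consistent_hash_exchange_ring_state heka.logs --vhost heka
```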
As for runnable examples: we use Heka to ingest logs on our application hosts and publish them to RabbitMQ. On our Elasticsearch hosts, Heka consumes those messages and indexes them into Elasticsearch. I believe this was set up on 3.5.x or 3.6.x, so it's a relatively old setup that I inherited. I'm still getting familiar with this plugin; I know what it does at a high level, but I'm still learning how it works under the hood.
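For anyone else following along, here is my rough mental model of what x-consistent-hash does, as a small Python sketch. Each queue binds with a numeric routing key that acts as a weight (so the '224' in the configs below is a weight, not a match pattern): the queue gets that many points on a hash ring, and a message's routing key is hashed onto the ring to pick a queue. This is illustrative only; the real plugin's hash function and bucket layout differ.

```python
import bisect
import hashlib

class ConsistentHashRing:
    """Toy model of an x-consistent-hash exchange (not the real algorithm)."""

    def __init__(self):
        self._points = []  # sorted list of (hash, queue) pairs

    def _hash(self, s):
        # Any stable, well-distributed hash works for the sketch
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def bind(self, queue, weight):
        # A binding with weight N places N points for the queue on the ring
        for i in range(weight):
            bisect.insort(self._points, (self._hash(f"{queue}-{i}"), queue))

    def route(self, routing_key):
        # Hash the message's routing key and take the next point clockwise
        keys = [h for h, _ in self._points]
        i = bisect.bisect(keys, self._hash(routing_key)) % len(self._points)
        return self._points[i][1]

ring = ConsistentHashRing()
for q in ("logs-es0.las-a", "logs-es1.las-a", "logs-es2.las-a"):
    ring.bind(q, 224)  # 224 matches the routing_key weight in our configs

# The same routing key always lands on the same queue
assert ring.route("web01") == ring.route("web01")
```

The practical upshot, as I understand it, is that equal weights spread messages roughly evenly across the consumer queues, and a given routing key consistently maps to the same queue.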
This is the Heka config on the various application hosts. The url uses a round robin CNAME that will resolve to either node:
[RabbitmqLogsOutput]
type = 'AMQPOutput'
encoder = 'ProtobufEncoder'
exchange = 'heka.logs'
exchange_auto_delete = false
exchange_durability = true
exchange_type = 'x-consistent-hash'
message_matcher = '((Logger != "hekad" && Type != "sandbox.output" && Type != "stats" && Type != "heka.sandbox.stats") || Fields[http_user_agent] == NIL) && (Fields[programname] !~ /puppet-agent|xinetd/ || Fields[programname] == NIL)'
persistent = true
url = 'amqps://[REDACTED]@heka.las:5671/heka'
use_buffering = true
This is the Heka config on the Elasticsearch hosts. There are actually two config files, one per RabbitMQ node; they're identical except that the other node's file uses logs-es0.las-b for the queue, and the url in both uses the specific node name instead of the CNAME. There are three of these ES hosts, so the queue name referenced in each is specific to the host, but they all follow the same scheme, logs-es[012].las-[ab]:
[RabbitmqLogsInput-Heka1-2a]
type = 'AMQPInput'
can_exit = true
decoder = 'ProtobufGeoIp2MultiDecoder'
exchange = 'heka.logs'
exchange_auto_delete = false
exchange_durability = true
exchange_type = 'x-consistent-hash'
prefetch_count = 100
queue = 'logs-es0.las-a'
queue_auto_delete = true
queue_durability = false
queue_ttl = 1200000
routing_key = '224'
splitter = 'HekaFramingSplitter'
url = 'amqps://[REDACTED]@heka1-2a.las:5671/heka'
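Since the routing_key above is really a binding weight for an x-consistent-hash exchange, I've been meaning to confirm what's actually bound on the broker with something like this (column names from memory; assumes rabbitmqctl access):

```shell
# For x-consistent-hash exchanges, the routing_key column shown here
# is the per-queue weight (224 in our case)
rabbitmqctl list_bindings -p heka source_name destination_name routing_key
```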
The Elasticsearch hosts also use this DLX config, which uses the CNAME and, like the config above, a queue name specific to each host:
[RabbitmqDlxLogsInput]
type = 'AMQPInput'
can_exit = true
decoder = 'ProtobufGeoIp2MultiDecoder'
exchange = 'heka.dlx.logs'
exchange_auto_delete = false
exchange_durability = true
exchange_type = 'x-consistent-hash'
prefetch_count = 100
queue = 'dlx-logs-es0.las'
queue_auto_delete = true
queue_durability = false
queue_ttl = 1200000
routing_key = '224'
splitter = 'HekaFramingSplitter'
url = 'amqps://[REDACTED]@heka.las:5671/heka'
Thank you,
-J