$md = new RdKafka\Metadata();
$md->addBrokers("somehost:9092");
$topics=$md->getTopics();function kafkaGetTopics(){
$pk = new RdKafka\Producer();
$pk->addBrokers("servername:9092");
$conf = new RdKafka\Conf();
// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'yourspecialidgoeshere');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("servername:9092");
$sources = [$pk,$rk];
$topics = [];
foreach ($sources as $rdk) {
$rdk->setLogLevel(LOG_INFO);
$meta = $rdk->getMetadata(TRUE, null, 100);
$rkTopics = $meta->getTopics();
foreach ($rkTopics as $topic) {
$topics[$topic->getTopic()] = TRUE;
}
}
$keys = array_keys($topics);
$topics=array();
foreach($keys as $i=>$topic){
if(preg_match('/^\_\_/',$topic)){continue;}
$topics[]=$topic;
}
sort($topics);
return $topics;
}