-
-
Notifications
You must be signed in to change notification settings - Fork 267
php core dump from ext-rdkafka #275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
@daydaygo can you provide more information on what actually led to this core dump? thx in advance |
private function getConsumer()
{
if ($this->consumer) {
return $this->consumer;
}
$conf = new \RdKafka\Conf();
//$conf->set("security.protocol", "SASL_SSL");
$conf->set("sasl.mechanisms", "PLAIN");
if (Config::get('kafka.username')) {
$conf->set("sasl.username", Config::get('kafka.username'));
$conf->set("sasl.password", Config::get('kafka.password'));
}
$conf->set("broker.version.fallback", "0.10.0.0");
$conf->set("ssl.ca.location", dirname(__FILE__) . "./cert.pem");
// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
log_message("[KAFAK-CONSUMER] Assign:" . json_encode($partitions), LOG_INFO);
echo "Assign: ";
var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
#echo "Revoke: ";
#var_dump($partitions);
log_message("[KAFAK-CONSUMER] Revoke:" . json_encode($partitions), LOG_INFO);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// Configure the group.id. All consumer with the same group.id will consume
// different partitions.
$conf->set('group.id', $this->options['group_id']);
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', Config::get('kafka.addrs'));
$topicConf = new \RdKafka\TopicConf();
// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');
//$topicConf->set("auto.commit.interval.ms", 1e3);
//$topicConf->set("offset.store.sync.interval.ms", 60e3);
//$topicConf->set("offset.store.path", dirname(__FILE__) . '/offsets/');
// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
#$consumer->setLogLevel(LOG_DEBUG);
$topics = $this->options['topics'];
if (!is_array($topics)) {
$topics = [ $topics ];
}
$consumer->subscribe($topics);
$this->consumer = $consumer;
return $this->consumer;
}
public function consume($handler)
{
$consumer = $this->getConsumer();
if (!$consumer) {
error_report("[KAFKA-CONSUMER] get consumer[{$this->options['group_id']}] error");
return;
}
echo "Waiting for partition assignment... (make take some time when\n";
echo "quickly re-joining the group after leaving it.)\n";
$break_loop = FALSE;
while (true) {
$message = $consumer->consume($this->options['timeout']);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
$handler($message);
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break
;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
if ($this->options['quit_when_timeout']) {
$break_loop = TRUE;
break;
}
break;
default:
throw new \Exception($message->errstr(), $message->err);
break;
}
if ($break_loop) {
break;
}
}
try {
$consumer->commit();
} catch (\Exception $e) {}
} |
ok so i see in the trace that it has happening during the destruct of the consumer. |
@nick-zh i got |
@daydaygo no it's ok, i think i know what the problem is, disregard my previous message, we fixed this for the low level consumer, i see that we still have this behaviour in the high level consumer, i will try to provide a fix very soon. |
use
gdb php /path/to/core.php
, then usebt
for backtraceThe text was updated successfully, but these errors were encountered: