Skip to content

2 race conditions #281

@daedric

Description

Hi,
I may have found 2 race conditions:

  • When broker threads are created, they do not increase the reference count on the broker, so on exit a thread can find itself using freed data.
    Basically, a patch like this would be enough:
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 07a579a..d6a711e 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -4118,7 +4118,10 @@ static void rd_kafka_broker_consumer_serve (rd_kafka_broker_t *rkb) {

 static void *rd_kafka_broker_thread_main (void *arg) {
        rd_kafka_broker_t *rkb = arg;
+       rd_kafka_broker_keep(rkb);
+
        rd_kafka_t *rk = rkb->rkb_rk;
+       rd_kafka_keep(rk);

        (void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);

@@ -4166,6 +4169,9 @@ static void *rd_kafka_broker_thread_main (void *arg) {

        (void)rd_atomic_sub(&rd_kafka_thread_cnt_curr, 1);

+       rd_kafka_destroy0(rk);
+       rd_kafka_broker_destroy(rkb);
+
        return NULL;
 }
  • On Mac OS X, an assert was sometimes triggered on producer creation because it was testing whether rkb_thread was equal to pthread_self() BEFORE pthread_create() had set the value. I did a dirty fix that I'm not happy with, so I would like to know what you can propose :)
    Here is my (crappy) patch:
diff --git a/src/rdkafka.c b/src/rdkafka.c
index ac867c8..1f7be84 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -1179,6 +1179,10 @@ static void rd_kafka_metadata_refresh_cb (rd_kafka_t *rk, void *arg) {
  */
 static void *rd_kafka_thread_main (void *arg) {
        rd_kafka_t *rk = arg;
+
+       while (rk->rk_thread != pthread_self());
+
+
        rd_kafka_timer_t tmr_topic_scan = {};
        rd_kafka_timer_t tmr_stats_emit = {};
        rd_kafka_timer_t tmr_metadata_refresh = {};
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index d6a711e..f3f61ec 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -4123,6 +4123,8 @@ static void *rd_kafka_broker_thread_main (void *arg) {
        rd_kafka_t *rk = rkb->rkb_rk;
        rd_kafka_keep(rk);

+       while (rkb->rkb_thread != pthread_self());
+
        (void)rd_atomic_add(&rd_kafka_thread_cnt_curr, 1);

        rd_rkb_dbg(rkb, BROKER, "BRKMAIN", "Enter main broker thread");
diff --git a/src/rdthread.c b/src/rdthread.c
index 011ac26..9ad554e 100644
--- a/src/rdthread.c
+++ b/src/rdthread.c
@@ -93,6 +93,8 @@ void rd_thread_dispatch (void) {
 static void *rd_thread_start_routine (void *arg) {
        rd_thread_t *rdt = arg;
        void *ret;
+
+    while (rdt->rdt_thread != phtread_self());

        /* By default with block the user-defined signals. */
        rd_thread_sigmask(SIG_BLOCK, SIGUSR1, SIGUSR2, RD_SIG_END);

I'm looking forward to your comments!
Thank you,
Cheers,

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions