kafka: send messages with key via kafka_send_key function and kafka.send_key for...
authorVicente Hernando <vhernando@systemonenoc.com>
Mon, 18 May 2020 13:49:31 +0000 (15:49 +0200)
committerVicente Hernando <vhernando@systemonenoc.com>
Wed, 20 May 2020 11:58:49 +0000 (13:58 +0200)
src/modules/kafka/kafka_mod.c
src/modules/kafka/kfk.c
src/modules/kafka/kfk.h

index a9165da..c614cbf 100644 (file)
@@ -58,6 +58,7 @@ static void mod_destroy(void);
 static int child_init(int rank);
 static int fixup_kafka_send(void** param, int param_no);
 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
+static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey);
 
 /*
  * Variables and functions to deal with module parameters.
@@ -72,6 +73,8 @@ static int kafka_topic_param(modparam_t type, void *val);
 static cmd_export_t cmds[] = {
        {"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
         0, ANY_ROUTE},
+       {"kafka_send_key", (cmd_function)w_kafka_send_key, 3, fixup_kafka_send,
+        0, ANY_ROUTE},
     { 0, 0, 0, 0, 0, 0}
 };
 
@@ -201,7 +204,7 @@ static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
                return -1;
        }
 
-       if (kfk_message_send(&s_topic, &s_message)) {
+       if (kfk_message_send(&s_topic, &s_message, NULL)) {
                LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
                           s_topic.len, s_topic.s,
                           s_message.len, s_message.s);
@@ -215,6 +218,74 @@ static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
 }
 
 /**
+ * \brief Send a message via Kafka plus key parameter.
+ */
+static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey)
+{
+       str s_topic;
+
+       if (ptopic == NULL) {
+               LM_ERR("Invalid topic parameter\n");
+               return -1;
+       }
+
+       if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
+               LM_ERR("No topic\n");
+               return -1;
+       }
+       if (s_topic.s == NULL || s_topic.len == 0) {
+               LM_ERR("Invalid topic string\n");
+               return -1;
+       }
+
+       str s_message;
+
+       if (pmessage == NULL) {
+               LM_ERR("Invalid message parameter\n");
+               return -1;
+       }
+
+       if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
+               LM_ERR("No message\n");
+               return -1;
+       }
+       if (s_message.s == NULL || s_message.len == 0) {
+               LM_ERR("Invalid message string\n");
+               return -1;
+       }
+
+       str s_key;
+
+       if (pkey == NULL) {
+               LM_ERR("Invalid key parameter\n");
+               return -1;
+       }
+
+       if (get_str_fparam(&s_key, msg, (gparam_t*)pkey)!=0) {
+               LM_ERR("No key\n");
+               return -1;
+       }
+       if (s_key.s == NULL || s_key.len == 0) {
+               LM_ERR("Invalid key string\n");
+               return -1;
+       }
+
+       if (kfk_message_send(&s_topic, &s_message, &s_key)) {
+               LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
+                          s_topic.len, s_topic.s,
+                          s_key.len, s_key.s,
+                          s_message.len, s_message.s);
+               return -1;
+       }
+
+       LM_DBG("Message key sent (Topic: %.*s) (key: %.*s) : %.*s\n",
+                  s_topic.len, s_topic.s,
+                  s_key.len, s_key.s,
+                  s_message.len, s_message.s);
+       return 1;
+}
+
+/**
  * \brief KEMI function to send a Kafka message.
  */
 static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
@@ -229,7 +300,7 @@ static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
                return -1;
        }
 
-       if (kfk_message_send(s_topic, s_message)) {
+       if (kfk_message_send(s_topic, s_message, NULL)) {
                LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
                           s_topic->len, s_topic->s,
                           s_message->len, s_message->s);
@@ -243,6 +314,41 @@ static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
 }
 
 /**
+ * \brief KEMI function to send a Kafka message plus key.
+ */
+static int ki_kafka_send_key(struct sip_msg* msg, str *s_topic, str *s_message, str *s_key)
+{
+       if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
+               LM_ERR("Invalid topic string\n");
+               return -1;
+       }
+
+       if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
+               LM_ERR("Invalid message string\n");
+               return -1;
+       }
+
+       if (s_key == NULL || s_key->s == NULL || s_key->len == 0) {
+               LM_ERR("Invalid key string\n");
+               return -1;
+       }
+
+       if (kfk_message_send(s_topic, s_message, s_key)) {
+               LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
+                          s_topic->len, s_topic->s,
+                          s_key->len, s_key->s,
+                          s_message->len, s_message->s);
+               return -1;
+       }
+
+       LM_DBG("Message sent (Topic: %.*s) (key: %.*s) : %.*s\n",
+                  s_topic->len, s_topic->s,
+                  s_key->len, s_key->s,
+                  s_message->len, s_message->s);
+       return 1;
+}
+
+/**
  * \brief Kafka :: Array with KEMI functions
  */
 /* clang-format off */
@@ -252,6 +358,11 @@ static sr_kemi_t sr_kemi_kafka_exports[] = {
          { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
                SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
        },
+       { str_init("kafka"), str_init("send_key"),
+         SR_KEMIP_INT, ki_kafka_send_key,
+         { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
+               SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
+       },
 
        { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
 };
index 7f1dd8c..5553517 100644 (file)
@@ -822,10 +822,11 @@ clean:
  *
  * \param topic_name name of the topic
  * \param message message to send.
+ * \param key to send.
  *
  * \return 0 on success.
  */
-int kfk_message_send(str *topic_name, str *message)
+int kfk_message_send(str *topic_name, str *message, str *key)
 {
     /* Get topic from name. */
        rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
@@ -835,6 +836,15 @@ int kfk_message_send(str *topic_name, str *message)
                return -1;
        }
 
+       /* Default key values (No key) */
+       void *keyp = NULL;
+       size_t key_len = 0;
+       if (key != NULL && key->len > 0 && key->s != NULL) {
+               keyp = key->s;
+               key_len = key->len;
+               LM_DBG("Key: %.*s\n", (int)key_len, (char*)keyp);
+       }
+       
        /* Send a message. */
        if (rd_kafka_produce(
                        rkt,
@@ -844,7 +854,7 @@ int kfk_message_send(str *topic_name, str *message)
                        message->s,
                        message->len,
                        /* Optional key and its length */
-                       NULL, 0,
+                       keyp, key_len,
                        /* Message opaque, provided in
                         * delivery report callback as
                         * msg_opaque. */
index b2592dc..827d93d 100644 (file)
@@ -56,9 +56,13 @@ int kfk_topic_parse(char *spec);
 /**
  * \brief send a message to a topic.
  *
+ * \param topic_name name of the topic
+ * \param message message to send.
+ * \param key to send.
+ *
  * \return 0 on success.
  */
-int kfk_message_send(str *topic, str *message);
+int kfk_message_send(str *topic_name, str *message, str *key);
 
 /**
  * \brief Initialize statistics.