kafka: send messages with key via kafka_send_key function and kafka.send_key for...
[sip-router] / src / modules / kafka / kfk.c
1 /*
2  * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * Kamailio is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  * Kamailio is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  */
21
22 /**
23  * \file
24  * \brief Kafka :: Apache Kafka functions via librdkafka
25  * \ingroup kfk
26  *
27  * - Module: \ref kfk
28  */
29
30 #include <syslog.h> /* For log levels. */
31 #include <librdkafka/rdkafka.h>
32
33 #include "../../core/dprint.h"
34 #include "../../core/parser/parse_param.h"
35 #include "../../core/mem/pkg.h"
36 #include "../../core/mem/shm_mem.h"
37 #include "../../core/locking.h"
38
39 /**
40  * \brief node of a singly linked list holding one configuration property.
41  */
42 typedef struct kfk_conf_node_s {
43         str *sname; /**< name of property (points into a parsed param_t, not owned by the node) */
44         str *svalue; /**< value of property (points into a parsed param_t, not owned by the node) */
45         struct kfk_conf_node_s *next; /**< next property in list, or NULL at the end */
46 } kfk_conf_node_t;
47
48 /**
49  * \brief general configuration: parsed parameters plus the property list built from them.
50  */
51 typedef struct kfk_conf_s {
52         param_t *attrs; /**< parsed attributes from configuration parameter (released with free_params). */
53         char *spec; /**< original string of configuration (stored as given, never freed here). */
54         kfk_conf_node_t *property; /**< list of configuration properties. */
55 } kfk_conf_t;
56
57 /**
58  * \brief data type for a topic.
59  *
60  * This is an element in a topic list.
61  */
62 typedef struct kfk_topic_s {
63         str *topic_name; /**< Name of the topic (points into attrs). */
64         rd_kafka_topic_t *rd_topic; /**< rd kafka topic structure, NULL until the topic is configured. */
65         param_t *attrs; /**< parsed attributes for topic configuration. */
66         char *spec; /**< original string for topic configuration. */
67         kfk_conf_node_t *property; /**< list of configuration properties for a topic. */
68         struct kfk_topic_s *next; /**< Next element in topic list. */
69 } kfk_topic_t;
70
71 /**
72  * \brief stats about a topic.
73  */
74 typedef struct kfk_stats_s {
75         str *topic_name; /**< Name of the topic, or NULL for general statistics. */
76         uint64_t total; /**< Total number of messages sent. */
77         uint64_t error; /**< Number of messages that failed to be sent. */
78         struct kfk_stats_s *next; /**< Next element in stats list. */
79 } kfk_stats_t;
80
81 /* Module state (one copy per process unless noted otherwise). */
82 static rd_kafka_conf_t *rk_conf = NULL;  /* Configuration object; reset to NULL once the producer owns it */
83 static rd_kafka_t *rk = NULL; /* Producer instance handle */
84 static kfk_conf_t *kfk_conf = NULL; /* List for Kafka configuration properties. */
85 static kfk_topic_t *kfk_topic = NULL; /* List for Kafka topics (new topics are pushed at the head). */
86
87 #define ERRSTR_LEN 512 /**< Length of internal buffer for errors. */
88 static char errstr[ERRSTR_LEN]; /* librdkafka API error reporting buffer */
89 gen_lock_t *stats_lock = NULL; /**< Lock to protect shared statistics data. */
90
91 /**
92  * \brief Total statistics
93  *
94  * First node (mandatory) is the general one with NULL topic.
95  * Next nodes are topic dependent ones and are optional.
96  * It is done this way because the general node is created in kfk_stats_init
97  * (in mod_init) and is therefore shared among every Kamailio process.
98  */
99 static kfk_stats_t *stats_general;
100
101 /* Forward declarations of file-local helpers defined further below. */
102 static void kfk_conf_free(kfk_conf_t *kconf);
103 static void kfk_topic_free(kfk_topic_t *ktopic);
104 static int kfk_conf_configure();
105 static int kfk_topic_list_configure();
106 static int kfk_topic_exist(str *topic_name);
107 static rd_kafka_topic_t* kfk_topic_get(str *topic_name);
108 static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err);
109 static void kfk_stats_topic_free(kfk_stats_t *st_topic);
110
111 /**
112  * \brief Kafka logger callback
113  */
114 static void kfk_logger (const rd_kafka_t *rk, int level,
115                     const char *fac, const char *buf) {
116
117         switch(level) {
118                 case LOG_EMERG:
119                         LM_NPRL("RDKAFKA fac: %s : %s : %s\n",
120                                         fac, rk ? rd_kafka_name(rk) : NULL,
121                                         buf);
122                         break;
123                         
124                 case LOG_ALERT:
125                         LM_ALERT("RDKAFKA fac: %s : %s : %s\n",
126                                          fac, rk ? rd_kafka_name(rk) : NULL,
127                                          buf);
128                         break;
129                         
130                 case LOG_CRIT:
131                         LM_CRIT("RDKAFKA fac: %s : %s : %s\n",
132                                         fac, rk ? rd_kafka_name(rk) : NULL,
133                                         buf);
134                         break;
135
136                 case LOG_ERR:
137                         LM_ERR("RDKAFKA fac: %s : %s : %s\n",
138                                    fac, rk ? rd_kafka_name(rk) : NULL,
139                                    buf);
140                         break;
141
142                 case LOG_WARNING:
143                         LM_WARN("RDKAFKA fac: %s : %s : %s\n",
144                                         fac, rk ? rd_kafka_name(rk) : NULL,
145                                         buf);
146                         break;
147
148                 case LOG_NOTICE:
149                         LM_NOTICE("RDKAFKA fac: %s : %s : %s\n",
150                                         fac, rk ? rd_kafka_name(rk) : NULL,
151                                         buf);
152                         break;
153                         
154                 case LOG_INFO:
155                         LM_INFO("RDKAFKA fac: %s : %s : %s\n",
156                                         fac, rk ? rd_kafka_name(rk) : NULL,
157                                         buf);
158                         break;
159
160                 case LOG_DEBUG:
161                         LM_DBG("RDKAFKA fac: %s : %s : %s\n",
162                                    fac, rk ? rd_kafka_name(rk) : NULL,
163                                    buf);
164                         break;
165
166                 default:
167                         LM_ERR("Unsupported kafka log level: %d\n", level);
168                         break;
169         }
170 }
171
172 /**
173  * \brief Message delivery report callback using the richer rd_kafka_message_t object.
174  */
175 static void kfk_msg_delivered (rd_kafka_t *rk,
176                                                            const rd_kafka_message_t *rkmessage, void *opaque) {
177
178         LM_DBG("Message delivered callback\n");
179         
180         const char *topic_name = NULL;
181         topic_name = rd_kafka_topic_name(rkmessage->rkt);
182         if (!topic_name) {
183                 LM_ERR("Cannot get topic name for delivered message\n");
184                 return;
185         }
186         
187         kfk_stats_add(topic_name, rkmessage->err);
188         
189         if (rkmessage->err) {
190                 LM_ERR("RDKAFKA Message delivery failed: %s\n",
191                            rd_kafka_err2str(rkmessage->err));
192         } else {
193                 LM_DBG("RDKAFKA Message delivered (%zd bytes, offset %"PRId64", "
194                            "partition %"PRId32"): %.*s\n",
195                            rkmessage->len, rkmessage->offset,
196                            rkmessage->partition,
197                            (int)rkmessage->len, (const char *)rkmessage->payload);
198         }
199 }
200
201 /**
202  * \brief Initialize kafka functionality.
203  *
204  * \param brokers brokers to add.
205  * \return 0 on success.
206  */
207 int kfk_init(char *brokers)
208 {
209         LM_DBG("Initializing Kafka\n");
210
211         if (brokers == NULL) {
212                 LM_ERR("brokers parameter not set\n");
213                 return -1;
214         }
215         
216         /*
217          * Create Kafka client configuration place-holder
218          */
219         rk_conf = rd_kafka_conf_new();
220
221         /* Set logger */
222         rd_kafka_conf_set_log_cb(rk_conf, kfk_logger);
223
224         /* Set message delivery callback. */
225         rd_kafka_conf_set_dr_msg_cb(rk_conf, kfk_msg_delivered);
226
227         /* Configure properties: */
228         if (kfk_conf_configure()) {
229                 LM_ERR("Failed to configure general properties\n");
230                 return -1;
231         }
232
233         /*
234          * Create producer instance.
235          *
236          * NOTE: rd_kafka_new() takes ownership of the conf object
237          *       and the application must not reference it again after
238          *       this call.
239          */
240         rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
241         if (!rk) {
242                 LM_ERR("Failed to create new producer: %s\n", errstr);
243                 return -1;
244         }
245         rk_conf = NULL; /* Now owned by producer. */
246         LM_DBG("Producer handle created\n");
247
248         LM_DBG("Adding broker: %s\n", brokers);
249         /* Add brokers */
250         if (rd_kafka_brokers_add(rk, brokers) == 0) {
251                 LM_ERR("No valid brokers specified: %s\n", brokers);
252                 return -1;
253         }
254         LM_DBG("Added broker: %s\n", brokers);
255
256         /* Topic creation and configuration. */
257         if (kfk_topic_list_configure()) {
258                 LM_ERR("Failed to configure topics\n");
259                 return -1;
260         }
261
262         return 0;
263 }
264
265 /**
266  * \brief Close kafka related functionality.
267  */
268 void kfk_close()
269 {
270         rd_kafka_resp_err_t err;
271         
272         LM_DBG("Closing Kafka\n");
273
274     /* Destroy the producer instance */
275         if (rk) {
276                 /* Flushing messages. */
277                 LM_DBG("Flushing messages\n");
278                 err = rd_kafka_flush(rk, 0);
279                 if (err) {
280                         LM_ERR("Failed to flush messages: %s\n", rd_kafka_err2str(err));
281                 }
282
283                 /* Destroy producer. */
284                 LM_DBG("Destroying instance of Kafka producer\n");
285                 rd_kafka_destroy(rk);
286         }
287
288         /* Destroy configuration if not freed by rd_kafka_destroy. */
289         if (rk_conf) {
290                 LM_DBG("Destroying instance of Kafka configuration\n");
291                 rd_kafka_conf_destroy(rk_conf);
292         }
293
294         /* Free list of configuration properties. */
295         if (kfk_conf) {
296                 kfk_conf_free(kfk_conf);
297         }
298
299         /* Free list of topics. */
300         while (kfk_topic) {
301                 kfk_topic_t *next = kfk_topic->next;
302                 kfk_topic_free(kfk_topic);
303                 kfk_topic = next;
304         }
305 }
306
307 /**
308  * \brief Free a general configuration object.
309  */
310 static void kfk_conf_free(kfk_conf_t *kconf)
311 {
312         if (kconf == NULL) {
313                 /* Nothing to free. */
314                 return;
315         }
316
317         kfk_conf_node_t *knode = kconf->property;
318         while (knode) {
319                 kfk_conf_node_t *next = knode->next;
320                 pkg_free(knode);
321                 knode = next;
322         }
323
324         free_params(kconf->attrs);
325         pkg_free(kconf);
326 }
327
328 /**
329  * \brief Parse general configuration properties for Kafka.
330  */
331 int kfk_conf_parse(char *spec)
332 {
333         param_t *pit = NULL;
334         param_hooks_t phooks;
335         kfk_conf_t *kconf = NULL;
336
337         if (kfk_conf != NULL) {
338                 LM_ERR("Configuration already set\n");
339                 goto error;
340         }
341         
342         str s;
343         s.s = spec;
344         s.len = strlen(spec);
345         if(s.s[s.len-1]==';') {
346                 s.len--;
347         }
348         if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
349                 LM_ERR("Failed parsing params value\n");
350                 goto error;
351         }
352
353         kconf = (kfk_conf_t*)pkg_malloc(sizeof(kfk_conf_t));
354         if (kconf == NULL) {
355                 LM_ERR("No more pkg memory\n");
356                 goto error;
357         }
358         memset(kconf, 0, sizeof(kfk_conf_t));
359         kconf->attrs = pit;
360         kconf->spec = spec;
361         for (pit = kconf->attrs; pit; pit=pit->next)
362         {
363                 /* Parse a property. */
364                 kfk_conf_node_t *knode = NULL;
365                 knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
366                 if (knode == NULL) {
367                         LM_ERR("No more pkg memory\n");
368                         goto error;
369                 }
370                 memset(knode, 0, sizeof(kfk_conf_node_t));
371                 
372                 knode->sname = &pit->name;
373                 knode->svalue = &pit->body;
374                 if (knode->sname && knode->svalue) {
375                         LM_DBG("Parsed property: %.*s -> %.*s\n",
376                                    knode->sname->len, knode->sname->s,
377                                    knode->svalue->len, knode->svalue->s);
378                 }
379
380                 /* Place node at beginning of knode list. */
381             knode->next = kconf->property;
382                 kconf->property = knode;
383         } /* for pit */
384
385         kfk_conf = kconf;
386         return 0;
387
388 error:
389         if(pit!=NULL) {
390                 free_params(pit);
391         }
392
393         if(kconf != NULL) {
394                 kfk_conf_free(kconf);
395         }
396         return -1;
397 }
398
399 /**
400  * \brief Configure Kafka properties.
401  *
402  * \return 0 on success. 
403  */
404 static int kfk_conf_configure()
405 {
406         if (kfk_conf == NULL) {
407                 /* Nothing to configure. */
408                 LM_DBG("No properties to configure\n");
409                 return 0;
410         }
411
412         LM_DBG("Configuring properties\n");
413         
414         kfk_conf_node_t *knode = kfk_conf->property;
415         while (knode) {
416                 kfk_conf_node_t *next = knode->next;
417                 str *sname = knode->sname;
418                 str *svalue = knode->svalue;
419                 knode = next;
420                 
421                 if (sname == NULL || sname->len == 0 || sname->s == NULL) {
422                         LM_ERR("Bad name in configuration property\n");
423                         continue;
424                 }
425
426                 if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
427                         LM_ERR("Bad value in configuration property\n");
428                         continue;
429                 }
430
431                 /* We temporarily convert to zstring. */
432                 char cname = sname->s[sname->len];
433                 sname->s[sname->len] = '\0';
434                 char cvalue = svalue->s[svalue->len];
435                 svalue->s[svalue->len] = '\0';
436                 
437                 LM_DBG("Setting property: %s -> %s\n", sname->s, svalue->s);
438                 
439                 if (rd_kafka_conf_set(rk_conf, sname->s, svalue->s,
440                                                           errstr, sizeof(errstr)) !=
441                         RD_KAFKA_CONF_OK) {
442                         LM_ERR("Configuration failed: %s\n", errstr);
443
444                         /* We restore zstrings back to str */
445                         sname->s[sname->len] = cname;
446                         svalue->s[svalue->len] = cvalue;
447                         return -1;
448                 }
449
450                 /* We restore zstrings back to str */
451                 sname->s[sname->len] = cname;
452                 svalue->s[svalue->len] = cvalue;
453
454         } /* while knode */
455         
456         return 0;
457 }
458
459 /**
460  * \brief Free a topic object.
461  */
462 static void kfk_topic_free(kfk_topic_t *ktopic)
463 {
464         if (ktopic == NULL) {
465                 /* Nothing to free. */
466                 return;
467         }
468
469         kfk_conf_node_t *knode = ktopic->property;
470         while (knode) {
471                 kfk_conf_node_t *next = knode->next;
472                 pkg_free(knode);
473                 knode = next;
474         }
475
476         /* Destroy rd Kafka topic. */
477         if (ktopic->rd_topic) {
478                 rd_kafka_topic_destroy(ktopic->rd_topic);
479         }
480         
481         free_params(ktopic->attrs);
482         pkg_free(ktopic);
483 }
484
485 /**
486  * \brief Parse topic properties for Kafka.
487  */
488 int kfk_topic_parse(char *spec)
489 {
490         param_t *pit = NULL;
491         param_hooks_t phooks;
492         kfk_topic_t *ktopic = NULL;
493
494         str s;
495         s.s = spec;
496         s.len = strlen(spec);
497         if(s.s[s.len-1]==';') {
498                 s.len--;
499         }
500         if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
501                 LM_ERR("Failed parsing params value\n");
502                 goto error;
503         }
504
505         ktopic = (kfk_topic_t*)pkg_malloc(sizeof(kfk_topic_t));
506         if (ktopic == NULL) {
507                 LM_ERR("No more pkg memory\n");
508                 goto error;
509         }
510         memset(ktopic, 0, sizeof(kfk_topic_t));
511         ktopic->attrs = pit;
512         ktopic->spec = spec;
513         for (pit = ktopic->attrs; pit; pit=pit->next)
514         {
515                 /* Check for topic name. */
516                 if (pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
517                         if (ktopic->topic_name != NULL) {
518                                 LM_ERR("Topic name already set\n");
519                                 goto error;
520                         }
521                         ktopic->topic_name = &pit->body;
522                         LM_DBG("Topic name: %.*s\n", pit->body.len, pit->body.s);
523                         
524                 } else {
525
526                         /* Parse a property. */
527                         kfk_conf_node_t *knode = NULL;
528                         knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
529                         if (knode == NULL) {
530                                 LM_ERR("No more pkg memory\n");
531                                 goto error;
532                         }
533                         memset(knode, 0, sizeof(kfk_conf_node_t));
534                         
535                         knode->sname = &pit->name;
536                         knode->svalue = &pit->body;
537                         if (knode->sname && knode->svalue) {
538                                 LM_DBG("Topic parsed property: %.*s -> %.*s\n",
539                                            knode->sname->len, knode->sname->s,
540                                            knode->svalue->len, knode->svalue->s);
541                         }
542                         
543                         /* Place node at beginning of ktopic list. */
544                         knode->next = ktopic->property;
545                         ktopic->property = knode;
546                 } /* if pit->name.len == 4 */
547         } /* for pit */
548
549         /* Topic name is mandatory. */
550         if(ktopic->topic_name == NULL)
551         {
552                 LM_ERR("No topic name\n");
553                 goto error;
554         }
555
556         /* Place topic at beginning of topic list. */
557         ktopic->next = kfk_topic;
558         kfk_topic = ktopic;
559         return 0;
560
561 error:
562         if(pit!=NULL) {
563                 free_params(pit);
564         }
565
566         if(ktopic != NULL) {
567                 kfk_topic_free(ktopic);
568         }
569         return -1;
570 }
571
572 /**
573  * \brief Create and configure a topic.
574  *
575  * \return 0 on success.
576  */
577 static int kfk_topic_configure(kfk_topic_t *ktopic)
578 {
579         rd_kafka_topic_conf_t *topic_conf = NULL;
580         rd_kafka_topic_t *rkt = NULL;
581         
582         if (ktopic == NULL) {
583                 LM_ERR("No topic to create\n");
584                 goto error;
585         }
586         
587         /* Check topic name. */
588         if (!ktopic->topic_name || !ktopic->topic_name->s || ktopic->topic_name->len == 0) {
589                 LM_ERR("Bad topic name\n");
590                 goto error;
591         }
592
593         int topic_found = kfk_topic_exist(ktopic->topic_name);
594         if (topic_found == -1) {
595                 LM_ERR("Failed to search for topic %.*s in cluster\n",
596                            ktopic->topic_name->len, ktopic->topic_name->s);
597                 goto error;
598         } else if (topic_found == 0) {
599                 LM_ERR("Topic not found %.*s in cluster\n",
600                            ktopic->topic_name->len, ktopic->topic_name->s);
601                 goto error;
602         }
603         
604         LM_DBG("Creating topic: %.*s\n",
605                    ktopic->topic_name->len, ktopic->topic_name->s);
606
607         /* Topic configuration */
608
609         topic_conf = rd_kafka_topic_conf_new();
610
611         kfk_conf_node_t *knode = kfk_topic->property;
612         while (knode) {
613                 kfk_conf_node_t *next = knode->next;
614                 str *sname = knode->sname;
615                 str *svalue = knode->svalue;
616                 knode = next;
617                 
618                 if (sname == NULL || sname->len == 0 || sname->s == NULL) {
619                         LM_ERR("Bad name in topic configuration property\n");
620                         continue;
621                 }
622
623                 if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
624                         LM_ERR("Bad value in topic configuration property\n");
625                         continue;
626                 }
627
628                 /* We temporarily convert to zstring. */
629                 char cname = sname->s[sname->len];
630                 sname->s[sname->len] = '\0';
631                 char cvalue = svalue->s[svalue->len];
632                 svalue->s[svalue->len] = '\0';
633                 
634                 LM_DBG("Setting topic property: %s -> %s\n", sname->s, svalue->s);
635
636                 rd_kafka_conf_res_t res;
637                 res = rd_kafka_topic_conf_set(topic_conf, sname->s, svalue->s,
638                                                                           errstr, sizeof(errstr));
639                 if (res != RD_KAFKA_CONF_OK) {
640                         LM_ERR("Failed to set topic configuration: %s -> %s\n",
641                                    sname->s, svalue->s);
642
643                         /* We restore zstrings back to str */
644                         sname->s[sname->len] = cname;
645                         svalue->s[svalue->len] = cvalue;
646
647                         goto error;
648                 }
649
650                 /* We restore zstrings back to str */
651                 sname->s[sname->len] = cname;
652                 svalue->s[svalue->len] = cvalue;
653
654         } /* while knode */
655
656         /* We temporarily convert to zstring. */
657         char c_topic_name = ktopic->topic_name->s[ktopic->topic_name->len];
658         ktopic->topic_name->s[ktopic->topic_name->len] = '\0';
659
660         rkt = rd_kafka_topic_new(rk, ktopic->topic_name->s, topic_conf);
661         if (!rkt) {
662                 LM_ERR("Failed to create topic (%s): %s\n",
663                            ktopic->topic_name->s,
664                            rd_kafka_err2str(rd_kafka_last_error()));
665
666                 /* We restore zstrings back to str */
667                 ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
668
669                 goto error;
670         }
671         topic_conf = NULL; /* Now owned by topic */
672         LM_DBG("Topic created: %s\n", ktopic->topic_name->s);
673         
674         /* We restore zstrings back to str */
675         ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
676
677         /* Everything went fine. */
678         ktopic->rd_topic = rkt;
679         return 0;
680
681 error:
682
683         /* Destroy topic configuration. */
684         if (topic_conf) {
685                 rd_kafka_topic_conf_destroy(topic_conf);
686         }
687
688         /* Destroy topic */
689         if (rkt) {
690                 LM_DBG("Destroying topic\n");
691                 rd_kafka_topic_destroy(rkt);
692         }
693
694         return -1;
695 }
696         
697 /**
698  * \brief Create and configure a list of topics.
699  *
700  * \return 0 on success.
701  */
702 static int kfk_topic_list_configure()
703 {
704         kfk_topic_t *ktopic = kfk_topic;
705
706         while (ktopic) {
707                 kfk_topic_t *next = ktopic->next;
708                 /* Create current topic. */
709                 if (kfk_topic_configure(ktopic)) {
710                         LM_ERR("Failed to create topic: %.*s\n",
711                                    ktopic->topic_name->len, ktopic->topic_name->s);
712                         return -1;
713                 }
714                 ktopic = next;
715         }
716
717         return 0;
718 }
719
720 /* -1 means RD_POLL_INFINITE */
721 /* 100000 means 100 seconds */
722 #define METADATA_TIMEOUT 100000 /**< Timeout when asking for metadata in milliseconds. */
723
724 /**
725  * \brief check that a topic exists in cluster.
726  *
727  * \return 0 if topic does not exist.
728  * \return 1 if topic does exist.
729  * \return -1 on error.
730  */
731 static int kfk_topic_exist(str *topic_name)
732 {
733         /* Where to receive metadata. */
734         const struct rd_kafka_metadata *metadatap = NULL;
735         int i;
736         int topic_found = 0; /* Topic not found by default. */
737
738         if (!topic_name || topic_name->len == 0 || topic_name->s == NULL) {
739                 LM_ERR("Bad topic name\n");
740                 goto error;
741         }
742
743         /* Get metadata for all topics. */
744         rd_kafka_resp_err_t res;
745         res = rd_kafka_metadata(rk, 1, NULL, &metadatap, METADATA_TIMEOUT);
746         if (res != RD_KAFKA_RESP_ERR_NO_ERROR) {
747                 LM_ERR("Failed to get metadata: %s\n", rd_kafka_err2str(res));
748                 goto error;
749         }
750
751         /* List topics */
752         for (i=0; i<metadatap->topic_cnt; i++) {
753                 rd_kafka_metadata_topic_t *t = &metadatap->topics[i];
754                 if (t->topic) {
755                         LM_DBG("Metadata Topic: %s\n", t->topic);
756                         if (strncmp(topic_name->s, t->topic, topic_name->len) == 0) {
757                                 topic_found = 1;
758                                 LM_DBG("Metadata Topic (%s) found!\n", t->topic);
759                                 break;
760                         }
761                 }
762         } // for (i=0; i<m->topic_cnt; i++)
763
764         /* Destroy metadata. */
765         rd_kafka_metadata_destroy(metadatap);
766
767         if (topic_found == 0) {
768                 LM_DBG("Topic not found: %.*s\n", topic_name->len, topic_name->s);
769                 return 0;
770         }
771
772         LM_DBG("Topic found: %.*s\n", topic_name->len, topic_name->s);
773         return 1;
774         
775 error:
776
777         /* Destroy metadata. */
778         if (metadatap) {
779                 rd_kafka_metadata_destroy(metadatap);
780         }
781         
782         return -1;
783 }
784
785 /**
786  * \brief get a topic based on its name.
787  *
788  * \return topic if it founds a matching one, NULL otherwise.
789  */
790 static rd_kafka_topic_t* kfk_topic_get(str *topic_name)
791 {
792         rd_kafka_topic_t *result = NULL;
793
794         if (!topic_name || topic_name->len == 0 || topic_name->s == NULL) {
795                 LM_ERR("Bad topic name\n");
796                 goto clean;
797         }
798
799         kfk_topic_t *ktopic = kfk_topic;
800         while (ktopic) {
801                 kfk_topic_t *next = ktopic->next;
802
803                 if (topic_name->len == ktopic->topic_name->len &&
804                         strncmp(topic_name->s, ktopic->topic_name->s, topic_name->len) == 0) {
805                         LM_DBG("Topic name match: %.*s\n",
806                                    ktopic->topic_name->len,
807                                    ktopic->topic_name->s);
808                         result = ktopic->rd_topic;
809                         break;
810                 }
811
812                 ktopic = next;
813         }
814         
815 clean:
816         
817         return result;
818 }
819
820 /**
821  * \brief send a message to a topic.
822  *
823  * \param topic_name name of the topic
824  * \param message message to send.
825  * \param key to send.
826  *
827  * \return 0 on success.
828  */
829 int kfk_message_send(str *topic_name, str *message, str *key)
830 {
831     /* Get topic from name. */
832         rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
833
834         if (!rkt) {
835                 LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s);
836                 return -1;
837         }
838
839         /* Default key values (No key) */
840         void *keyp = NULL;
841         size_t key_len = 0;
842         if (key != NULL && key->len > 0 && key->s != NULL) {
843                 keyp = key->s;
844                 key_len = key->len;
845                 LM_DBG("Key: %.*s\n", (int)key_len, (char*)keyp);
846         }
847         
848         /* Send a message. */
849         if (rd_kafka_produce(
850                         rkt,
851                         RD_KAFKA_PARTITION_UA,
852                         RD_KAFKA_MSG_F_COPY,
853                         /* Payload and length */
854                         message->s,
855                         message->len,
856                         /* Optional key and its length */
857                         keyp, key_len,
858                         /* Message opaque, provided in
859                          * delivery report callback as
860                          * msg_opaque. */
861                         NULL) == -1) {
862                 rd_kafka_resp_err_t err = rd_kafka_last_error();
863                 LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err));
864
865                 return -1;
866         }
867
868         LM_DBG("Message sent\n");
869
870         /* Poll to handle delivery reports */
871         rd_kafka_poll(rk, 0);
872         LM_DBG("Message polled\n");
873
874         return 0;
875 }
876
877 /**
878  * \brief Initialize statistics.
879  *
880  * \return 0 on success.
881  */
882 int kfk_stats_init()
883 {
884         LM_DBG("Initializing statistics\n");
885
886         stats_lock = lock_alloc();
887         if (!stats_lock) {
888                 LM_ERR("Cannot allocate stats lock\n");
889                 return -1;
890         }
891
892         if(lock_init(stats_lock) == NULL) {
893                 LM_ERR("cannot init stats lock\n");
894                 lock_dealloc(stats_lock);
895                 stats_lock = NULL;
896                 return -1;
897         }
898
899         stats_general = shm_malloc(sizeof(kfk_stats_t));
900         if (!stats_general) {
901                 LM_ERR("Out of shared memory\n");
902                 return -1;
903         }
904         memset(stats_general, 0, sizeof(kfk_stats_t));
905
906         return 0;
907 }
908
909 /**
910  * \brief Close statistics.
911  */
912 void kfk_stats_close()
913 {
914         LM_DBG("Closing statistics\n");
915
916         if (stats_lock) {
917                 LM_DBG("Freeing lock\n");
918                 lock_destroy(stats_lock);
919                 lock_dealloc(stats_lock);
920                 stats_lock = NULL;
921         }
922
923         kfk_stats_t *current_topic = stats_general;
924         while (current_topic) {
925                 kfk_stats_t *next = current_topic->next;
926                 kfk_stats_topic_free(current_topic);
927                 current_topic = next;
928         }
929 }
930
931 /**
932  * \brief free a kfk_stats_t structure.
933  */
934 static void kfk_stats_topic_free(kfk_stats_t *st_topic)
935 {
936         if (!st_topic) {
937                 /* Nothing to free. */
938                 return;
939         }
940
941         /* Free topic_name str. */
942         if (st_topic->topic_name) {
943                 if (st_topic->topic_name->s) {
944                         shm_free(st_topic->topic_name->s);
945                 }
946                 shm_free(st_topic->topic_name);
947         }
948
949         shm_free(st_topic);
950 }
951
952 /**
953  * \brief create a new stats_topic node.
954  *
955  * \return the new kfk_stats_t on success.
956  * \return NULL on error.
957  */
958 static kfk_stats_t* kfk_stats_topic_new(const char *topic, rd_kafka_resp_err_t err)
959 {
960         kfk_stats_t *st = NULL;
961
962         if (!topic) {
963                 LM_ERR("No topic\n");
964                 goto error;
965         }
966         int topic_len = strlen(topic);
967         if (topic_len == 0) {
968                 LM_ERR("Void topic\n");
969                 goto error;
970         }
971         
972         st = shm_malloc(sizeof(kfk_stats_t));
973         if (!st) {
974                 LM_ERR("Out of shared memory\n");
975                 goto error;
976         }
977         memset(st, 0, sizeof(kfk_stats_t));
978
979         st->topic_name = shm_malloc(sizeof(str));
980         if (!st->topic_name) {
981                 LM_ERR("Out of shared memory\n");
982                 goto error;
983         }
984         memset(st->topic_name, 0, sizeof(str));
985
986         st->topic_name->s = shm_malloc(topic_len + 1);
987         if (!st->topic_name->s) {
988                 LM_ERR("Out of shared memory\n");
989                 goto error;
990         }
991         memcpy(st->topic_name->s, topic, topic_len);
992         st->topic_name->s[topic_len] = '\0';
993         st->topic_name->len = topic_len;
994         
995         st->total++;
996         if (err) {
997                 st->error++;
998         }
999
1000         return st;
1001         
1002 error:
1003
1004         if (st) {
1005                 kfk_stats_topic_free(st);
1006         }
1007         
1008         return NULL;
1009 }
1010
1011 /**
1012  * \brief add a new message delivery to statistics.
1013  *
1014  * \return 0 on success.
1015  */
1016 static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err)
1017 {
1018         LM_DBG("Adding stats: (topic: %s) (error: %d)\n",
1019                    topic, err);
1020
1021         if (topic == NULL || *topic == '\0') {
1022                 LM_ERR("No topic to add to statistics\n");
1023                 return -1;
1024         }
1025         int topic_len = strlen(topic);
1026         
1027         lock_get(stats_lock);
1028
1029         stats_general->total++;
1030
1031         if (err) {
1032                 stats_general->error++;
1033         }
1034
1035         LM_DBG("General stats: total = %" PRIu64 "  error = %" PRIu64 "\n",
1036                    stats_general->total, stats_general->error);
1037
1038         kfk_stats_t **stats_pre = &(stats_general->next);
1039         while (*stats_pre != NULL) {
1040                 LM_DBG("Topic search: %.*s\n", (*stats_pre)->topic_name->len,
1041                                    (*stats_pre)->topic_name->s);
1042                 if ((*stats_pre)->topic_name->len == topic_len &&
1043                         strncmp(topic, (*stats_pre)->topic_name->s, (*stats_pre)->topic_name->len) == 0) {
1044                         /* Topic match. */
1045                         LM_DBG("Topic match: %.*s\n", (*stats_pre)->topic_name->len,
1046                                    (*stats_pre)->topic_name->s);
1047                         break;
1048                 }
1049
1050                 stats_pre = &((*stats_pre)->next);
1051         }
1052
1053         if (*stats_pre == NULL) {
1054                 /* Topic not found. */
1055                 LM_DBG("Topic: %s not found\n", topic);
1056                 
1057                 /* Add a new stats topic. */
1058                 kfk_stats_t *new_topic = NULL;
1059                 new_topic = kfk_stats_topic_new(topic, err);
1060                 if (!new_topic) {
1061                         LM_ERR("Failed to create stats for topic: %s\n", topic);
1062                         goto error;
1063                 }
1064
1065                 *stats_pre = new_topic;
1066                 LM_DBG("Created Topic stats (%s): total = %" PRIu64 "  error = %" PRIu64 "\n",
1067                            topic, new_topic->total, new_topic->error);
1068
1069                 goto clean;
1070         }
1071
1072         /* Topic found. Increase statistics. */
1073         kfk_stats_t *current = *stats_pre;
1074         current->total++;
1075         if (err) {
1076                 current->error++;
1077         }
1078
1079         LM_DBG("Topic stats (%s): total = %" PRIu64 "  error = %" PRIu64 "\n",
1080                    topic, current->total, current->error);
1081
1082 clean:
1083         lock_release(stats_lock);
1084         
1085         return 0;
1086
1087 error:
1088         lock_release(stats_lock);
1089
1090         return -1;
1091 }
1092
1093 /**
1094  * \brief Get total statistics.
1095  *
1096  * \param msg_total return total number of messages by reference.
1097  * \param msg_error return total number of errors by reference.
1098  *
1099  * \return 0 on success.
1100  */
1101 int kfk_stats_get(uint64_t *msg_total, uint64_t *msg_error)
1102 {
1103         lock_get(stats_lock);
1104
1105         *msg_total = stats_general->total;
1106         *msg_error = stats_general->error;
1107
1108         lock_release(stats_lock);
1109
1110         return 0;
1111 }
1112
1113 /**
1114  * \brief Get statistics for a specified topic.
1115  *
1116  * \param s_topic string with topic name.
1117  * \param msg_total return total number of messages by reference.
1118  * \param msg_error return total number of errors by reference.
1119  *
1120  * \return 0 on success.
1121  */
1122 int kfk_stats_topic_get(str *s_topic, uint64_t *msg_total, uint64_t *msg_error)
1123 {
1124         /* Default return values. */
1125         *msg_total = 0;
1126         *msg_error = 0;
1127         
1128         lock_get(stats_lock);
1129
1130         kfk_stats_t *st = stats_general->next;
1131         while (st) {
1132                 LM_DBG("Topic show search: %.*s\n", st->topic_name->len, st->topic_name->s);
1133
1134                 if (st->topic_name->len == s_topic->len &&
1135                         strncmp(s_topic->s, st->topic_name->s, s_topic->len) == 0) {
1136                         /* Topic match. */
1137                         LM_DBG("Topic show match: %.*s\n", st->topic_name->len, st->topic_name->s);
1138                         break;
1139                 }
1140
1141                 st = st->next;
1142         }
1143
1144         if (!st) {
1145                 LM_ERR("Topic not found. Showing default 0 values\n");
1146                 goto clean;
1147         }
1148
1149         *msg_total = st->total;
1150         *msg_error = st->error;
1151
1152 clean:
1153         
1154         lock_release(stats_lock);
1155
1156         return 0;
1157 }