7f1dd8c87c1d6e04b8030d7945330e497b29765b
[sip-router] / src / modules / kafka / kfk.c
1 /*
2  * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * Kamailio is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  * Kamailio is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  */
21
22 /**
23  * \file
24  * \brief Kafka :: Apache Kafka functions via librdkafka
25  * \ingroup kfk
26  *
27  * - Module: \ref kfk
28  */
29
30 #include <syslog.h> /* For log levels. */
31 #include <librdkafka/rdkafka.h>
32
33 #include "../../core/dprint.h"
34 #include "../../core/parser/parse_param.h"
35 #include "../../core/mem/pkg.h"
36 #include "../../core/mem/shm_mem.h"
37 #include "../../core/locking.h"
38
39 /**
40  * \brief data type for a configuration property.
41  */
42 typedef struct kfk_conf_node_s {
43         str *sname; /**< name of property */
44         str *svalue; /**< value of property */
45         struct kfk_conf_node_s *next; /**< next property in list */
46 } kfk_conf_node_t;
47
48 /**
49  * \brief list of configuration properties.
50  */
51 typedef struct kfk_conf_s {
52         param_t *attrs; /**< parsed attributes from configuration parameter. */
53         char *spec; /**< original string of configuration. */
54         kfk_conf_node_t *property; /**< list of configuration properties. */
55 } kfk_conf_t;
56
57 /**
58  * \brief data type for a topic.
59  *
60  * This is an element in a topic list.
61  */
62 typedef struct kfk_topic_s {
63         str *topic_name; /**< Name of the topic. */
64         rd_kafka_topic_t *rd_topic; /**< rd kafkfa topic structure. */
65         param_t *attrs; /**< parsed attributes for topic configuration. */
66         char *spec; /**< original string for topic configuration. */
67         kfk_conf_node_t *property; /**< list of configuration properties for a topic. */
68         struct kfk_topic_s *next; /**< Next element in topic list. */
69 } kfk_topic_t;
70
71 /**
72  * \brief stats about a topic.
73  */
74 typedef struct kfk_stats_s {
75         str *topic_name; /**< Name of the topic, or NULL for general statistics. */
76         uint64_t total; /**< Total number of messages sent. */
77         uint64_t error; /**< Number of failed messages to sent. */
78         struct kfk_stats_s *next; /**< Next element in stats list. */
79 } kfk_stats_t;
80
81 /* Static variables. */
82 static rd_kafka_conf_t *rk_conf = NULL;  /* Configuration object */
83 static rd_kafka_t *rk = NULL; /* Producer instance handle */
84 static kfk_conf_t *kfk_conf = NULL; /* List for Kafka configuration properties. */
85 static kfk_topic_t *kfk_topic = NULL; /* List for Kafka topics. */
86
87 #define ERRSTR_LEN 512 /**< Length of internal buffer for errors. */
88 static char errstr[ERRSTR_LEN]; /* librdkafka API error reporting buffer */
89 gen_lock_t *stats_lock = NULL; /**< Lock to protect shared statistics data. */
90
91 /**
92  * \brief Total statistics
93  *
94  * First node (mandatory) is the general one with NULL topic.
95  * Next nodes are topic dependant ones and are optional.
96  * This way because general node is created in kfk_stats_init in mod_init is
97  * shared among every Kamailio process.
98  */
99 static kfk_stats_t *stats_general;
100
101 /* Static functions. */
102 static void kfk_conf_free(kfk_conf_t *kconf);
103 static void kfk_topic_free(kfk_topic_t *ktopic);
104 static int kfk_conf_configure();
105 static int kfk_topic_list_configure();
106 static int kfk_topic_exist(str *topic_name);
107 static rd_kafka_topic_t* kfk_topic_get(str *topic_name);
108 static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err);
109 static void kfk_stats_topic_free(kfk_stats_t *st_topic);
110
111 /**
112  * \brief Kafka logger callback
113  */
114 static void kfk_logger (const rd_kafka_t *rk, int level,
115                     const char *fac, const char *buf) {
116
117         switch(level) {
118                 case LOG_EMERG:
119                         LM_NPRL("RDKAFKA fac: %s : %s : %s\n",
120                                         fac, rk ? rd_kafka_name(rk) : NULL,
121                                         buf);
122                         break;
123                         
124                 case LOG_ALERT:
125                         LM_ALERT("RDKAFKA fac: %s : %s : %s\n",
126                                          fac, rk ? rd_kafka_name(rk) : NULL,
127                                          buf);
128                         break;
129                         
130                 case LOG_CRIT:
131                         LM_CRIT("RDKAFKA fac: %s : %s : %s\n",
132                                         fac, rk ? rd_kafka_name(rk) : NULL,
133                                         buf);
134                         break;
135
136                 case LOG_ERR:
137                         LM_ERR("RDKAFKA fac: %s : %s : %s\n",
138                                    fac, rk ? rd_kafka_name(rk) : NULL,
139                                    buf);
140                         break;
141
142                 case LOG_WARNING:
143                         LM_WARN("RDKAFKA fac: %s : %s : %s\n",
144                                         fac, rk ? rd_kafka_name(rk) : NULL,
145                                         buf);
146                         break;
147
148                 case LOG_NOTICE:
149                         LM_NOTICE("RDKAFKA fac: %s : %s : %s\n",
150                                         fac, rk ? rd_kafka_name(rk) : NULL,
151                                         buf);
152                         break;
153                         
154                 case LOG_INFO:
155                         LM_INFO("RDKAFKA fac: %s : %s : %s\n",
156                                         fac, rk ? rd_kafka_name(rk) : NULL,
157                                         buf);
158                         break;
159
160                 case LOG_DEBUG:
161                         LM_DBG("RDKAFKA fac: %s : %s : %s\n",
162                                    fac, rk ? rd_kafka_name(rk) : NULL,
163                                    buf);
164                         break;
165
166                 default:
167                         LM_ERR("Unsupported kafka log level: %d\n", level);
168                         break;
169         }
170 }
171
172 /**
173  * \brief Message delivery report callback using the richer rd_kafka_message_t object.
174  */
175 static void kfk_msg_delivered (rd_kafka_t *rk,
176                                                            const rd_kafka_message_t *rkmessage, void *opaque) {
177
178         LM_DBG("Message delivered callback\n");
179         
180         const char *topic_name = NULL;
181         topic_name = rd_kafka_topic_name(rkmessage->rkt);
182         if (!topic_name) {
183                 LM_ERR("Cannot get topic name for delivered message\n");
184                 return;
185         }
186         
187         kfk_stats_add(topic_name, rkmessage->err);
188         
189         if (rkmessage->err) {
190                 LM_ERR("RDKAFKA Message delivery failed: %s\n",
191                            rd_kafka_err2str(rkmessage->err));
192         } else {
193                 LM_DBG("RDKAFKA Message delivered (%zd bytes, offset %"PRId64", "
194                            "partition %"PRId32"): %.*s\n",
195                            rkmessage->len, rkmessage->offset,
196                            rkmessage->partition,
197                            (int)rkmessage->len, (const char *)rkmessage->payload);
198         }
199 }
200
201 /**
202  * \brief Initialize kafka functionality.
203  *
204  * \param brokers brokers to add.
205  * \return 0 on success.
206  */
207 int kfk_init(char *brokers)
208 {
209         LM_DBG("Initializing Kafka\n");
210
211         if (brokers == NULL) {
212                 LM_ERR("brokers parameter not set\n");
213                 return -1;
214         }
215         
216         /*
217          * Create Kafka client configuration place-holder
218          */
219         rk_conf = rd_kafka_conf_new();
220
221         /* Set logger */
222         rd_kafka_conf_set_log_cb(rk_conf, kfk_logger);
223
224         /* Set message delivery callback. */
225         rd_kafka_conf_set_dr_msg_cb(rk_conf, kfk_msg_delivered);
226
227         /* Configure properties: */
228         if (kfk_conf_configure()) {
229                 LM_ERR("Failed to configure general properties\n");
230                 return -1;
231         }
232
233         /*
234          * Create producer instance.
235          *
236          * NOTE: rd_kafka_new() takes ownership of the conf object
237          *       and the application must not reference it again after
238          *       this call.
239          */
240         rk = rd_kafka_new(RD_KAFKA_PRODUCER, rk_conf, errstr, sizeof(errstr));
241         if (!rk) {
242                 LM_ERR("Failed to create new producer: %s\n", errstr);
243                 return -1;
244         }
245         rk_conf = NULL; /* Now owned by producer. */
246         LM_DBG("Producer handle created\n");
247
248         LM_DBG("Adding broker: %s\n", brokers);
249         /* Add brokers */
250         if (rd_kafka_brokers_add(rk, brokers) == 0) {
251                 LM_ERR("No valid brokers specified: %s\n", brokers);
252                 return -1;
253         }
254         LM_DBG("Added broker: %s\n", brokers);
255
256         /* Topic creation and configuration. */
257         if (kfk_topic_list_configure()) {
258                 LM_ERR("Failed to configure topics\n");
259                 return -1;
260         }
261
262         return 0;
263 }
264
265 /**
266  * \brief Close kafka related functionality.
267  */
268 void kfk_close()
269 {
270         rd_kafka_resp_err_t err;
271         
272         LM_DBG("Closing Kafka\n");
273
274     /* Destroy the producer instance */
275         if (rk) {
276                 /* Flushing messages. */
277                 LM_DBG("Flushing messages\n");
278                 err = rd_kafka_flush(rk, 0);
279                 if (err) {
280                         LM_ERR("Failed to flush messages: %s\n", rd_kafka_err2str(err));
281                 }
282
283                 /* Destroy producer. */
284                 LM_DBG("Destroying instance of Kafka producer\n");
285                 rd_kafka_destroy(rk);
286         }
287
288         /* Destroy configuration if not freed by rd_kafka_destroy. */
289         if (rk_conf) {
290                 LM_DBG("Destroying instance of Kafka configuration\n");
291                 rd_kafka_conf_destroy(rk_conf);
292         }
293
294         /* Free list of configuration properties. */
295         if (kfk_conf) {
296                 kfk_conf_free(kfk_conf);
297         }
298
299         /* Free list of topics. */
300         while (kfk_topic) {
301                 kfk_topic_t *next = kfk_topic->next;
302                 kfk_topic_free(kfk_topic);
303                 kfk_topic = next;
304         }
305 }
306
307 /**
308  * \brief Free a general configuration object.
309  */
310 static void kfk_conf_free(kfk_conf_t *kconf)
311 {
312         if (kconf == NULL) {
313                 /* Nothing to free. */
314                 return;
315         }
316
317         kfk_conf_node_t *knode = kconf->property;
318         while (knode) {
319                 kfk_conf_node_t *next = knode->next;
320                 pkg_free(knode);
321                 knode = next;
322         }
323
324         free_params(kconf->attrs);
325         pkg_free(kconf);
326 }
327
328 /**
329  * \brief Parse general configuration properties for Kafka.
330  */
331 int kfk_conf_parse(char *spec)
332 {
333         param_t *pit = NULL;
334         param_hooks_t phooks;
335         kfk_conf_t *kconf = NULL;
336
337         if (kfk_conf != NULL) {
338                 LM_ERR("Configuration already set\n");
339                 goto error;
340         }
341         
342         str s;
343         s.s = spec;
344         s.len = strlen(spec);
345         if(s.s[s.len-1]==';') {
346                 s.len--;
347         }
348         if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
349                 LM_ERR("Failed parsing params value\n");
350                 goto error;
351         }
352
353         kconf = (kfk_conf_t*)pkg_malloc(sizeof(kfk_conf_t));
354         if (kconf == NULL) {
355                 LM_ERR("No more pkg memory\n");
356                 goto error;
357         }
358         memset(kconf, 0, sizeof(kfk_conf_t));
359         kconf->attrs = pit;
360         kconf->spec = spec;
361         for (pit = kconf->attrs; pit; pit=pit->next)
362         {
363                 /* Parse a property. */
364                 kfk_conf_node_t *knode = NULL;
365                 knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
366                 if (knode == NULL) {
367                         LM_ERR("No more pkg memory\n");
368                         goto error;
369                 }
370                 memset(knode, 0, sizeof(kfk_conf_node_t));
371                 
372                 knode->sname = &pit->name;
373                 knode->svalue = &pit->body;
374                 if (knode->sname && knode->svalue) {
375                         LM_DBG("Parsed property: %.*s -> %.*s\n",
376                                    knode->sname->len, knode->sname->s,
377                                    knode->svalue->len, knode->svalue->s);
378                 }
379
380                 /* Place node at beginning of knode list. */
381             knode->next = kconf->property;
382                 kconf->property = knode;
383         } /* for pit */
384
385         kfk_conf = kconf;
386         return 0;
387
388 error:
389         if(pit!=NULL) {
390                 free_params(pit);
391         }
392
393         if(kconf != NULL) {
394                 kfk_conf_free(kconf);
395         }
396         return -1;
397 }
398
399 /**
400  * \brief Configure Kafka properties.
401  *
402  * \return 0 on success. 
403  */
404 static int kfk_conf_configure()
405 {
406         if (kfk_conf == NULL) {
407                 /* Nothing to configure. */
408                 LM_DBG("No properties to configure\n");
409                 return 0;
410         }
411
412         LM_DBG("Configuring properties\n");
413         
414         kfk_conf_node_t *knode = kfk_conf->property;
415         while (knode) {
416                 kfk_conf_node_t *next = knode->next;
417                 str *sname = knode->sname;
418                 str *svalue = knode->svalue;
419                 knode = next;
420                 
421                 if (sname == NULL || sname->len == 0 || sname->s == NULL) {
422                         LM_ERR("Bad name in configuration property\n");
423                         continue;
424                 }
425
426                 if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
427                         LM_ERR("Bad value in configuration property\n");
428                         continue;
429                 }
430
431                 /* We temporarily convert to zstring. */
432                 char cname = sname->s[sname->len];
433                 sname->s[sname->len] = '\0';
434                 char cvalue = svalue->s[svalue->len];
435                 svalue->s[svalue->len] = '\0';
436                 
437                 LM_DBG("Setting property: %s -> %s\n", sname->s, svalue->s);
438                 
439                 if (rd_kafka_conf_set(rk_conf, sname->s, svalue->s,
440                                                           errstr, sizeof(errstr)) !=
441                         RD_KAFKA_CONF_OK) {
442                         LM_ERR("Configuration failed: %s\n", errstr);
443
444                         /* We restore zstrings back to str */
445                         sname->s[sname->len] = cname;
446                         svalue->s[svalue->len] = cvalue;
447                         return -1;
448                 }
449
450                 /* We restore zstrings back to str */
451                 sname->s[sname->len] = cname;
452                 svalue->s[svalue->len] = cvalue;
453
454         } /* while knode */
455         
456         return 0;
457 }
458
459 /**
460  * \brief Free a topic object.
461  */
462 static void kfk_topic_free(kfk_topic_t *ktopic)
463 {
464         if (ktopic == NULL) {
465                 /* Nothing to free. */
466                 return;
467         }
468
469         kfk_conf_node_t *knode = ktopic->property;
470         while (knode) {
471                 kfk_conf_node_t *next = knode->next;
472                 pkg_free(knode);
473                 knode = next;
474         }
475
476         /* Destroy rd Kafka topic. */
477         if (ktopic->rd_topic) {
478                 rd_kafka_topic_destroy(ktopic->rd_topic);
479         }
480         
481         free_params(ktopic->attrs);
482         pkg_free(ktopic);
483 }
484
485 /**
486  * \brief Parse topic properties for Kafka.
487  */
488 int kfk_topic_parse(char *spec)
489 {
490         param_t *pit = NULL;
491         param_hooks_t phooks;
492         kfk_topic_t *ktopic = NULL;
493
494         str s;
495         s.s = spec;
496         s.len = strlen(spec);
497         if(s.s[s.len-1]==';') {
498                 s.len--;
499         }
500         if (parse_params(&s, CLASS_ANY, &phooks, &pit) < 0) {
501                 LM_ERR("Failed parsing params value\n");
502                 goto error;
503         }
504
505         ktopic = (kfk_topic_t*)pkg_malloc(sizeof(kfk_topic_t));
506         if (ktopic == NULL) {
507                 LM_ERR("No more pkg memory\n");
508                 goto error;
509         }
510         memset(ktopic, 0, sizeof(kfk_topic_t));
511         ktopic->attrs = pit;
512         ktopic->spec = spec;
513         for (pit = ktopic->attrs; pit; pit=pit->next)
514         {
515                 /* Check for topic name. */
516                 if (pit->name.len==4 && strncmp(pit->name.s, "name", 4)==0) {
517                         if (ktopic->topic_name != NULL) {
518                                 LM_ERR("Topic name already set\n");
519                                 goto error;
520                         }
521                         ktopic->topic_name = &pit->body;
522                         LM_DBG("Topic name: %.*s\n", pit->body.len, pit->body.s);
523                         
524                 } else {
525
526                         /* Parse a property. */
527                         kfk_conf_node_t *knode = NULL;
528                         knode = (kfk_conf_node_t*)pkg_malloc(sizeof(kfk_conf_node_t));
529                         if (knode == NULL) {
530                                 LM_ERR("No more pkg memory\n");
531                                 goto error;
532                         }
533                         memset(knode, 0, sizeof(kfk_conf_node_t));
534                         
535                         knode->sname = &pit->name;
536                         knode->svalue = &pit->body;
537                         if (knode->sname && knode->svalue) {
538                                 LM_DBG("Topic parsed property: %.*s -> %.*s\n",
539                                            knode->sname->len, knode->sname->s,
540                                            knode->svalue->len, knode->svalue->s);
541                         }
542                         
543                         /* Place node at beginning of ktopic list. */
544                         knode->next = ktopic->property;
545                         ktopic->property = knode;
546                 } /* if pit->name.len == 4 */
547         } /* for pit */
548
549         /* Topic name is mandatory. */
550         if(ktopic->topic_name == NULL)
551         {
552                 LM_ERR("No topic name\n");
553                 goto error;
554         }
555
556         /* Place topic at beginning of topic list. */
557         ktopic->next = kfk_topic;
558         kfk_topic = ktopic;
559         return 0;
560
561 error:
562         if(pit!=NULL) {
563                 free_params(pit);
564         }
565
566         if(ktopic != NULL) {
567                 kfk_topic_free(ktopic);
568         }
569         return -1;
570 }
571
572 /**
573  * \brief Create and configure a topic.
574  *
575  * \return 0 on success.
576  */
577 static int kfk_topic_configure(kfk_topic_t *ktopic)
578 {
579         rd_kafka_topic_conf_t *topic_conf = NULL;
580         rd_kafka_topic_t *rkt = NULL;
581         
582         if (ktopic == NULL) {
583                 LM_ERR("No topic to create\n");
584                 goto error;
585         }
586         
587         /* Check topic name. */
588         if (!ktopic->topic_name || !ktopic->topic_name->s || ktopic->topic_name->len == 0) {
589                 LM_ERR("Bad topic name\n");
590                 goto error;
591         }
592
593         int topic_found = kfk_topic_exist(ktopic->topic_name);
594         if (topic_found == -1) {
595                 LM_ERR("Failed to search for topic %.*s in cluster\n",
596                            ktopic->topic_name->len, ktopic->topic_name->s);
597                 goto error;
598         } else if (topic_found == 0) {
599                 LM_ERR("Topic not found %.*s in cluster\n",
600                            ktopic->topic_name->len, ktopic->topic_name->s);
601                 goto error;
602         }
603         
604         LM_DBG("Creating topic: %.*s\n",
605                    ktopic->topic_name->len, ktopic->topic_name->s);
606
607         /* Topic configuration */
608
609         topic_conf = rd_kafka_topic_conf_new();
610
611         kfk_conf_node_t *knode = kfk_topic->property;
612         while (knode) {
613                 kfk_conf_node_t *next = knode->next;
614                 str *sname = knode->sname;
615                 str *svalue = knode->svalue;
616                 knode = next;
617                 
618                 if (sname == NULL || sname->len == 0 || sname->s == NULL) {
619                         LM_ERR("Bad name in topic configuration property\n");
620                         continue;
621                 }
622
623                 if (svalue == NULL || svalue->len == 0 || svalue->s == NULL) {
624                         LM_ERR("Bad value in topic configuration property\n");
625                         continue;
626                 }
627
628                 /* We temporarily convert to zstring. */
629                 char cname = sname->s[sname->len];
630                 sname->s[sname->len] = '\0';
631                 char cvalue = svalue->s[svalue->len];
632                 svalue->s[svalue->len] = '\0';
633                 
634                 LM_DBG("Setting topic property: %s -> %s\n", sname->s, svalue->s);
635
636                 rd_kafka_conf_res_t res;
637                 res = rd_kafka_topic_conf_set(topic_conf, sname->s, svalue->s,
638                                                                           errstr, sizeof(errstr));
639                 if (res != RD_KAFKA_CONF_OK) {
640                         LM_ERR("Failed to set topic configuration: %s -> %s\n",
641                                    sname->s, svalue->s);
642
643                         /* We restore zstrings back to str */
644                         sname->s[sname->len] = cname;
645                         svalue->s[svalue->len] = cvalue;
646
647                         goto error;
648                 }
649
650                 /* We restore zstrings back to str */
651                 sname->s[sname->len] = cname;
652                 svalue->s[svalue->len] = cvalue;
653
654         } /* while knode */
655
656         /* We temporarily convert to zstring. */
657         char c_topic_name = ktopic->topic_name->s[ktopic->topic_name->len];
658         ktopic->topic_name->s[ktopic->topic_name->len] = '\0';
659
660         rkt = rd_kafka_topic_new(rk, ktopic->topic_name->s, topic_conf);
661         if (!rkt) {
662                 LM_ERR("Failed to create topic (%s): %s\n",
663                            ktopic->topic_name->s,
664                            rd_kafka_err2str(rd_kafka_last_error()));
665
666                 /* We restore zstrings back to str */
667                 ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
668
669                 goto error;
670         }
671         topic_conf = NULL; /* Now owned by topic */
672         LM_DBG("Topic created: %s\n", ktopic->topic_name->s);
673         
674         /* We restore zstrings back to str */
675         ktopic->topic_name->s[ktopic->topic_name->len] = c_topic_name;
676
677         /* Everything went fine. */
678         ktopic->rd_topic = rkt;
679         return 0;
680
681 error:
682
683         /* Destroy topic configuration. */
684         if (topic_conf) {
685                 rd_kafka_topic_conf_destroy(topic_conf);
686         }
687
688         /* Destroy topic */
689         if (rkt) {
690                 LM_DBG("Destroying topic\n");
691                 rd_kafka_topic_destroy(rkt);
692         }
693
694         return -1;
695 }
696         
697 /**
698  * \brief Create and configure a list of topics.
699  *
700  * \return 0 on success.
701  */
702 static int kfk_topic_list_configure()
703 {
704         kfk_topic_t *ktopic = kfk_topic;
705
706         while (ktopic) {
707                 kfk_topic_t *next = ktopic->next;
708                 /* Create current topic. */
709                 if (kfk_topic_configure(ktopic)) {
710                         LM_ERR("Failed to create topic: %.*s\n",
711                                    ktopic->topic_name->len, ktopic->topic_name->s);
712                         return -1;
713                 }
714                 ktopic = next;
715         }
716
717         return 0;
718 }
719
720 /* -1 means RD_POLL_INFINITE */
721 /* 100000 means 100 seconds */
722 #define METADATA_TIMEOUT 100000 /**< Timeout when asking for metadata in milliseconds. */
723
724 /**
725  * \brief check that a topic exists in cluster.
726  *
727  * \return 0 if topic does not exist.
728  * \return 1 if topic does exist.
729  * \return -1 on error.
730  */
731 static int kfk_topic_exist(str *topic_name)
732 {
733         /* Where to receive metadata. */
734         const struct rd_kafka_metadata *metadatap = NULL;
735         int i;
736         int topic_found = 0; /* Topic not found by default. */
737
738         if (!topic_name || topic_name->len == 0 || topic_name->s == NULL) {
739                 LM_ERR("Bad topic name\n");
740                 goto error;
741         }
742
743         /* Get metadata for all topics. */
744         rd_kafka_resp_err_t res;
745         res = rd_kafka_metadata(rk, 1, NULL, &metadatap, METADATA_TIMEOUT);
746         if (res != RD_KAFKA_RESP_ERR_NO_ERROR) {
747                 LM_ERR("Failed to get metadata: %s\n", rd_kafka_err2str(res));
748                 goto error;
749         }
750
751         /* List topics */
752         for (i=0; i<metadatap->topic_cnt; i++) {
753                 rd_kafka_metadata_topic_t *t = &metadatap->topics[i];
754                 if (t->topic) {
755                         LM_DBG("Metadata Topic: %s\n", t->topic);
756                         if (strncmp(topic_name->s, t->topic, topic_name->len) == 0) {
757                                 topic_found = 1;
758                                 LM_DBG("Metadata Topic (%s) found!\n", t->topic);
759                                 break;
760                         }
761                 }
762         } // for (i=0; i<m->topic_cnt; i++)
763
764         /* Destroy metadata. */
765         rd_kafka_metadata_destroy(metadatap);
766
767         if (topic_found == 0) {
768                 LM_DBG("Topic not found: %.*s\n", topic_name->len, topic_name->s);
769                 return 0;
770         }
771
772         LM_DBG("Topic found: %.*s\n", topic_name->len, topic_name->s);
773         return 1;
774         
775 error:
776
777         /* Destroy metadata. */
778         if (metadatap) {
779                 rd_kafka_metadata_destroy(metadatap);
780         }
781         
782         return -1;
783 }
784
785 /**
786  * \brief get a topic based on its name.
787  *
788  * \return topic if it founds a matching one, NULL otherwise.
789  */
790 static rd_kafka_topic_t* kfk_topic_get(str *topic_name)
791 {
792         rd_kafka_topic_t *result = NULL;
793
794         if (!topic_name || topic_name->len == 0 || topic_name->s == NULL) {
795                 LM_ERR("Bad topic name\n");
796                 goto clean;
797         }
798
799         kfk_topic_t *ktopic = kfk_topic;
800         while (ktopic) {
801                 kfk_topic_t *next = ktopic->next;
802
803                 if (topic_name->len == ktopic->topic_name->len &&
804                         strncmp(topic_name->s, ktopic->topic_name->s, topic_name->len) == 0) {
805                         LM_DBG("Topic name match: %.*s\n",
806                                    ktopic->topic_name->len,
807                                    ktopic->topic_name->s);
808                         result = ktopic->rd_topic;
809                         break;
810                 }
811
812                 ktopic = next;
813         }
814         
815 clean:
816         
817         return result;
818 }
819
820 /**
821  * \brief send a message to a topic.
822  *
823  * \param topic_name name of the topic
824  * \param message message to send.
825  *
826  * \return 0 on success.
827  */
828 int kfk_message_send(str *topic_name, str *message)
829 {
830     /* Get topic from name. */
831         rd_kafka_topic_t *rkt = kfk_topic_get(topic_name);
832
833         if (!rkt) {
834                 LM_ERR("Topic not found: %.*s\n", topic_name->len, topic_name->s);
835                 return -1;
836         }
837
838         /* Send a message. */
839         if (rd_kafka_produce(
840                         rkt,
841                         RD_KAFKA_PARTITION_UA,
842                         RD_KAFKA_MSG_F_COPY,
843                         /* Payload and length */
844                         message->s,
845                         message->len,
846                         /* Optional key and its length */
847                         NULL, 0,
848                         /* Message opaque, provided in
849                          * delivery report callback as
850                          * msg_opaque. */
851                         NULL) == -1) {
852                 rd_kafka_resp_err_t err = rd_kafka_last_error();
853                 LM_ERR("Error sending message: %s\n", rd_kafka_err2str(err));
854
855                 return -1;
856         }
857
858         LM_DBG("Message sent\n");
859
860         /* Poll to handle delivery reports */
861         rd_kafka_poll(rk, 0);
862         LM_DBG("Message polled\n");
863
864         return 0;
865 }
866
867 /**
868  * \brief Initialize statistics.
869  *
870  * \return 0 on success.
871  */
872 int kfk_stats_init()
873 {
874         LM_DBG("Initializing statistics\n");
875
876         stats_lock = lock_alloc();
877         if (!stats_lock) {
878                 LM_ERR("Cannot allocate stats lock\n");
879                 return -1;
880         }
881
882         if(lock_init(stats_lock) == NULL) {
883                 LM_ERR("cannot init stats lock\n");
884                 lock_dealloc(stats_lock);
885                 stats_lock = NULL;
886                 return -1;
887         }
888
889         stats_general = shm_malloc(sizeof(kfk_stats_t));
890         if (!stats_general) {
891                 LM_ERR("Out of shared memory\n");
892                 return -1;
893         }
894         memset(stats_general, 0, sizeof(kfk_stats_t));
895
896         return 0;
897 }
898
899 /**
900  * \brief Close statistics.
901  */
902 void kfk_stats_close()
903 {
904         LM_DBG("Closing statistics\n");
905
906         if (stats_lock) {
907                 LM_DBG("Freeing lock\n");
908                 lock_destroy(stats_lock);
909                 lock_dealloc(stats_lock);
910                 stats_lock = NULL;
911         }
912
913         kfk_stats_t *current_topic = stats_general;
914         while (current_topic) {
915                 kfk_stats_t *next = current_topic->next;
916                 kfk_stats_topic_free(current_topic);
917                 current_topic = next;
918         }
919 }
920
921 /**
922  * \brief free a kfk_stats_t structure.
923  */
924 static void kfk_stats_topic_free(kfk_stats_t *st_topic)
925 {
926         if (!st_topic) {
927                 /* Nothing to free. */
928                 return;
929         }
930
931         /* Free topic_name str. */
932         if (st_topic->topic_name) {
933                 if (st_topic->topic_name->s) {
934                         shm_free(st_topic->topic_name->s);
935                 }
936                 shm_free(st_topic->topic_name);
937         }
938
939         shm_free(st_topic);
940 }
941
942 /**
943  * \brief create a new stats_topic node.
944  *
945  * \return the new kfk_stats_t on success.
946  * \return NULL on error.
947  */
948 static kfk_stats_t* kfk_stats_topic_new(const char *topic, rd_kafka_resp_err_t err)
949 {
950         kfk_stats_t *st = NULL;
951
952         if (!topic) {
953                 LM_ERR("No topic\n");
954                 goto error;
955         }
956         int topic_len = strlen(topic);
957         if (topic_len == 0) {
958                 LM_ERR("Void topic\n");
959                 goto error;
960         }
961         
962         st = shm_malloc(sizeof(kfk_stats_t));
963         if (!st) {
964                 LM_ERR("Out of shared memory\n");
965                 goto error;
966         }
967         memset(st, 0, sizeof(kfk_stats_t));
968
969         st->topic_name = shm_malloc(sizeof(str));
970         if (!st->topic_name) {
971                 LM_ERR("Out of shared memory\n");
972                 goto error;
973         }
974         memset(st->topic_name, 0, sizeof(str));
975
976         st->topic_name->s = shm_malloc(topic_len + 1);
977         if (!st->topic_name->s) {
978                 LM_ERR("Out of shared memory\n");
979                 goto error;
980         }
981         memcpy(st->topic_name->s, topic, topic_len);
982         st->topic_name->s[topic_len] = '\0';
983         st->topic_name->len = topic_len;
984         
985         st->total++;
986         if (err) {
987                 st->error++;
988         }
989
990         return st;
991         
992 error:
993
994         if (st) {
995                 kfk_stats_topic_free(st);
996         }
997         
998         return NULL;
999 }
1000
1001 /**
1002  * \brief add a new message delivery to statistics.
1003  *
1004  * \return 0 on success.
1005  */
1006 static int kfk_stats_add(const char *topic, rd_kafka_resp_err_t err)
1007 {
1008         LM_DBG("Adding stats: (topic: %s) (error: %d)\n",
1009                    topic, err);
1010
1011         if (topic == NULL || *topic == '\0') {
1012                 LM_ERR("No topic to add to statistics\n");
1013                 return -1;
1014         }
1015         int topic_len = strlen(topic);
1016         
1017         lock_get(stats_lock);
1018
1019         stats_general->total++;
1020
1021         if (err) {
1022                 stats_general->error++;
1023         }
1024
1025         LM_DBG("General stats: total = %" PRIu64 "  error = %" PRIu64 "\n",
1026                    stats_general->total, stats_general->error);
1027
1028         kfk_stats_t **stats_pre = &(stats_general->next);
1029         while (*stats_pre != NULL) {
1030                 LM_DBG("Topic search: %.*s\n", (*stats_pre)->topic_name->len,
1031                                    (*stats_pre)->topic_name->s);
1032                 if ((*stats_pre)->topic_name->len == topic_len &&
1033                         strncmp(topic, (*stats_pre)->topic_name->s, (*stats_pre)->topic_name->len) == 0) {
1034                         /* Topic match. */
1035                         LM_DBG("Topic match: %.*s\n", (*stats_pre)->topic_name->len,
1036                                    (*stats_pre)->topic_name->s);
1037                         break;
1038                 }
1039
1040                 stats_pre = &((*stats_pre)->next);
1041         }
1042
1043         if (*stats_pre == NULL) {
1044                 /* Topic not found. */
1045                 LM_DBG("Topic: %s not found\n", topic);
1046                 
1047                 /* Add a new stats topic. */
1048                 kfk_stats_t *new_topic = NULL;
1049                 new_topic = kfk_stats_topic_new(topic, err);
1050                 if (!new_topic) {
1051                         LM_ERR("Failed to create stats for topic: %s\n", topic);
1052                         goto error;
1053                 }
1054
1055                 *stats_pre = new_topic;
1056                 LM_DBG("Created Topic stats (%s): total = %" PRIu64 "  error = %" PRIu64 "\n",
1057                            topic, new_topic->total, new_topic->error);
1058
1059                 goto clean;
1060         }
1061
1062         /* Topic found. Increase statistics. */
1063         kfk_stats_t *current = *stats_pre;
1064         current->total++;
1065         if (err) {
1066                 current->error++;
1067         }
1068
1069         LM_DBG("Topic stats (%s): total = %" PRIu64 "  error = %" PRIu64 "\n",
1070                    topic, current->total, current->error);
1071
1072 clean:
1073         lock_release(stats_lock);
1074         
1075         return 0;
1076
1077 error:
1078         lock_release(stats_lock);
1079
1080         return -1;
1081 }
1082
1083 /**
1084  * \brief Get total statistics.
1085  *
1086  * \param msg_total return total number of messages by reference.
1087  * \param msg_error return total number of errors by reference.
1088  *
1089  * \return 0 on success.
1090  */
1091 int kfk_stats_get(uint64_t *msg_total, uint64_t *msg_error)
1092 {
1093         lock_get(stats_lock);
1094
1095         *msg_total = stats_general->total;
1096         *msg_error = stats_general->error;
1097
1098         lock_release(stats_lock);
1099
1100         return 0;
1101 }
1102
1103 /**
1104  * \brief Get statistics for a specified topic.
1105  *
1106  * \param s_topic string with topic name.
1107  * \param msg_total return total number of messages by reference.
1108  * \param msg_error return total number of errors by reference.
1109  *
1110  * \return 0 on success.
1111  */
1112 int kfk_stats_topic_get(str *s_topic, uint64_t *msg_total, uint64_t *msg_error)
1113 {
1114         /* Default return values. */
1115         *msg_total = 0;
1116         *msg_error = 0;
1117         
1118         lock_get(stats_lock);
1119
1120         kfk_stats_t *st = stats_general->next;
1121         while (st) {
1122                 LM_DBG("Topic show search: %.*s\n", st->topic_name->len, st->topic_name->s);
1123
1124                 if (st->topic_name->len == s_topic->len &&
1125                         strncmp(s_topic->s, st->topic_name->s, s_topic->len) == 0) {
1126                         /* Topic match. */
1127                         LM_DBG("Topic show match: %.*s\n", st->topic_name->len, st->topic_name->s);
1128                         break;
1129                 }
1130
1131                 st = st->next;
1132         }
1133
1134         if (!st) {
1135                 LM_ERR("Topic not found. Showing default 0 values\n");
1136                 goto clean;
1137         }
1138
1139         *msg_total = st->total;
1140         *msg_error = st->error;
1141
1142 clean:
1143         
1144         lock_release(stats_lock);
1145
1146         return 0;
1147 }