kafka: send messages with key via kafka_send_key function and kafka.send_key for...
[sip-router] / src / modules / kafka / kafka_mod.c
1 /*
2  * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * Kamailio is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  * Kamailio is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  */
21
22 /**
23  * \file
24  * \brief Kafka :: Module Core
25  * \ingroup kfk
26  *
27  * - Module: \ref kfk
28  */
29
30 /**
31  * \defgroup kfk Kafka :: Kafka module for Kamailio
32  *
33  * This module contains functions related to Apache Kafka initialization and closing,
34  * as well as the module interface.
35  * It uses librdkafka library.
36  * Currently it only provides producer capabilites.
37  */
38
39 /* Headers */
40 #include <inttypes.h>
41
42 #include "../../core/sr_module.h"
43 #include "../../core/dprint.h"
44 #include "../../core/mod_fix.h"
45 #include "../../core/kemi.h"
46 #include "../../core/rpc.h"
47 #include "../../core/rpc_lookup.h"
48
49 #include "kfk.h"
50
51 MODULE_VERSION
52
53 /* Declaration of static variables and functions. */
54
55 static rpc_export_t rpc_cmds[];
56 static int mod_init(void);
57 static void mod_destroy(void);
58 static int child_init(int rank);
59 static int fixup_kafka_send(void** param, int param_no);
60 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
61 static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey);
62
63 /*
64  * Variables and functions to deal with module parameters.
65  */
66 char *brokers_param = NULL; /**< List of brokers. */
67 static int kafka_conf_param(modparam_t type, void *val);
68 static int kafka_topic_param(modparam_t type, void *val);
69
70 /**
71  * \brief Module commands
72  */
73 static cmd_export_t cmds[] = {
74         {"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
75          0, ANY_ROUTE},
76         {"kafka_send_key", (cmd_function)w_kafka_send_key, 3, fixup_kafka_send,
77          0, ANY_ROUTE},
78     { 0, 0, 0, 0, 0, 0}
79 };
80
81 /**
82  * \brief Structure for module parameters.
83  */
84 static param_export_t params[]={
85         {"brokers", PARAM_STRING, &brokers_param},
86         {"configuration", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_conf_param},
87         {"topic", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_topic_param},
88     {0, 0, 0}
89 };
90
91 /**
92  * \brief Kafka :: Module interface
93  */
94 struct module_exports exports = {
95         "kafka",
96         DEFAULT_DLFLAGS, /* dlopen flags */
97         cmds,
98         params,
99         0,              /* exported RPC methods */
100         0,              /* exported pseudo-variables */
101         0,              /* response function */
102         mod_init,       /* module initialization function */
103         child_init,             /* per child init function */
104         mod_destroy     /* destroy function */
105 };
106
107 static int mod_init(void)
108 {
109         /* Register RPC commands. */
110         if (rpc_register_array(rpc_cmds) != 0) {
111                 LM_ERR("Failed to register RPC commands\n");
112                 return -1;
113         }
114
115         /* Initialize statistics. */
116         if (kfk_stats_init()) {
117                 LM_ERR("Failed to initialize statistics\n");
118                 return -1;
119         }
120         
121         return 0;
122 }
123
124 static int child_init(int rank)
125 {
126         /* skip child init for non-worker process ranks */
127         /* if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) */
128         /* We execute kfk_init in PROC_MAIN so it cleans messages, etc right 
129            when destroying the module. */
130         if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
131                 return 0;
132
133         if (kfk_init(brokers_param)) {
134                 LM_ERR("Failed to initialize Kafka\n");
135                 return -1;
136         }
137         return 0;
138 }
139
140 static void mod_destroy(void)
141 {
142         LM_DBG("cleaning up\n");
143
144         kfk_close();
145
146         kfk_stats_close();
147 }
148
149 /**
150  * \brief Parse configuration parameter.
151  */
152 static int kafka_conf_param(modparam_t type, void *val)
153 {
154         return kfk_conf_parse((char*)val);
155 }
156
157 /**
158  * \brief Parse topic parameter.
159  */
160 static int kafka_topic_param(modparam_t type, void *val)
161 {
162         return kfk_topic_parse((char*)val);
163 }
164
165 static int fixup_kafka_send(void** param, int param_no)
166 {
167         return fixup_spve_null(param, 1);
168 }
169
170 /**
171  * \brief Send a message via Kafka
172  */
173 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
174 {
175         str s_topic;
176
177         if (ptopic == NULL) {
178                 LM_ERR("Invalid topic parameter\n");
179                 return -1;
180         }
181
182         if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
183                 LM_ERR("No topic\n");
184                 return -1;
185         }
186         if (s_topic.s == NULL || s_topic.len == 0) {
187                 LM_ERR("Invalid topic string\n");
188                 return -1;
189         }
190
191         str s_message;
192
193         if (pmessage == NULL) {
194                 LM_ERR("Invalid message parameter\n");
195                 return -1;
196         }
197
198         if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
199                 LM_ERR("No message\n");
200                 return -1;
201         }
202         if (s_message.s == NULL || s_message.len == 0) {
203                 LM_ERR("Invalid message string\n");
204                 return -1;
205         }
206
207         if (kfk_message_send(&s_topic, &s_message, NULL)) {
208                 LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
209                            s_topic.len, s_topic.s,
210                            s_message.len, s_message.s);
211                 return -1;
212         }
213
214         LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
215                    s_topic.len, s_topic.s,
216                    s_message.len, s_message.s);
217         return 1;
218 }
219
220 /**
221  * \brief Send a message via Kafka plus key parameter.
222  */
223 static int w_kafka_send_key(struct sip_msg *msg, char *ptopic, char *pmessage, char *pkey)
224 {
225         str s_topic;
226
227         if (ptopic == NULL) {
228                 LM_ERR("Invalid topic parameter\n");
229                 return -1;
230         }
231
232         if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
233                 LM_ERR("No topic\n");
234                 return -1;
235         }
236         if (s_topic.s == NULL || s_topic.len == 0) {
237                 LM_ERR("Invalid topic string\n");
238                 return -1;
239         }
240
241         str s_message;
242
243         if (pmessage == NULL) {
244                 LM_ERR("Invalid message parameter\n");
245                 return -1;
246         }
247
248         if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
249                 LM_ERR("No message\n");
250                 return -1;
251         }
252         if (s_message.s == NULL || s_message.len == 0) {
253                 LM_ERR("Invalid message string\n");
254                 return -1;
255         }
256
257         str s_key;
258
259         if (pkey == NULL) {
260                 LM_ERR("Invalid key parameter\n");
261                 return -1;
262         }
263
264         if (get_str_fparam(&s_key, msg, (gparam_t*)pkey)!=0) {
265                 LM_ERR("No key\n");
266                 return -1;
267         }
268         if (s_key.s == NULL || s_key.len == 0) {
269                 LM_ERR("Invalid key string\n");
270                 return -1;
271         }
272
273         if (kfk_message_send(&s_topic, &s_message, &s_key)) {
274                 LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
275                            s_topic.len, s_topic.s,
276                            s_key.len, s_key.s,
277                            s_message.len, s_message.s);
278                 return -1;
279         }
280
281         LM_DBG("Message key sent (Topic: %.*s) (key: %.*s) : %.*s\n",
282                    s_topic.len, s_topic.s,
283                    s_key.len, s_key.s,
284                    s_message.len, s_message.s);
285         return 1;
286 }
287
288 /**
289  * \brief KEMI function to send a Kafka message.
290  */
291 static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
292 {
293         if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
294                 LM_ERR("Invalid topic string\n");
295                 return -1;
296         }
297
298         if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
299                 LM_ERR("Invalid message string\n");
300                 return -1;
301         }
302
303         if (kfk_message_send(s_topic, s_message, NULL)) {
304                 LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
305                            s_topic->len, s_topic->s,
306                            s_message->len, s_message->s);
307                 return -1;
308         }
309
310         LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
311                    s_topic->len, s_topic->s,
312                    s_message->len, s_message->s);
313         return 1;
314 }
315
316 /**
317  * \brief KEMI function to send a Kafka message plus key.
318  */
319 static int ki_kafka_send_key(struct sip_msg* msg, str *s_topic, str *s_message, str *s_key)
320 {
321         if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
322                 LM_ERR("Invalid topic string\n");
323                 return -1;
324         }
325
326         if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
327                 LM_ERR("Invalid message string\n");
328                 return -1;
329         }
330
331         if (s_key == NULL || s_key->s == NULL || s_key->len == 0) {
332                 LM_ERR("Invalid key string\n");
333                 return -1;
334         }
335
336         if (kfk_message_send(s_topic, s_message, s_key)) {
337                 LM_ERR("Cannot send kafka (topic: %.*s) (key: %.*s) message: %.*s\n",
338                            s_topic->len, s_topic->s,
339                            s_key->len, s_key->s,
340                            s_message->len, s_message->s);
341                 return -1;
342         }
343
344         LM_DBG("Message sent (Topic: %.*s) (key: %.*s) : %.*s\n",
345                    s_topic->len, s_topic->s,
346                    s_key->len, s_key->s,
347                    s_message->len, s_message->s);
348         return 1;
349 }
350
351 /**
352  * \brief Kafka :: Array with KEMI functions
353  */
354 /* clang-format off */
355 static sr_kemi_t sr_kemi_kafka_exports[] = {
356         { str_init("kafka"), str_init("send"),
357           SR_KEMIP_INT, ki_kafka_send,
358           { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
359                 SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
360         },
361         { str_init("kafka"), str_init("send_key"),
362           SR_KEMIP_INT, ki_kafka_send_key,
363           { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_STR,
364                 SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
365         },
366
367         { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
368 };
369 /* clang-format on */
370
371 /**
372  * \brief Kafka :: register Kafka module
373  */
374 int mod_register(char *path, int *dlflags, void *p1, void *p2)
375 {
376         sr_kemi_modules_add(sr_kemi_kafka_exports);
377         return 0;
378 }
379
380 static void rpc_kafka_stats(rpc_t *rpc, void *ctx)
381 {
382         uint64_t msg_total = 0;
383         uint64_t msg_error = 0;
384
385         if (kfk_stats_get(&msg_total, &msg_error)) {
386                 LM_ERR("Failed to get total statistics\n");
387                 rpc->fault(ctx, 500, "Failed to get total statistics");
388                 return;
389         }
390
391         LM_DBG("Total messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
392                    msg_total, msg_error);
393         if (rpc->rpl_printf(ctx, "Total messages: %" PRIu64 "  Errors: %" PRIu64,
394                                                 msg_total, msg_error) < 0) {
395                 rpc->fault(ctx, 500, "Internal error showing total statistics");
396                 return;
397         }
398 }
399
400 static void rpc_kafka_stats_topic(rpc_t *rpc, void *ctx)
401 {
402         str s_topic;
403
404         if (rpc->scan(ctx, "S", &s_topic) < 1) {
405                 rpc->fault(ctx, 400, "required topic string");
406                 return;
407         }
408
409         if (s_topic.len == 0 || s_topic.s == NULL) {
410                 LM_ERR("Bad topic name\n");
411                 rpc->fault(ctx, 400, "Bad topic name");
412                 return;
413         }
414         
415         uint64_t msg_total = 0;
416         uint64_t msg_error = 0;
417
418         if (kfk_stats_topic_get(&s_topic, &msg_total, &msg_error)) {
419                 LM_ERR("Failed to get statistics for topic: %.*s\n", s_topic.len, s_topic.s);
420                 rpc->fault(ctx, 500, "Failed to get per topic statistics");
421                 return;
422         }
423
424         LM_DBG("Topic: %.*s   messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
425                    s_topic.len, s_topic.s, msg_total, msg_error);
426         if (rpc->rpl_printf(ctx, "Topic: %.*s  Total messages: %" PRIu64 "  Errors: %" PRIu64,
427                                                 s_topic.len, s_topic.s, msg_total, msg_error) < 0) {
428                 rpc->fault(ctx, 500, "Internal error showing statistics for topic: %.*s",
429                                    s_topic.len, s_topic.s);
430                 return;
431         }
432 }
433
434 static const char* rpc_kafka_stats_doc[2] = {
435         "Print general topic independent statistics",
436         0
437 };
438
439 static const char* rpc_kafka_stats_topic_doc[2] = {
440         "Print statistics based on topic",
441         0
442 };
443
444 static rpc_export_t rpc_cmds[] = {
445         {"kafka.stats", rpc_kafka_stats, rpc_kafka_stats_doc, 0},
446         {"kafka.stats_topic", rpc_kafka_stats_topic, rpc_kafka_stats_topic_doc, 0},
447         {0, 0, 0, 0}
448 };