a9165da106a4cea7f60cf2c87ddeed0ed1045038
[sip-router] / src / modules / kafka / kafka_mod.c
1 /*
2  * Copyright (C) 2019 Vicente Hernando (Sonoc https://www.sonoc.io)
3  *
4  * This file is part of Kamailio, a free SIP server.
5  *
6  * Kamailio is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version
10  *
11  * Kamailio is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
19  *
20  */
21
22 /**
23  * \file
24  * \brief Kafka :: Module Core
25  * \ingroup kfk
26  *
27  * - Module: \ref kfk
28  */
29
30 /**
31  * \defgroup kfk Kafka :: Kafka module for Kamailio
32  *
33  * This module contains functions related to Apache Kafka initialization and closing,
34  * as well as the module interface.
35  * It uses librdkafka library.
36  * Currently it only provides producer capabilites.
37  */
38
39 /* Headers */
40 #include <inttypes.h>
41
42 #include "../../core/sr_module.h"
43 #include "../../core/dprint.h"
44 #include "../../core/mod_fix.h"
45 #include "../../core/kemi.h"
46 #include "../../core/rpc.h"
47 #include "../../core/rpc_lookup.h"
48
49 #include "kfk.h"
50
51 MODULE_VERSION
52
53 /* Declaration of static variables and functions. */
54
55 static rpc_export_t rpc_cmds[];
56 static int mod_init(void);
57 static void mod_destroy(void);
58 static int child_init(int rank);
59 static int fixup_kafka_send(void** param, int param_no);
60 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage);
61
62 /*
63  * Variables and functions to deal with module parameters.
64  */
65 char *brokers_param = NULL; /**< List of brokers. */
66 static int kafka_conf_param(modparam_t type, void *val);
67 static int kafka_topic_param(modparam_t type, void *val);
68
69 /**
70  * \brief Module commands
71  */
72 static cmd_export_t cmds[] = {
73         {"kafka_send", (cmd_function)w_kafka_send, 2, fixup_kafka_send,
74          0, ANY_ROUTE},
75     { 0, 0, 0, 0, 0, 0}
76 };
77
78 /**
79  * \brief Structure for module parameters.
80  */
81 static param_export_t params[]={
82         {"brokers", PARAM_STRING, &brokers_param},
83         {"configuration", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_conf_param},
84         {"topic", PARAM_STRING|USE_FUNC_PARAM, (void*)kafka_topic_param},
85     {0, 0, 0}
86 };
87
88 /**
89  * \brief Kafka :: Module interface
90  */
91 struct module_exports exports = {
92         "kafka",
93         DEFAULT_DLFLAGS, /* dlopen flags */
94         cmds,
95         params,
96         0,              /* exported RPC methods */
97         0,              /* exported pseudo-variables */
98         0,              /* response function */
99         mod_init,       /* module initialization function */
100         child_init,             /* per child init function */
101         mod_destroy     /* destroy function */
102 };
103
104 static int mod_init(void)
105 {
106         /* Register RPC commands. */
107         if (rpc_register_array(rpc_cmds) != 0) {
108                 LM_ERR("Failed to register RPC commands\n");
109                 return -1;
110         }
111
112         /* Initialize statistics. */
113         if (kfk_stats_init()) {
114                 LM_ERR("Failed to initialize statistics\n");
115                 return -1;
116         }
117         
118         return 0;
119 }
120
121 static int child_init(int rank)
122 {
123         /* skip child init for non-worker process ranks */
124         /* if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN) */
125         /* We execute kfk_init in PROC_MAIN so it cleans messages, etc right 
126            when destroying the module. */
127         if (rank==PROC_INIT || rank==PROC_TCP_MAIN)
128                 return 0;
129
130         if (kfk_init(brokers_param)) {
131                 LM_ERR("Failed to initialize Kafka\n");
132                 return -1;
133         }
134         return 0;
135 }
136
137 static void mod_destroy(void)
138 {
139         LM_DBG("cleaning up\n");
140
141         kfk_close();
142
143         kfk_stats_close();
144 }
145
146 /**
147  * \brief Parse configuration parameter.
148  */
149 static int kafka_conf_param(modparam_t type, void *val)
150 {
151         return kfk_conf_parse((char*)val);
152 }
153
154 /**
155  * \brief Parse topic parameter.
156  */
157 static int kafka_topic_param(modparam_t type, void *val)
158 {
159         return kfk_topic_parse((char*)val);
160 }
161
162 static int fixup_kafka_send(void** param, int param_no)
163 {
164         return fixup_spve_null(param, 1);
165 }
166
167 /**
168  * \brief Send a message via Kafka
169  */
170 static int w_kafka_send(struct sip_msg* msg, char* ptopic, char *pmessage)
171 {
172         str s_topic;
173
174         if (ptopic == NULL) {
175                 LM_ERR("Invalid topic parameter\n");
176                 return -1;
177         }
178
179         if (get_str_fparam(&s_topic, msg, (gparam_t*)ptopic)!=0) {
180                 LM_ERR("No topic\n");
181                 return -1;
182         }
183         if (s_topic.s == NULL || s_topic.len == 0) {
184                 LM_ERR("Invalid topic string\n");
185                 return -1;
186         }
187
188         str s_message;
189
190         if (pmessage == NULL) {
191                 LM_ERR("Invalid message parameter\n");
192                 return -1;
193         }
194
195         if (get_str_fparam(&s_message, msg, (gparam_t*)pmessage)!=0) {
196                 LM_ERR("No message\n");
197                 return -1;
198         }
199         if (s_message.s == NULL || s_message.len == 0) {
200                 LM_ERR("Invalid message string\n");
201                 return -1;
202         }
203
204         if (kfk_message_send(&s_topic, &s_message)) {
205                 LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
206                            s_topic.len, s_topic.s,
207                            s_message.len, s_message.s);
208                 return -1;
209         }
210
211         LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
212                    s_topic.len, s_topic.s,
213                    s_message.len, s_message.s);
214         return 1;
215 }
216
217 /**
218  * \brief KEMI function to send a Kafka message.
219  */
220 static int ki_kafka_send(struct sip_msg* msg, str *s_topic, str *s_message)
221 {
222         if (s_topic == NULL || s_topic->s == NULL || s_topic->len == 0) {
223                 LM_ERR("Invalid topic string\n");
224                 return -1;
225         }
226
227         if (s_message == NULL || s_message->s == NULL || s_message->len == 0) {
228                 LM_ERR("Invalid message string\n");
229                 return -1;
230         }
231
232         if (kfk_message_send(s_topic, s_message)) {
233                 LM_ERR("Cannot send kafka (topic: %.*s) message: %.*s\n",
234                            s_topic->len, s_topic->s,
235                            s_message->len, s_message->s);
236                 return -1;
237         }
238
239         LM_DBG("Message sent (Topic: %.*s) : %.*s\n",
240                    s_topic->len, s_topic->s,
241                    s_message->len, s_message->s);
242         return 1;
243 }
244
245 /**
246  * \brief Kafka :: Array with KEMI functions
247  */
248 /* clang-format off */
249 static sr_kemi_t sr_kemi_kafka_exports[] = {
250         { str_init("kafka"), str_init("send"),
251           SR_KEMIP_INT, ki_kafka_send,
252           { SR_KEMIP_STR, SR_KEMIP_STR, SR_KEMIP_NONE,
253                 SR_KEMIP_NONE, SR_KEMIP_NONE, SR_KEMIP_NONE }
254         },
255
256         { {0, 0}, {0, 0}, 0, NULL, { 0, 0, 0, 0, 0, 0 } }
257 };
258 /* clang-format on */
259
260 /**
261  * \brief Kafka :: register Kafka module
262  */
263 int mod_register(char *path, int *dlflags, void *p1, void *p2)
264 {
265         sr_kemi_modules_add(sr_kemi_kafka_exports);
266         return 0;
267 }
268
269 static void rpc_kafka_stats(rpc_t *rpc, void *ctx)
270 {
271         uint64_t msg_total = 0;
272         uint64_t msg_error = 0;
273
274         if (kfk_stats_get(&msg_total, &msg_error)) {
275                 LM_ERR("Failed to get total statistics\n");
276                 rpc->fault(ctx, 500, "Failed to get total statistics");
277                 return;
278         }
279
280         LM_DBG("Total messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
281                    msg_total, msg_error);
282         if (rpc->rpl_printf(ctx, "Total messages: %" PRIu64 "  Errors: %" PRIu64,
283                                                 msg_total, msg_error) < 0) {
284                 rpc->fault(ctx, 500, "Internal error showing total statistics");
285                 return;
286         }
287 }
288
289 static void rpc_kafka_stats_topic(rpc_t *rpc, void *ctx)
290 {
291         str s_topic;
292
293         if (rpc->scan(ctx, "S", &s_topic) < 1) {
294                 rpc->fault(ctx, 400, "required topic string");
295                 return;
296         }
297
298         if (s_topic.len == 0 || s_topic.s == NULL) {
299                 LM_ERR("Bad topic name\n");
300                 rpc->fault(ctx, 400, "Bad topic name");
301                 return;
302         }
303         
304         uint64_t msg_total = 0;
305         uint64_t msg_error = 0;
306
307         if (kfk_stats_topic_get(&s_topic, &msg_total, &msg_error)) {
308                 LM_ERR("Failed to get statistics for topic: %.*s\n", s_topic.len, s_topic.s);
309                 rpc->fault(ctx, 500, "Failed to get per topic statistics");
310                 return;
311         }
312
313         LM_DBG("Topic: %.*s   messages: %" PRIu64 "  Errors: %" PRIu64 "\n",
314                    s_topic.len, s_topic.s, msg_total, msg_error);
315         if (rpc->rpl_printf(ctx, "Topic: %.*s  Total messages: %" PRIu64 "  Errors: %" PRIu64,
316                                                 s_topic.len, s_topic.s, msg_total, msg_error) < 0) {
317                 rpc->fault(ctx, 500, "Internal error showing statistics for topic: %.*s",
318                                    s_topic.len, s_topic.s);
319                 return;
320         }
321 }
322
323 static const char* rpc_kafka_stats_doc[2] = {
324         "Print general topic independent statistics",
325         0
326 };
327
328 static const char* rpc_kafka_stats_topic_doc[2] = {
329         "Print statistics based on topic",
330         0
331 };
332
333 static rpc_export_t rpc_cmds[] = {
334         {"kafka.stats", rpc_kafka_stats, rpc_kafka_stats_doc, 0},
335         {"kafka.stats_topic", rpc_kafka_stats_topic, rpc_kafka_stats_topic_doc, 0},
336         {0, 0, 0, 0}
337 };