2 * Copyright (C) 2009 1&1 Internet AG
4 * This file is part of Kamailio, a free SIP server.
6 * Kamailio is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version
11 * Kamailio is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
23 * @brief Contains the functions exported by the module.
27 #include "../../core/sr_module.h"
28 #include "../../core/mem/mem.h"
29 #include "../../core/mem/shm_mem.h"
30 #include "../../core/rpc_lookup.h"
36 #include <sys/socket.h>
37 #include <netinet/in.h>
38 #include <arpa/inet.h>
/* Module parameters and state shared by the functions of this file. */
45 static char* modp_server = NULL; /*!< format: \<host\>:\<port\>,... */
46 static int timeout = 50; /*!< timeout for queries in milliseconds */
47 static int timeoutlogs = -10; /*!< for aggregating timeout logs */
48 static int *active = NULL; /*!< on/off switch allocated with shm_malloc() in mod_init(); pdb_query() returns -1 while it is NULL or points to 0 */
49 static uint16_t *global_id = NULL; /*!< id sent with the next request; pdb_query() increments it after building a request, mod_init() allocates it with shm_malloc() */
53 * Generic parameter that holds a string, an int or a pseudo-variable
54 * @todo replace this with gparam_t
/* Forward declarations of the static functions referenced by the export tables below. */
75 /* ---- exported commands: */
76 static int pdb_query(struct sip_msg *_msg, struct multiparam_t *_number,
77 struct multiparam_t *_dstavp);
79 /* ---- fixup functions: */
80 static int pdb_query_fixup(void **arg, int arg_no);
82 /* ---- module init functions: */
83 static int mod_init(void);
84 static int child_init(int rank);
85 static int rpc_child_init(void);
86 static void mod_destroy();
88 /* debug function for the new client <-> server protocol: logs header fields and a hex dump of the payload */
89 static void pdb_msg_dbg(struct pdb_msg msg, char *dbg_msg);
91 /* build the new protocol message before transmission: fills the header and copies the payload */
92 static int pdb_msg_format_send(struct pdb_msg *msg,
93 uint8_t version, uint8_t type,
94 uint8_t code, uint16_t id,
95 char *payload, uint16_t payload_len);
/* Script functions exported by the module: "pdb_query" is implemented by pdb_query(),
 * its parameters are converted by pdb_query_fixup(), and it is flagged for
 * REQUEST_ROUTE and FAILURE_ROUTE. */
97 static cmd_export_t cmds[]={
98 { "pdb_query", (cmd_function)pdb_query, 2, pdb_query_fixup, 0, REQUEST_ROUTE | FAILURE_ROUTE },
/* Module parameters: "server" is stored as a string in modp_server,
 * "timeout" as an integer in timeout. */
103 static param_export_t params[] = {
104 {"server", PARAM_STRING, &modp_server },
105 {"timeout", INT_PARAM, &timeout },
/* Module interface: ties the command and parameter tables and the
 * init/destroy callbacks of this file together under the name "pdb". */
110 struct module_exports exports = {
111 "pdb", /* module name */
112 DEFAULT_DLFLAGS, /* dlopen flags */
113 cmds, /* cmd (cfg function) exports */
114 params, /* param exports */
115 0, /* RPC method exports */
116 0, /* pseudo-variables exports */
117 0, /* response handling function */
118 mod_init, /* Module initialization function */
119 child_init, /* Child initialization function */
120 mod_destroy /* Destroy function */
/* One configured server, kept in a singly linked list.
 * NOTE(review): the code below also uses the members `host` and `sock`;
 * their declarations are not part of this listing. */
124 struct server_item_t {
125 struct server_item_t *next; /* next list entry; the list walks below stop at NULL */
127 unsigned short int port; /* port number parsed by add_server() */
128 struct sockaddr_in dstaddr; /* destination address filled in by init_server_socket() */
129 socklen_t dstaddrlen; /* size of dstaddr as passed to sendto() */
/* Container for all configured servers.
 * NOTE(review): the code below also uses the members `nserver` and `fds`;
 * their declarations are not part of this listing. */
134 struct server_list_t {
135 struct server_item_t *head; /* first entry of the server list; add_server() inserts here */
141 /*! global server list, allocated by init_server_list() and released by destroy_server_list() */
142 static struct server_list_t *server_list;
145 /* debug function for the new client <-> server protocol */
146 static void pdb_msg_dbg(struct pdb_msg msg, char *dbg_msg) {
148 char buf[PAYLOADSIZE * 3 + 1];
151 if(msg.hdr.length > sizeof(msg.hdr)) {
152 for (i = 0; i < msg.hdr.length - sizeof(msg.hdr); i++) {
153 ptr += sprintf(ptr, "%02X ", msg.bdy.payload[i]);
160 "version = %d\ntype = %d\ncode = %d\nid = %d\nlen = %d\n"
163 msg.hdr.version, msg.hdr.type, msg.hdr.code, msg.hdr.id, msg.hdr.length,
167 /* build the message before send */
168 static int pdb_msg_format_send(struct pdb_msg *msg,
169 uint8_t version, uint8_t type,
170 uint8_t code, uint16_t id,
171 char *payload, uint16_t payload_len)
173 msg->hdr.version = version;
174 msg->hdr.type = type;
175 msg->hdr.code = code;
178 if (payload == NULL) {
179 /* just ignore the NULL buff (called when just want to set the len) */
180 msg->hdr.length = sizeof(struct pdb_hdr);
183 msg->hdr.length = sizeof(struct pdb_hdr) + payload_len;
184 memcpy(msg->bdy.payload, payload, payload_len);
194 * \return 1 if query for the number succeeded and the avp with the corresponding carrier id was set,
/*
 * pdb_query(): looks up the number described by _number on the configured
 * servers and passes the result to add_avp() for the AVP described by _dstavp.
 * Returns -1 immediately while the module is switched off (active is NULL or 0)
 * or while no server list / poll set is available.
 * NOTE(review): this listing omits short statements (declarations, returns,
 * case labels, closing braces); the comments below describe only the
 * statements that are shown.
 */
197 static int pdb_query(struct sip_msg *_msg, struct multiparam_t *_number, struct multiparam_t *_dstavp)
200 struct timeval tstart, tnow;
201 struct server_item_t *server;
202 short int carrierid, *_id;
/* buf serves as request buffer, as scratch space while flushing and as receive buffer */
204 char buf[sizeof(struct pdb_msg)];
208 int i, ret, nflush, bytes_received;
210 str number = STR_NULL;
212 if ((active == NULL) || (*active == 0)) return -1;
/* obtain the number from a static string, from an AVP or by printing a pseudo-variable format */
214 switch (_number->type) {
216 number = _number->u.s;
219 avp = search_first_avp(_number->u.a.flags, _number->u.a.name, &avp_val, 0);
221 LM_ERR("cannot find AVP '%.*s'\n", _number->u.a.name.s.len, _number->u.a.name.s.s);
/* only string-valued AVPs can be used as a number */
224 if ((avp->flags&AVP_VAL_STR)==0) {
225 LM_ERR("cannot process integer value in AVP '%.*s'\n", _number->u.a.name.s.len, _number->u.a.name.s.s);
228 else number = avp_val.s;
231 if (pv_printf_s(_msg, _number->u.p, &number)<0) {
232 LM_ERR("cannot print the number\n");
237 LM_ERR("invalid number type\n");
241 LM_DBG("querying '%.*s'...\n", number.len, number.s);
242 if (server_list == NULL) return -1;
243 if (server_list->fds == NULL) return -1;
/* tstart is the reference point for every timeout computation below */
245 if (gettimeofday(&tstart, NULL) != 0) {
246 LM_ERR("gettimeofday() failed with errno=%d (%s)\n", errno, strerror(errno));
250 /* clear recv buffer */
251 server = server_list->head;
/* non-blocking reads drain datagrams still queued on this server's socket */
254 while (recv(server->sock, buf, sizeof(struct pdb_msg), MSG_DONTWAIT) > 0) {
256 if (gettimeofday(&tnow, NULL) != 0) {
257 LM_ERR("gettimeofday() failed with errno=%d (%s)\n", errno, strerror(errno));
/* elapsed time since tstart in milliseconds */
260 td=(tnow.tv_usec-tstart.tv_usec+(tnow.tv_sec-tstart.tv_sec)*1000000) / 1000;
262 LM_NOTICE("exceeded timeout while flushing recv buffer.\n");
266 LM_DBG("flushed %d packets for '%s:%d'\n", nflush, server->host, server->port);
267 server = server ->next;
270 /* prepare request */
271 reqlen = number.len + 1; /* include null termination */
272 if (reqlen > PAYLOADSIZE) {
273 LM_ERR("number too long '%.*s'.\n", number.len, number.s);
/* strncpy copies exactly number.len bytes here; the terminator is written on the next line */
276 strncpy(buf, number.s, number.len);
277 buf[number.len] = '\0';
/* the wire format of the request depends on PDB_VERSION */
279 switch (PDB_VERSION) {
/* framed request: header plus NUL-terminated number, id converted with htons() */
281 pdb_msg_format_send(&msg, PDB_VERSION, PDB_TYPE_REQUEST_ID, PDB_CODE_DEFAULT, htons(*global_id), buf, reqlen);
282 pdb_msg_dbg(msg, "Kamailio pdb client sends:");
284 /* increment msg id for the next request */
285 *global_id = *global_id + 1;
287 /* send request to all servers */
288 server = server_list->head;
290 LM_DBG("sending request to '%s:%d'\n", server->host, server->port);
/* msg.hdr.length bytes of the framed message are sent without blocking */
291 ret=sendto(server->sock, (struct pdb_msg*)&msg, msg.hdr.length, MSG_DONTWAIT, (struct sockaddr *)&(server->dstaddr), server->dstaddrlen);
293 LM_ERR("sendto() failed with errno=%d (%s)\n", errno, strerror(errno));
295 server = server->next;
299 /* send request to all servers */
300 server = server_list->head;
302 LM_DBG("sending request to '%s:%d'\n", server->host, server->port);
/* unframed request: only the NUL-terminated number is sent */
303 ret=sendto(server->sock, buf, reqlen, MSG_DONTWAIT, (struct sockaddr *)&(server->dstaddr), server->dstaddrlen);
305 LM_ERR("sendto() failed with errno=%d (%s)\n", errno, strerror(errno));
307 server = server->next;
312 memset(&msg, 0, sizeof(struct pdb_msg));
313 /* wait for response */
315 if (gettimeofday(&tnow, NULL) != 0) {
316 LM_ERR("gettimeofday() failed with errno=%d (%s)\n", errno, strerror(errno));
/* elapsed time since tstart in milliseconds */
319 td=(tnow.tv_usec-tstart.tv_usec+(tnow.tv_sec-tstart.tv_sec)*1000000) / 1000;
323 LM_NOTICE("exceeded timeout while waiting for response.\n");
/* aggregated notice once more than 1000 timeouts were counted in timeoutlogs */
325 else if (timeoutlogs>1000) {
326 LM_NOTICE("exceeded timeout %d times while waiting for response.\n", timeoutlogs);
/* wait on all server sockets for what is left of the timeout */
332 ret=poll(server_list->fds, server_list->nserver, timeout-td);
333 for (i=0; i<server_list->nserver; i++) {
334 if (server_list->fds[i].revents & POLLIN) {
335 if ((bytes_received = recv(server_list->fds[i].fd, buf, sizeof(struct pdb_msg), MSG_DONTWAIT)) > 0) { /* do not block - just in case select/poll was wrong */
336 switch (PDB_VERSION) {
/* at most sizeof(struct pdb_msg) bytes were received, so the copy stays inside msg */
338 memcpy(&msg, buf, bytes_received);
339 pdb_msg_dbg(msg, "Kamailio pdb client receives:");
/* convert the id of the response to host byte order */
341 _idv = msg.hdr.id; /* make gcc happy */
342 msg.hdr.id = ntohs(_idv);
344 switch (msg.hdr.code) {
/* force termination of the payload before it is compared as a C string */
346 msg.bdy.payload[sizeof(struct pdb_bdy) - 1] = '\0';
/* NOTE(review): strcmp() and the "%s" notices below read number.s as a
 * NUL-terminated string although number is a str with an explicit length -
 * confirm that every source of number guarantees termination */
347 if (strcmp(msg.bdy.payload, number.s) == 0) {
/* the carrier id is read from the payload at offset reqlen, i.e. right after the number and its terminator.
 * NOTE(review): this cast does not check that the received data extends that far, nor alignment - verify */
348 _id = (short int *)&(msg.bdy.payload[reqlen]); /* make gcc happy */
349 carrierid=ntohs(*_id); /* convert to host byte order */
353 case PDB_CODE_NOT_NUMBER:
354 LM_NOTICE("Number %s has letters in it\n", number.s);
357 case PDB_CODE_NOT_FOUND:
358 LM_NOTICE("Number %s pdb_id not found\n", number.s);
362 LM_NOTICE("Invalid code %d received\n", msg.hdr.code);
/* unframed response: the buffer starts with the number, followed at offset reqlen by the carrier id */
369 buf[sizeof(struct pdb_msg) - 1] = '\0';
370 if (strncmp(buf, number.s, number.len) == 0) {
371 _id = (short int *)&(buf[reqlen]);
372 carrierid=ntohs(*_id); /* convert to host byte order */
/* clear the event so a stale POLLIN is not evaluated again */
379 server_list->fds[i].revents = 0;
385 LM_NOTICE("exceeded timeout while waiting for response (buffered %d lines).\n", timeoutlogs);
/* log how long the lookup took, in milliseconds */
388 if (gettimeofday(&tnow, NULL) == 0) {
389 LM_INFO("got an answer in %f ms\n", ((double)(tnow.tv_usec-tstart.tv_usec+(tnow.tv_sec-tstart.tv_sec)*1000000))/1000);
/* hand the result to the destination AVP */
393 if (add_avp(_dstavp->u.a.flags, _dstavp->u.a.name, avp_val)<0) {
394 LM_ERR("add AVP failed\n");
403 * fixes the module functions' parameters if it is a phone number.
404 * supports string, pseudo-variables and AVPs.
406 * @param param the parameter
407 * @return 0 on success, -1 on failure
/*
 * mp_fixup(): builds a zero-initialised struct multiparam_t in pkg memory
 * from the C string found in *param. A string that goes through the
 * pseudo-variable branch is parsed with pv_parse_spec(); an AVP spec is
 * resolved to its name and flags, any other spec is compiled as a
 * pseudo-variable format.
 * NOTE(review): this listing omits short statements (length computation,
 * type assignments, returns, the final store into *param); only the
 * statements shown are described.
 */
409 static int mp_fixup(void ** param) {
411 struct multiparam_t *mp;
414 mp = (struct multiparam_t *)pkg_malloc(sizeof(struct multiparam_t));
419 memset(mp, 0, sizeof(struct multiparam_t));
/* the raw configuration string */
421 s.s = (char *)(*param);
430 /* This is a pseudo-variable */
/* pv_parse_spec() signals failure with 0 */
431 if (pv_parse_spec(&s, &avp_spec)==0) {
432 LM_ERR("pv_parse_spec failed for '%s'\n", (char *)(*param));
436 if (avp_spec.type==PVT_AVP) {
437 /* This is an AVP - could be an id or name */
439 if(pv_get_avp_name(0, &(avp_spec.pvp), &(mp->u.a.name), &(mp->u.a.flags))!=0) {
440 LM_ERR("Invalid AVP definition <%s>\n", (char *)(*param));
/* any other pseudo-variable is kept as a parsed format in mp->u.p */
446 if(pv_parse_format(&s, &(mp->u.p))<0) {
447 LM_ERR("pv_parse_format failed for '%s'\n", (char *)(*param));
460 * fixes the module functions' parameters in case of AVP names.
462 * @param param the parameter
463 * @return 0 on success, -1 on failure
/*
 * avp_name_fixup(): accepts only a parameter string that parses as an AVP
 * specification and resolves it to an AVP name and flags stored in a newly
 * allocated, zero-initialised struct multiparam_t.
 * NOTE(review): this listing omits short statements (length computation,
 * returns, the final store into *param); only the statements shown are
 * described.
 */
465 static int avp_name_fixup(void ** param) {
467 struct multiparam_t *mp;
470 s.s = (char *)(*param);
/* an empty parameter is rejected */
472 if (s.len <= 0) return -1;
/* both a parse failure (0) and a spec that is not an AVP are errors */
473 if (pv_parse_spec(&s, &avp_spec)==0 || avp_spec.type!=PVT_AVP) {
474 LM_ERR("Malformed or non AVP definition <%s>\n", (char *)(*param));
478 mp = (struct multiparam_t *)pkg_malloc(sizeof(struct multiparam_t));
483 memset(mp, 0, sizeof(struct multiparam_t));
486 if(pv_get_avp_name(0, &(avp_spec.pvp), &(mp->u.a.name), &(mp->u.a.flags))!=0) {
487 LM_ERR("Invalid AVP definition <%s>\n", (char *)(*param));
/*!
 * Fixup for the parameters of pdb_query().
 *
 * \param arg parameter to convert in place
 * \param arg_no position of the parameter: 1 is the number, 2 the destination AVP
 * \return 0 on success (or for any other position), -1 if the conversion failed
 */
static int pdb_query_fixup(void **arg, int arg_no)
{
	int rc;

	switch (arg_no) {
	case 1:
		/* phone number */
		rc = mp_fixup(arg);
		break;
	case 2:
		/* destination avp name */
		rc = avp_name_fixup(arg);
		break;
	default:
		/* nothing to convert */
		return 0;
	}

	if (rc < 0) {
		LM_ERR("cannot fixup parameter %d\n", arg_no);
		return -1;
	}

	return 0;
}
520 * Adds new server structure to server list.
521 * \return 0 on success -1 otherwise
523 static int add_server(char *host, char *port)
526 struct server_item_t *server;
528 LM_DBG("adding server '%s:%s'\n", host, port);
529 server= pkg_malloc(sizeof(struct server_item_t));
530 if (server == NULL) {
534 memset(server, 0, sizeof(struct server_item_t));
536 server->next = server_list->head;
537 server_list->head = server;
539 server->host = pkg_malloc(strlen(host)+1);
540 if (server->host == NULL) {
544 strcpy(server->host, host);
546 ret=strtol(port, NULL, 10);
547 if ((ret<0) || (ret>65535)) {
548 LM_ERR("invalid port '%s'\n", port);
558 * Prepares data structures for all configured servers.
559 * \return 0 on success, -1 otherwise
561 static int prepare_server(void)
563 char *p, *dst, *end, *sep, *host, *port;
565 if (modp_server == NULL) {
566 LM_ERR("server parameter missing.\n");
570 /* Remove white space from db_sources */
571 for (p = modp_server, dst = modp_server; *p != '\0'; ++p, ++dst) {
572 while (isspace(*p)) ++p;
581 sep = strchr(p, ':');
583 LM_ERR("syntax error in sources parameter.\n");
590 sep = strchr(p, ',');
591 if (sep == NULL) sep = end;
596 if (add_server(host, port) != 0) return -1;
603 static void destroy_server_list(void)
606 while (server_list->head) {
607 struct server_item_t *server = server_list->head;
608 server_list->head = server->next;
609 if (server->host) pkg_free(server->host);
612 pkg_free(server_list);
619 * Allocates memory and builds a list of all servers defined in module parameter.
620 * \return 0 on success, -1 otherwise
622 static int init_server_list(void)
624 server_list = pkg_malloc(sizeof(struct server_list_t));
625 if (server_list == NULL) {
629 memset(server_list, 0, sizeof(struct server_list_t));
631 if (prepare_server() != 0) {
632 destroy_server_list();
641 * Initializes sockets for all servers in server list.
642 * \return 0 on success, -1 otherwise
644 static int init_server_socket(void)
646 struct server_item_t *server;
651 server_list->nserver=0;
652 server = server_list->head;
654 LM_DBG("initializing socket for '%s:%d'\n", server->host, server->port);
655 server->sock = socket(AF_INET, SOCK_DGRAM, 0);
656 if (server->sock<0) {
657 LM_ERR("socket() failed with errno=%d (%s).\n", errno, strerror(errno));
661 memset(&(server->dstaddr), 0, sizeof(server->dstaddr));
662 server->dstaddr.sin_family = AF_INET;
663 server->dstaddr.sin_port = htons(server->port);
664 hp = gethostbyname(server->host);
666 LM_ERR("gethostbyname(%s) failed with h_errno=%d.\n", server->host, h_errno);
671 memcpy(&(server->dstaddr.sin_addr.s_addr), hp->h_addr, hp->h_length);
672 server->dstaddrlen=sizeof(server->dstaddr);
674 server = server->next;
675 server_list->nserver++;
678 LM_DBG("got %d server in list\n", server_list->nserver);
679 server_list->fds = pkg_malloc(sizeof(struct pollfd)*server_list->nserver);
680 if (server_list->fds == NULL) {
684 memset(server_list->fds, 0, sizeof(struct pollfd)*server_list->nserver);
687 server = server_list->head;
689 server_list->fds[i].fd=server->sock;
690 server_list->fds[i].events=POLLIN;
691 server = server->next;
700 * Destroys sockets for all servers in server list.
702 static void destroy_server_socket(void)
705 struct server_item_t *server = server_list->head;
707 if (server->sock>0) close(server->sock);
708 server = server->next;
710 if (server_list->fds) pkg_free(server_list->fds);
/*
 * pdb_rpc_status(): RPC handler registered as "pdb.status". Replies with a
 * 500 fault while `active` is not allocated or when the reply structure
 * cannot be created; otherwise adds the current state to the reply,
 * including a "status" string that is "active" or "inactive".
 */
715 static void pdb_rpc_status(rpc_t* rpc, void* ctx)
718 if (active == NULL) {
719 rpc->fault(ctx, 500, "Active field not initialized");
/* open a structure in the reply; vh receives its handle */
722 if (rpc->add(ctx, "{", &vh) < 0) {
723 rpc->fault(ctx, 500, "Server error");
726 rpc->struct_add(vh, "ds",
728 "status", (*active)?"active":"inactive");
731 static void pdb_rpc_activate(rpc_t* rpc, void* ctx)
733 if (active == NULL) {
734 rpc->fault(ctx, 500, "Active field not initialized");
740 static void pdb_rpc_deactivate(rpc_t* rpc, void* ctx)
742 if (active == NULL) {
743 rpc->fault(ctx, 500, "Active field not initialized");
/* Help texts for the RPC commands, referenced from the pdb_rpc table.
 * NOTE(review): only the text of the status command is part of this listing. */
749 static const char* pdb_rpc_status_doc[2] = {
750 "Get the pdb status.",
754 static const char* pdb_rpc_activate_doc[2] = {
759 static const char* pdb_rpc_deactivate_doc[2] = {
/* RPC commands of the module: each row gives the command name, its handler
 * and its help text; the table is registered by pdb_rpc_init(). */
764 rpc_export_t pdb_rpc[] = {
765 {"pdb.status", pdb_rpc_status, pdb_rpc_status_doc, 0},
766 {"pdb.activate", pdb_rpc_activate, pdb_rpc_activate_doc, 0},
767 {"pdb.deactivate", pdb_rpc_deactivate, pdb_rpc_deactivate_doc, 0},
771 static int pdb_rpc_init(void)
773 if (rpc_register_array(pdb_rpc)!=0)
775 LM_ERR("failed to register RPC commands\n");
781 static int mod_init(void)
783 if(pdb_rpc_init()<0) {
784 LM_ERR("failed to register RPC commands\n");
787 active = shm_malloc(sizeof(*active));
788 if (active == NULL) {
794 if (init_server_list() != 0) {
799 global_id = (uint16_t*)shm_malloc(sizeof(uint16_t));
804 static int child_init (int rank)
806 if(rank==PROC_INIT || rank==PROC_TCP_MAIN)
808 return rpc_child_init();
/*! set once this process has created its server sockets */
static int pdb_child_initialized = 0;

/*!
 * Creates the server sockets of this process the first time it is called;
 * later calls return immediately.
 * \return 0 on success, -1 if the sockets could not be initialized
 */
static int rpc_child_init(void)
{
	if (!pdb_child_initialized) {
		if (init_server_socket() != 0) {
			return -1;
		}
		pdb_child_initialized = 1;
	}

	return 0;
}
/*
 * mod_destroy(): module shutdown - closes the server sockets, releases the
 * server list and frees the shared on/off switch.
 * NOTE(review): no release of global_id (allocated in mod_init()) and no
 * reset of `active` are visible in the lines shown here - confirm against
 * the remainder of the function.
 */
824 static void mod_destroy(void)
/* sockets first: destroy_server_socket() walks the list that is freed next */
826 destroy_server_socket();
827 destroy_server_list();
828 if (active) shm_free(active);