modules/sipcapture: Extended sipcapture to support multiple tables
authorDragos Dinu <dragos.dinu@1and1.ro>
Fri, 20 Jul 2012 07:48:48 +0000 (10:48 +0300)
committerMarius Zbihlei <marius.zbihlei@1and1.ro>
Fri, 20 Jul 2012 07:48:48 +0000 (10:48 +0300)
The sipcapture module can support storing the information to multiple sql tables.
Tests have shown that a major bottleneck against scalability on multi core CPU of
the capture node instance was caused by using a single MySQL Table.
The frontend (HOMER) will be soon patched to support retrieval of data from multiple
sources.

The decision to select witch table is written to, can be configured from random,
round robin or hashing via username or callid.

modules/sipcapture/README
modules/sipcapture/doc/sipcapture_admin.xml
modules/sipcapture/hash_mode.c [new file with mode: 0644]
modules/sipcapture/hash_mode.h [new file with mode: 0644]
modules/sipcapture/sipcapture.c
modules/sipcapture/sipcapture.h

index 5afa2c0..a9d07af 100644 (file)
@@ -29,17 +29,19 @@ Alexandr Dubovikov
 
               3.1. db_url (str)
               3.2. table_name (str)
-              3.3. db_insert_mode (integer)
-              3.4. capture_on (integer)
-              3.5. hep_capture_on (integer)
-              3.6. raw_ipip_capture_on (integer)
-              3.7. raw_moni_capture_on (integer)
-              3.8. raw_socket_listen (string)
-              3.9. raw_interface (string)
-              3.10. raw_sock_children (integer)
-              3.11. promiscuous_on (integer)
-              3.12. raw_moni_bpf_on (integer)
-              3.13. capture_node (str)
+              3.3. mt_mode (str)
+              3.4. hash_source (str)
+              3.5. db_insert_mode (integer)
+              3.6. capture_on (integer)
+              3.7. hep_capture_on (integer)
+              3.8. raw_ipip_capture_on (integer)
+              3.9. raw_moni_capture_on (integer)
+              3.10. raw_socket_listen (string)
+              3.11. raw_interface (string)
+              3.12. raw_sock_children (integer)
+              3.13. promiscuous_on (integer)
+              3.14. raw_moni_bpf_on (integer)
+              3.15. capture_node (str)
 
         4. MI Commands
 
@@ -52,17 +54,19 @@ Alexandr Dubovikov
 
    1.1. Set db_url parameter
    1.2. Set sip_capture parameter
-   1.3. db_insert_mode example
-   1.4. Set capture_on parameter
-   1.5. Set hep_capture_on parameter
-   1.6. Set raw_ipip_capture_on parameter
-   1.7. Set raw_moni_capture_on parameter
-   1.8. Set raw_socket_listen parameter
-   1.9. Set raw_socket_listen parameter
+   1.3. Set mt_mode parameter
+   1.4. Set mt_mode parameter
+   1.5. db_insert_mode example
+   1.6. Set capture_on parameter
+   1.7. Set hep_capture_on parameter
+   1.8. Set raw_ipip_capture_on parameter
+   1.9. Set raw_moni_capture_on parameter
    1.10. Set raw_socket_listen parameter
-   1.11. Set hep_capture_on parameter
-   1.12. Set raw_moni_bpf_on parameter
-   1.13. Set capture_node parameter
+   1.11. Set raw_socket_listen parameter
+   1.12. Set raw_socket_listen parameter
+   1.13. Set hep_capture_on parameter
+   1.14. Set raw_moni_bpf_on parameter
+   1.15. Set capture_node parameter
 
 Chapter 1. Admin Guide
 
@@ -78,17 +82,19 @@ Chapter 1. Admin Guide
 
         3.1. db_url (str)
         3.2. table_name (str)
-        3.3. db_insert_mode (integer)
-        3.4. capture_on (integer)
-        3.5. hep_capture_on (integer)
-        3.6. raw_ipip_capture_on (integer)
-        3.7. raw_moni_capture_on (integer)
-        3.8. raw_socket_listen (string)
-        3.9. raw_interface (string)
-        3.10. raw_sock_children (integer)
-        3.11. promiscuous_on (integer)
-        3.12. raw_moni_bpf_on (integer)
-        3.13. capture_node (str)
+        3.3. mt_mode (str)
+        3.4. hash_source (str)
+        3.5. db_insert_mode (integer)
+        3.6. capture_on (integer)
+        3.7. hep_capture_on (integer)
+        3.8. raw_ipip_capture_on (integer)
+        3.9. raw_moni_capture_on (integer)
+        3.10. raw_socket_listen (string)
+        3.11. raw_interface (string)
+        3.12. raw_sock_children (integer)
+        3.13. promiscuous_on (integer)
+        3.14. raw_moni_bpf_on (integer)
+        3.15. capture_node (str)
 
    4. MI Commands
 
@@ -133,17 +139,19 @@ Chapter 1. Admin Guide
 
    3.1. db_url (str)
    3.2. table_name (str)
-   3.3. db_insert_mode (integer)
-   3.4. capture_on (integer)
-   3.5. hep_capture_on (integer)
-   3.6. raw_ipip_capture_on (integer)
-   3.7. raw_moni_capture_on (integer)
-   3.8. raw_socket_listen (string)
-   3.9. raw_interface (string)
-   3.10. raw_sock_children (integer)
-   3.11. promiscuous_on (integer)
-   3.12. raw_moni_bpf_on (integer)
-   3.13. capture_node (str)
+   3.3. mt_mode (str)
+   3.4. hash_source (str)
+   3.5. db_insert_mode (integer)
+   3.6. capture_on (integer)
+   3.7. hep_capture_on (integer)
+   3.8. raw_ipip_capture_on (integer)
+   3.9. raw_moni_capture_on (integer)
+   3.10. raw_socket_listen (string)
+   3.11. raw_interface (string)
+   3.12. raw_sock_children (integer)
+   3.13. promiscuous_on (integer)
+   3.14. raw_moni_bpf_on (integer)
+   3.15. capture_node (str)
 
 3.1. db_url (str)
 
@@ -158,7 +166,8 @@ modparam("sipcapture", "db_url", "mysql://user:passwd@host/dbname")
 
 3.2. table_name (str)
 
-   Name of the table's name where to store the SIP messages.
+   Name of the table's name where to store the SIP messages. Can contain
+   multiple tables, separated by "|".
 
    Default value is "sip_capture".
 
@@ -166,8 +175,37 @@ modparam("sipcapture", "db_url", "mysql://user:passwd@host/dbname")
 ...
 modparam("sipcapture", "table_name", "homer_capture")
 ...
+modparam("sipcapture", "table_name", "homer_capture1|homer_capture2");
+...
+
+3.3. mt_mode (str)
+
+   Name of the mode used for storing data in multiple tables. Modes can be
+   "rand" (random), "round_robin" (use a round_robin algorithm) or "hash"
+   (use hashing to determine the table to store). These modes are only
+   triggered if there is more than one table specified in table_name
+   parameter, separated by "|".
+
+   Default value is "rand".
+
+   Example 1.3. Set mt_mode parameter
+...
+modparam("sipcapture", "mt_mode", "hash")
+...
+
+3.4. hash_source (str)
 
-3.3. db_insert_mode (integer)
+   The field of the SIP message used for hashing, when mt_mode is set to
+   "hash". The value can be "call_id", "to_user" or "from_user".
+
+   Default value is "call_id".
+
+   Example 1.4. Set mt_mode parameter
+...
+modparam("sipcapture", "hash_source", "to_user")
+...
+
+3.5. db_insert_mode (integer)
 
    If set to 1, use INSERT DELAYED to store sip message into capture table
    when the DB driver has support for it. If no INSERT DELAYED support is
@@ -175,43 +213,43 @@ modparam("sipcapture", "table_name", "homer_capture")
 
    Default value is 0 (no INSERT DELAYED).
 
-   Example 1.3. db_insert_mode example
+   Example 1.5. db_insert_mode example
 modparam("sipcapture", "db_insert_mode", 1)
 
-3.4. capture_on (integer)
+3.6. capture_on (integer)
 
    Parameter to enable/disable capture globaly (on(1)/off(0))
 
    Default value is "0".
 
-   Example 1.4. Set capture_on parameter
+   Example 1.6. Set capture_on parameter
 ...
 modparam("sipcapture", "capture_on", 1)
 ...
 
-3.5. hep_capture_on (integer)
+3.7. hep_capture_on (integer)
 
    Parameter to enable/disable capture of HEP (on(1)/off(0))
 
    Default value is "0".
 
-   Example 1.5. Set hep_capture_on parameter
+   Example 1.7. Set hep_capture_on parameter
 ...
 modparam("sipcapture", "hep_capture_on", 1)
 ...
 
-3.6. raw_ipip_capture_on (integer)
+3.8. raw_ipip_capture_on (integer)
 
    Parameter to enable/disable IPIP capturing (on(1)/off(0))
 
    Default value is "0".
 
-   Example 1.6. Set raw_ipip_capture_on parameter
+   Example 1.8. Set raw_ipip_capture_on parameter
 ...
 modparam("sipcapture", "raw_ipip_capture_on", 1)
 ...
 
-3.7. raw_moni_capture_on (integer)
+3.9. raw_moni_capture_on (integer)
 
    Parameter to enable/disable monitoring/mirroring port capturing
    (on(1)/off(0)) Only one mode on raw socket can be enabled! Monitoring
@@ -219,12 +257,12 @@ modparam("sipcapture", "raw_ipip_capture_on", 1)
 
    Default value is "0".
 
-   Example 1.7. Set raw_moni_capture_on parameter
+   Example 1.9. Set raw_moni_capture_on parameter
 ...
 modparam("sipcapture", "raw_moni_capture_on", 1)
 ...
 
-3.8. raw_socket_listen (string)
+3.10. raw_socket_listen (string)
 
    Parameter indicate an listen IP address of RAW socket for IPIP
    capturing. You can also define a port/portrange for IPIP/Mirroring
@@ -242,49 +280,49 @@ modparam("sipcapture", "raw_moni_capture_on", 1)
 
    Default value is "".
 
-   Example 1.8. Set raw_socket_listen parameter
+   Example 1.10. Set raw_socket_listen parameter
 ...
 modparam("sipcapture", "raw_socket_listen", "10.0.0.1:5060-5090")
 ...
 modparam("sipcapture", "raw_socket_listen", "10.0.0.1:5060")
 ...
 
-3.9. raw_interface (string)
+3.11. raw_interface (string)
 
    Name of the interface to bind on the raw socket.
 
    Default value is "".
 
-   Example 1.9. Set raw_socket_listen parameter
+   Example 1.11. Set raw_socket_listen parameter
 ...
 modparam("sipcapture", "raw_interface", "eth0")
 ...
 
-3.10. raw_sock_children (integer)
+3.12. raw_sock_children (integer)
 
    Parameter define how much children must be created to listen the raw
    socket.
 
    Default value is "1".
 
-   Example 1.10. Set raw_socket_listen parameter
+   Example 1.12. Set raw_socket_listen parameter
 ...
 modparam("sipcapture", "raw_sock_children", 6)
 ...
 
-3.11. promiscuous_on (integer)
+3.13. promiscuous_on (integer)
 
    Parameter to enable/disable promiscuous mode on the raw socket. Linux
    only.
 
    Default value is "0".
 
-   Example 1.11. Set hep_capture_on parameter
+   Example 1.13. Set hep_capture_on parameter
 ...
 modparam("sipcapture", "promiscuous_on", 1)
 ...
 
-3.12. raw_moni_bpf_on (integer)
+3.14. raw_moni_bpf_on (integer)
 
    Activate Linux Socket Filter (LSF based on BPF) on the mirroring
    interface. The structure is defined in linux/filter.h. The default LSF
@@ -293,18 +331,18 @@ modparam("sipcapture", "promiscuous_on", 1)
 
    Default value is "0".
 
-   Example 1.12. Set raw_moni_bpf_on parameter
+   Example 1.14. Set raw_moni_bpf_on parameter
 ...
 modparam("sipcapture", "raw_moni_bpf_on", 1)
 ...
 
-3.13. capture_node (str)
+3.15. capture_node (str)
 
    Name of the capture node.
 
    Default value is "homer01".
 
-   Example 1.13. Set capture_node parameter
+   Example 1.15. Set capture_node parameter
 ...
 modparam("sipcapture", "capture_node", "homer03")
 ...
index 0122035..1e0f9df 100644 (file)
@@ -104,7 +104,7 @@ modparam("sipcapture", "db_url", "mysql://user:passwd@host/dbname")
        <section>
                <title><varname>table_name</varname> (str)</title>
                <para>
-               Name of the table's name where to store the SIP messages. 
+               Name of the table's name where to store the SIP messages. Can contain multiple tables, separated by "|".
                </para>
                <para>
                <emphasis>
@@ -117,7 +117,46 @@ modparam("sipcapture", "db_url", "mysql://user:passwd@host/dbname")
 ...
 modparam("sipcapture", "table_name", "homer_capture")
 ...
-
+modparam("sipcapture", "table_name", "homer_capture1|homer_capture2");
+...
+</programlisting>
+               </example>
+       </section>
+       <section>
+               <title><varname>mt_mode</varname> (str)</title>
+               <para>
+               Name of the mode used for storing data in multiple tables. Modes can be "rand" (random), "round_robin" (use a round_robin algorithm) or "hash" (use hashing to determine the table to store). These modes are only triggered if there is more than one table specified in table_name parameter, separated by "|". 
+               </para>
+               <para>
+               <emphasis>
+                       Default value is "rand".
+               </emphasis>
+               </para>
+               <example>
+               <title>Set <varname>mt_mode</varname> parameter</title>
+               <programlisting format="linespecific">
+...
+modparam("sipcapture", "mt_mode", "hash")
+...
+</programlisting>
+               </example>
+       </section>
+       <section>
+               <title><varname>hash_source</varname> (str)</title>
+               <para>
+               The field of the SIP message used for hashing, when mt_mode is set to "hash". The value can be "call_id", "to_user" or "from_user".
+               </para>
+               <para>
+               <emphasis>
+                       Default value is "call_id".
+               </emphasis>
+               </para>
+               <example>
+               <title>Set <varname>mt_mode</varname> parameter</title>
+               <programlisting format="linespecific">
+...
+modparam("sipcapture", "hash_source", "to_user")
+...
 </programlisting>
                </example>
        </section>
diff --git a/modules/sipcapture/hash_mode.c b/modules/sipcapture/hash_mode.c
new file mode 100644 (file)
index 0000000..7e0206a
--- /dev/null
@@ -0,0 +1,129 @@
+#include "../../sr_module.h"
+#include "../../crc.h"
+
+#include <ctype.h>
+
+#include "sipcapture.h"
+#include "hash_mode.h"
+
+static int get_source(struct _sipcapture_object *sco, enum hash_source source,
+                            str *source_string);
+static int get_call_id (struct _sipcapture_object *sco, str *source_string);
+static int get_from_user (struct _sipcapture_object *sco, str *source_string);
+static int get_to_user (struct _sipcapture_object *sco, str *source_string);
+
+
+static int first_token (str *source_string);
+
+
+int hash_func (struct _sipcapture_object * sco,
+                         enum hash_source source, int denominator) {
+       int ret;
+       unsigned int hash;
+       str source_string;
+
+       if(get_source (sco, source, &source_string) == -1) {
+               return -1;
+       }
+
+       LM_DBG("source string: [%.*s]\n", source_string.len, source_string.s);
+       crc32_uint(&source_string, &hash);
+
+       ret = hash % denominator;
+       return ret;
+}
+
+static int get_source (struct _sipcapture_object *sco, enum hash_source source,
+                             str *source_string) {
+       source_string->s = NULL;
+       source_string->len = 0;
+
+       switch (source) {
+                       case hs_call_id:
+                       return get_call_id (sco, source_string);
+                       case hs_from_user:
+                       return get_from_user(sco, source_string);
+                       case hs_to_user:
+                       return get_to_user(sco, source_string);
+                       default:
+                       LM_ERR("unknown hash source %i.\n",
+                            (int) source);
+                       return -1;
+       }
+}
+
+static int get_call_id (struct _sipcapture_object * sco, str *source_string) {
+
+       if (!sco->callid.s || !sco->callid.len)
+       {
+               return -1;
+       }
+       source_string->s = sco->callid.s;
+       source_string->len = sco->callid.len;
+       first_token (source_string);
+       return 0;
+}
+
+static int get_from_user (struct _sipcapture_object * sco, str *source_string) {
+
+       if (!sco->from_user.s || !sco->from_user.len)
+       {
+               return -1;
+       }
+       source_string->s = sco->from_user.s;
+       source_string->len = sco->from_user.len;
+       return 0;
+}
+
+
+static int get_to_user (struct _sipcapture_object * sco, str *source_string) {
+
+       if (!sco->to_user.s || !sco->to_user.len)
+       {
+               return -1;
+       }
+       source_string->s = sco->to_user.s;
+       source_string->len = sco->to_user.len;
+       return 0;
+}
+
+
+static int first_token (str *source_string) {
+       size_t len;
+
+       if (source_string->s == NULL || source_string->len == 0) {
+               return 0;
+       }
+
+       while (source_string->len > 0 && isspace (*source_string->s)) {
+               ++source_string->s;
+               --source_string->len;
+       }
+       for (len = 0; len < source_string->len; ++len) {
+               if (isspace (source_string->s[len])) {
+                       source_string->len = len;
+                       break;
+               }
+       }
+       return 0;
+}
+
+
+enum hash_source get_hash_source (const char *hash_source){
+
+       if (strcasecmp ("call_id", hash_source) == 0){
+               return hs_call_id;
+       }
+       else if (strcasecmp("from_user", hash_source) == 0)
+       {
+               return hs_from_user;
+       }
+       else if (strcasecmp("to_user", hash_source) == 0)
+       {
+               return hs_to_user;
+       }
+       else {
+               return hs_error;
+       }
+
+}
diff --git a/modules/sipcapture/hash_mode.h b/modules/sipcapture/hash_mode.h
new file mode 100644 (file)
index 0000000..1b5b53e
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * $Id$
+ *
+ * Copyright (C) 2012 dragos.dinu@1and1.ro, 1&1 Internet AG
+ *
+ * This file is part of SIP-router, a free SIP server.
+ *
+ * SIP-router is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version
+ *
+ * SIP-router is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License 
+ * along with this program; if not, write to the Free Software 
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ */
+
+
+#ifndef HASH_MODE_H
+#define HASH_MODE_H 1
+
+
+
+
+/*
+ * Determines from which part of a message will be used to calculate the hash
+ * Possible values are:
+ * 
+ * hs_call_id     the content of the Call-ID header field
+ * hs_from_user   the username part of the URI in the From header field
+ * hs_to_user     the username part of the URI in the To header field
+ * hs_error       no hash specified
+*/
+enum hash_source {
+       hs_call_id = 1,
+       hs_from_user,
+       hs_to_user,
+       hs_error
+};
+
+
+/*
+ * CRC32 hash function
+ * Returns an integer number between 0 and denominator - 1 based on
+ * the hash source from the msg. The hash algorithm is CRC32.
+*/
+int hash_func (struct _sipcapture_object * sco,
+                         enum hash_source source, int denominator);
+
+/*
+ * Gets the hash source type.
+*/
+enum hash_source get_hash_source (const char *hash_source);
+
+#endif
index ce7251d..818c342 100644 (file)
@@ -74,6 +74,7 @@
 #include "../../resolve.h"
 #include "../../receive.h"
 #include "sipcapture.h"
+#include "hash_mode.h"
 
 #ifdef STATISTICS
 #include "../../lib/kcore/statistics.h"
 
 MODULE_VERSION
 
-struct _sipcapture_object {
-       str method;
-       str reply_reason;
-       str ruri;
-       str ruri_user;
-       str from_user;
-       str from_tag;
-       str to_user;
-       str to_tag;
-       str pid_user;
-       str contact_user;
-       str auth_user;
-       str callid;
-       str callid_aleg;
-       str via_1;
-       str via_1_branch;
-       str cseq;
-       str diversion;
-       str reason;
-       str content_type;
-       str authorization;
-       str user_agent;
-       str source_ip;
-       int source_port;
-       str destination_ip;
-       int destination_port;
-       str contact_ip;
-       int contact_port;
-       str originator_ip;
-       int originator_port;
-       int proto;
-       int family;
-       str rtp_stat;
-       int type;
-        long long tmstamp;
-       str node;       
-       str msg;        
-#ifdef STATISTICS
-       stat_var *stat;
-#endif
-};
 
 #define ETHHDR 14 /* sizeof of ethhdr structure */
 
@@ -149,6 +109,8 @@ static struct mi_root* sip_capture_mi(struct mi_root* cmd, void* param );
 
 static str db_url              = str_init(DEFAULT_RODB_URL);
 static str table_name          = str_init("sip_capture");
+static str hash_source         = str_init("call_id");
+static str mt_mode                     = str_init("rand");
 static str id_column           = str_init("id");
 static str date_column         = str_init("date");
 static str micro_ts_column     = str_init("micro_ts");
@@ -226,6 +188,22 @@ static struct sock_filter BPF_code[] = { { 0x28, 0, 0, 0x0000000c }, { 0x15, 0,
 db1_con_t *db_con = NULL;              /*!< database connection */
 db_func_t db_funcs;                    /*!< Database functions */
 
+str* table_names = NULL;
+unsigned int no_tables = 0;
+
+/*multiple table mode*/
+enum e_mt_mode{
+       mode_random = 1,
+       mode_hash,
+       mode_round_robin,
+       mode_error
+};
+
+enum e_mt_mode mtmode = mode_random ;
+enum hash_source source = hs_error;
+
+unsigned int rr_idx = 0;
+
 struct hep_timehdr* heptime;
 
 
@@ -244,6 +222,8 @@ static cmd_export_t cmds[] = {
 static param_export_t params[] = {
        {"db_url",                      STR_PARAM, &db_url.s            },
        {"table_name",                  STR_PARAM, &table_name.s        },
+       {"hash_source",                         STR_PARAM, &hash_source.s       },
+       {"mt_mode",                                     STR_PARAM, &mt_mode.s   },
        {"id_column",                   STR_PARAM, &id_column.s         },
        {"date_column",                 STR_PARAM, &date_column.s       },      
        {"micro_ts_column",             STR_PARAM, &micro_ts_column.s   },
@@ -337,6 +317,70 @@ struct module_exports exports = {
 };
 
 
+static int mt_init(void) {
+
+       char *p = NULL;
+       int i = 0;
+
+       /*parse and save table names*/
+       no_tables = 1;
+       p = table_name.s;
+
+       while (*p)
+       {
+               if (*p== '|')
+               {
+                       no_tables++;
+               }
+               p++;
+       }
+
+       table_names = (str*)pkg_malloc(sizeof(str) * no_tables);
+       if(table_names == NULL) {
+               LM_ERR("no more pkg memory left\n");
+               return -1;
+       }
+       p = strtok (table_name.s,"| \t");
+       while (p != NULL)
+       {
+               LM_INFO ("INFO: table name:%s\n",p);
+               table_names[i].s =  p;
+               table_names[i].len = strlen (p);
+               i++;
+               p = strtok (NULL, "| \t");
+       }
+
+       if (strcmp (mt_mode.s, "rand") ==0)
+       {
+               mtmode = mode_random;
+       }
+       else if (strcmp (mt_mode.s, "round_robin") ==0)
+       {
+               mtmode = mode_round_robin;
+       }
+       else if (strcmp (mt_mode.s, "hash") == 0)
+       {
+               mtmode = mode_hash;
+       }
+       else {
+               LM_ERR("ERROR: sipcapture: mod_init: multiple tables mode unrecognized\n");
+               return -1;
+               
+       }
+
+
+       if ( mtmode == mode_hash && (source = get_hash_source (hash_source.s) ) == hs_error)
+       {
+               LM_ERR("ERROR: sipcapture: mod_init: hash source unrecognized\n");
+               return -1;
+       }
+
+       srand(time(NULL));
+
+       return 0;
+
+}
+
 /*! \brief Initialize sipcapture module */
 static int mod_init(void) {
 
@@ -359,6 +403,8 @@ static int mod_init(void) {
 
        db_url.len = strlen(db_url.s);
        table_name.len = strlen(table_name.s);
+       hash_source.len = strlen (hash_source.s);
+       mt_mode.len = strlen(mt_mode.s);
        date_column.len = strlen(date_column.s);
        id_column.len = strlen(id_column.s);
        micro_ts_column.len = strlen(micro_ts_column.s);
@@ -421,6 +467,11 @@ static int mod_init(void) {
                return -1;
        }
 
+       if (mt_init () <0)
+       {
+               return -1;
+       }
+
 
        if(db_insert_mode) {
                 LM_INFO("INFO: sipcapture: mod_init: you have enabled INSERT DELAYED \
@@ -561,6 +612,10 @@ static int child_init(int rank)
                 return -1;
         }
 
+    if (mtmode ==mode_round_robin && rank > 0)
+    {
+               rr_idx = rank % no_tables;
+    }
 
        return 0;
 }
@@ -611,6 +666,9 @@ static void destroy(void)
                 }                              
                close(raw_sock_desc);
        }
+       if (table_names){
+               pkg_free(table_names);
+       }
 }
 
 
@@ -793,6 +851,7 @@ static int sip_capture_store(struct _sipcapture_object *sco)
        db_val_t db_vals[NR_KEYS];
 
        str tmp;
+       int ii = 0;
 
        if(sco==NULL)
        {
@@ -996,9 +1055,25 @@ static int sip_capture_store(struct _sipcapture_object *sco)
 
        db_vals[36].val.blob_val = tmp;
 
-       LM_DBG("homer table: [%.*s]\n", table_name.len, table_name.s);          
-                
-       db_funcs.use_table(db_con, &table_name);
+       if (no_tables > 0 ){
+               if ( mtmode == mode_hash ){
+                       ii = hash_func ( sco, source , no_tables);
+                       LM_DBG ("hash idx is:%d\n", ii);
+               }
+               else if (mtmode == mode_random )
+               {
+                       ii = rand() % no_tables;
+                       LM_DBG("rand idx is:%d\n", ii);
+               }
+               else if (mtmode == mode_round_robin)
+               {
+                       ii = rr_idx;
+                       rr_idx = (rr_idx +1) % no_tables;
+                       LM_DBG("round robin idx is:%d\n", ii);
+               }
+       }
+       LM_DBG("insert into homer table: [%.*s]\n", table_names[ii].len, table_names[ii].s);
+       db_funcs.use_table(db_con, &table_names[ii]);
 
        LM_DBG("storing info...\n");
        
index 4aaa221..aef023d 100644 (file)
@@ -29,6 +29,47 @@ typedef uint16_t u_int16_t;
 #define IPPROTO_IPIP IPPROTO_ENCAP /* Solaris IPIP protocol has name ENCAP */
 #endif
 
+struct _sipcapture_object {
+       str method;
+       str reply_reason;
+       str ruri;
+       str ruri_user;
+       str from_user;
+       str from_tag;
+       str to_user;
+       str to_tag;
+       str pid_user;
+       str contact_user;
+       str auth_user;
+       str callid;
+       str callid_aleg;
+       str via_1;
+       str via_1_branch;
+       str cseq;
+       str diversion;
+       str reason;
+       str content_type;
+       str authorization;
+       str user_agent;
+       str source_ip;
+       int source_port;
+       str destination_ip;
+       int destination_port;
+       str contact_ip;
+       int contact_port;
+       str originator_ip;
+       int originator_port;
+       int proto;
+       int family;
+       str rtp_stat;
+       int type;
+        long long tmstamp;
+       str node;       
+       str msg;        
+#ifdef STATISTICS
+       stat_var *stat;
+#endif
+};
 
 struct hep_hdr{
     u_int8_t hp_v;            /* version */