modules/sipcapture HEPv3 implementation. ALPHA!!! The final version will be after... adubovikov/hepv3
authorAlexandr Dubovikov <alexandr.dubovikov@gmail.com>
Fri, 3 Aug 2012 21:21:59 +0000 (23:21 +0200)
committerAlexandr Dubovikov <alexandr.dubovikov@gmail.com>
Fri, 3 Aug 2012 21:21:59 +0000 (23:21 +0200)
modules/sipcapture/sipcapture.c
modules/sipcapture/sipcapture.h

index 818c342..a6a560a 100644 (file)
@@ -92,6 +92,9 @@ MODULE_VERSION
 
 #define NR_KEYS 37
 
+#define HEP_BUF_SIZE  65535
+
+
 /* module function prototypes */
 static int mod_init(void);
 static int child_init(int rank);
@@ -104,6 +107,16 @@ int raw_capture_socket(struct ip_addr* ip, str* iface, int port_start, int port_
 int raw_capture_rcv_loop(int rsock, int port1, int port2, int ipip);
 
 
+/* HEPv2 HEPv3 */
+int hepv2_received(char *buf, unsigned int *len, struct receive_info *ri);
+int hepv3_received(char *buf, unsigned int *len, struct receive_info *ri);
+int parsing_hepv3_message(char *buf, unsigned int *len);
+
+int init_tcp_listen_children();
+int tcp_capture_rcv_loop(int tsock);
+int get_tcp_stream(int tcp_sock_desc);
+int tcp_capture_socket();
+
 
 static struct mi_root* sip_capture_mi(struct mi_root* cmd, void* param );
 
@@ -165,6 +178,12 @@ int db_insert_mode = 0;
 int promisc_on = 0;
 int bpf_on = 0;
 int hep_offset = 0; //this stores the hep header added offset 
+int tcp_hep_port = 9060;
+int tcp_hep_capture_on   = 0;
+int tcp_hep_socket = -1; /* tcp hep socket */
+int fork_on_accept = 0; /* fork on new accept */
+unsigned int hep_tcp_sock_children = 1;
+
 
 str raw_socket_listen = { 0, 0 };
 str raw_interface = { 0, 0 };
@@ -273,6 +292,10 @@ static param_export_t params[] = {
        {"raw_interface",               STR_PARAM, &raw_interface.s   },
         {"promiscious_on",             INT_PARAM, &promisc_on   },             
         {"raw_moni_bpf_on",            INT_PARAM, &bpf_on   },         
+        {"hep_tcp_sock_children",       INT_PARAM, &hep_tcp_sock_children   },
+        {"hep_tcp_capture_on",          INT_PARAM, &tcp_hep_capture_on   },
+        {"fork_on_accept",              INT_PARAM, &fork_on_accept   },
+        {"tcp_hep_port",               INT_PARAM, &tcp_hep_port   },
        {0, 0, 0}
 };
 
@@ -557,11 +580,24 @@ static int mod_init(void) {
                         
                }               
        }
+       
+       if(tcp_hep_capture_on ) {
+
+                register_procs(hep_tcp_sock_children);
+
+                if(!tcp_capture_socket()) {
+                                LM_ERR("coudn't initialize tcp HEP capture socket");
+                                goto error;
+
+                }
+        }
+
 
        return 0;
 #ifdef __OS_linux                                               
 error:
        if(raw_sock_desc) close(raw_sock_desc);
+       if(tcp_hep_socket) close(tcp_hep_socket);
        return -1;      
 #endif
 }
@@ -592,10 +628,15 @@ int extract_host_port(void)
 
 static int child_init(int rank)
 {
-       if (rank == PROC_MAIN && (ipip_capture_on || moni_capture_on)) {
-                if (init_rawsock_children() < 0) return -1;
+       if (rank == PROC_MAIN) {
+                if((ipip_capture_on || moni_capture_on) && (init_rawsock_children() < 0))
+                                                 return -1;
+
+                if(tcp_hep_capture_on && (init_tcp_listen_children() < 0))
+                                                return -1;
         }
 
+
        if (rank==PROC_INIT || rank==PROC_MAIN || rank==PROC_TCP_MAIN)
                return 0; /* do nothing for the main process */
 
@@ -678,11 +719,47 @@ static void destroy(void)
 int hep_msg_received(void *data)
 {
 
-       void **srevp;
-       char *buf;
-       unsigned *len;
-       struct receive_info *ri;
-       
+        void **srevp;
+        char *buf;
+        unsigned *len;
+        struct receive_info *ri;
+
+        struct hep_hdr *heph;
+
+        srevp = (void**)data;
+
+        buf = (char *)srevp[0];
+        len = (unsigned *)srevp[1];
+        ri = (struct receive_info *)srevp[2];
+
+        /* hep_hdr */
+        heph = (struct hep_hdr*) buf;
+
+        /* Check version */
+        if(heph->hp_v == 1 || heph->hp_v == 2)  {
+
+                return hepv2_received(buf, len, ri);
+        }
+        else if(!memcmp(buf, "\x48\x45\x50\x33",4)) {
+
+                //LOG(L_ERR, "DATA: HEPv3\r\n");
+                //LM_ERR("ZZ: [%c%c%c%c]\n", buf[0], buf[1], buf[2], buf[3]);
+                return hepv3_received(buf, len, ri);
+        }
+        else {
+
+                LOG(L_ERR, "ERROR: sipcapture:hep_msg_received: not supported version or bad length: v:[%d] l:[%d]\n",
+                                                heph->hp_v, heph->hp_l);
+                return -1;
+        }
+}
+
+
+/**
+ * HEP v1 && v2 message
+ */
+int hepv2_received(char *buf, unsigned int *len, struct receive_info *ri)
+{
        int hl;
         struct hep_hdr *heph;
         struct ip_addr dst_ip, src_ip;
@@ -696,20 +773,8 @@ int hep_msg_received(void *data)
         struct hep_ip6hdr *hepip6h = NULL;
 #endif /* USE_IPV6 */
 
-       if(!hep_capture_on) {
-               LOG(L_ERR, "ERROR: sipcapture:hep_msg_received HEP is not enabled\n");
-                return -1;     
-       }       
-       
        hep_offset = 0; 
        
-       srevp = (void**)data;
-               
-       buf = (char *)srevp[0];
-       len = (unsigned *)srevp[1];
-       ri = (struct receive_info *)srevp[2];
-
-
        hl = hep_offset = sizeof(struct hep_hdr);
         end = buf + *len;
         if (unlikely(*len<hep_offset)) {
@@ -732,13 +797,6 @@ int hep_msg_received(void *data)
                default:
                         LOG(L_ERR, "ERROR: sipcapture:hep_msg_received:  unsupported family [%d]\n", heph->hp_f);
                         return -1;
-                }
-
-       /* Check version */
-        if((heph->hp_v != 1 && heph->hp_v != 2) || hl != heph->hp_l) {
-               LOG(L_ERR, "ERROR: sipcapture:hep_msg_received: not supported version or bad length: v:[%d] l:[%d] vs [%d]\n",
-                                                heph->hp_v, heph->hp_l, hl);
-                return -1;
        }
 
         /* PROTO */
@@ -773,7 +831,7 @@ int hep_msg_received(void *data)
                         break;
 #endif /* USE_IPV6 */
 
-         }
+       }
 
        /* VOIP payload */
         hep_payload = buf + hep_offset;
@@ -794,7 +852,6 @@ int hep_msg_received(void *data)
         }
 
 
-
        /* fill ip from the packet to dst_ip && to */
         switch(heph->hp_f){
 
@@ -834,6 +891,21 @@ int hep_msg_received(void *data)
 }
 
 
+/**
+ * HEP message
+ */
+int hepv3_received(char *buf, unsigned int *len, struct receive_info *ri)
+{
+       if(!parsing_hepv3_message(buf, len)) {
+               LM_ERR("couldnot parse hepv3 message\n");
+               return -1;
+        }
+
+       return -1;
+}
+
+
+
 static int sip_capture_prepare(sip_msg_t *msg)
 {
        /* We need parse all headers */
@@ -1708,3 +1780,422 @@ int raw_capture_rcv_loop(int rsock, int port1, int port2, int ipip) {
 }
 
 
+/* Local raw socket */
+int tcp_capture_socket()
+{
+       struct sockaddr_in local;
+
+       // open TCP socket
+       LM_DBG("Starting TCP server on port %d - ", tcp_hep_port);
+
+       if ((tcp_hep_socket=socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+               LM_ERR("tcp socket error");
+               exit(errno);
+       }
+
+       local.sin_family=AF_INET;
+       // listen on port <server_port>
+       local.sin_port=htons(tcp_hep_port);
+       // listen on any ip interface
+       local.sin_addr.s_addr=htonl(INADDR_ANY);
+       memset(&local.sin_zero, 0, sizeof(local.sin_zero));
+
+       // bind and listen on port and ip interface
+       if (bind(tcp_hep_socket, (struct sockaddr*)&local, sizeof(local)) < 0) {
+               LM_ERR("tcp bind error");
+               return -1;
+       }
+       if (listen(tcp_hep_socket, 15) < 0) {
+               LM_ERR("tcp listen error");
+               return -1;
+       }
+
+       LM_DBG("TCP HEPV3 success\n");
+
+       return 1;
+}
+
+/* Local raw socket */
+int init_tcp_listen_children()
+{
+       int fdd;
+       struct sockaddr_in remote;
+       int rlen, i;
+        pid_t pid;
+
+        for(i = 0; i < hep_tcp_sock_children; i++) {
+                pid = fork_process(PROC_UNIXSOCK,"homer tcp socket", 1);
+                if (pid < 0) {
+                        ERR("Unable to fork: %s\n", strerror(errno));
+                        return -1;
+                } else if (pid == 0) { /* child */
+                       while(1) {
+                               rlen=sizeof(remote);
+                                // accept connection
+                               if ((fdd=accept(tcp_hep_socket, (struct sockaddr*)&remote, (socklen_t *)&rlen)) < 0) {
+                                       LM_ERR("accept error");
+                                       return -1;
+                                }              
+
+                               get_tcp_stream(fdd);            
+                       }
+                }
+                /* Parent */
+        }
+        
+        return 1;
+}
+
+/*
+ *  TCP listeners
+ */
+int get_tcp_stream(int tcp_sock_desc)
+{
+
+        if(fork_on_accept) {
+       
+               // create child to be able to get multiple connections
+               switch(fork()) {
+                       case -1: 
+                               LM_ERR("Unable to fork: %s\n", strerror(errno));
+                               return -1;                      
+                       case 0:
+                               tcp_capture_rcv_loop(tcp_sock_desc);
+                               exit(0);
+                       default:
+                               // parent process: prepare for new connections
+                               close(tcp_sock_desc);
+                               break;          
+               }
+        }
+        else {
+                
+              tcp_capture_rcv_loop(tcp_sock_desc);
+              
+        }
+        
+        LM_DBG("Raw TCP socket server successfully initialized\n");
+        return 1;
+}
+
+
+/* Local raw receive loop */
+int tcp_capture_rcv_loop(int tsock) {
+
+       char *msgtmp = NULL;
+       static char recvtmp[3000];
+       int blen;
+        struct hep_ctrl *hep_header;
+        int left_length = 0;        
+        unsigned int total_length = 0;
+
+        while ((blen=read(tsock, recvtmp, 6))>0) {
+        
+                hep_header = NULL;
+
+               memset(&hep_header, 0, sizeof(struct hep_ctrl));
+               
+                if(!memcmp(recvtmp, "\x48\x45\x50\x33", 4)) {
+                               
+                        hep_header = (struct hep_ctrl*) recvtmp;
+                        total_length = ntohs(hep_header->length);
+                        left_length = total_length - blen;
+                              
+                        //LM_ERR("RD: TL:[%d] LL:[%d] NT: [%d]\n", total_length, left_length, hep_header->length);
+
+                        msgtmp =  pkg_malloc(total_length+1);
+                       if(msgtmp==NULL) {
+                               LM_ERR("msgtmp: no more pkg memory left\n");
+                               return -1;
+                        }
+
+                       memset(msgtmp, 0, (total_length+1));
+                                
+                       /* Copy to our buffer*/
+                       memcpy(msgtmp, recvtmp, blen);                          
+
+                        while ((blen=read(tsock, recvtmp, left_length))>0) {                              
+                                                                       
+                               memcpy(msgtmp+(total_length-left_length), recvtmp, blen);       
+                               left_length-=blen;                                                              
+                               if(left_length <= 0) break;                                                           
+                        }
+
+                        
+                        if(!parsing_hepv3_message(msgtmp, &total_length)) {
+                                goto error;
+                                                        
+                        }                                                
+
+                       if(msgtmp) pkg_free(msgtmp);                              
+                }
+        }
+
+       return 0;
+error:
+        if(msgtmp) pkg_free(msgtmp);
+        if(tsock) close(tsock);
+        return -1;                     
+}
+
+
+int parsing_hepv3_message(char *buf, unsigned int *blen) {
+
+       union sockaddr_union from;
+       union sockaddr_union to;
+        struct receive_info ri;
+       char *tmp;
+       struct ip_addr dst_ip, src_ip;
+       struct socket_info* si = 0;
+       int tmp_len, i;
+       char *payload = NULL;
+        struct hep_chunk *chunk;       
+        struct hep_generic_recv *hg;
+        int totelem = 0;
+        int chunk_vendor=0, chunk_type=0, chunk_length=0;
+        int total_length = 0;
+
+
+       hg = (struct hep_generic_recv*)pkg_malloc(sizeof(struct hep_generic_recv));
+       if(hg==NULL) {
+               LM_ERR("no more pkg memory left for hg\n");
+               return -1;
+        }
+                                                                       
+       memset(hg, 0, sizeof(struct hep_generic_recv));
+       
+               
+       /* HEADER */
+       hg->header  = (hep_ctrl_t *) (buf);
+
+       /*Packet size */
+       total_length = ntohs(hg->header->length);
+
+       ri.src_port = 0;
+       ri.dst_port = 0;
+       dst_ip.af = 0;
+        src_ip.af = 0;
+                               
+       payload = NULL;
+
+       i = sizeof(hep_ctrl_t);         
+               
+       while(i < total_length) {
+                
+               /*OUR TMP DATA */                                  
+                tmp = buf+i;
+
+                chunk = (struct hep_chunk*) tmp;
+                             
+                chunk_vendor = ntohs(chunk->vendor_id);                             
+                chunk_type = ntohs(chunk->type_id);
+                chunk_length = ntohs(chunk->length);
+                       
+
+
+                /* if chunk_length */
+                if(chunk_length == 0) {
+                        /* BAD LEN we drop this packet */
+                        goto error;
+                }
+
+                /* SKIP not general Chunks */
+                if(chunk_vendor != 0) {
+                        i+=chunk_length;
+                }
+                else {                                                                                                                               
+                        switch(chunk_type) {
+                                     
+                                case 0:
+                                        goto error;
+                                        break;
+                                     
+                                case 1:                                                                          
+                                        hg->ip_family  = (hep_chunk_uint8_t *) (tmp);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;
+                                case 2:
+                                        hg->ip_proto  = (hep_chunk_uint8_t *) (tmp);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;                                                     
+                                case 3:
+                                        hg->hep_src_ip4  = (hep_chunk_ip4_t *) (tmp);
+                                        i+=chunk_length;
+                                        src_ip.af=AF_INET;
+                                       src_ip.len=4;
+                                       src_ip.u.addr32[0] = hg->hep_src_ip4->data.s_addr;
+                                       totelem++;
+                                       break;
+                                case 4:
+                                        hg->hep_dst_ip4  = (hep_chunk_ip4_t *) (tmp);
+                                        i+=chunk_length;                                                     
+                                       dst_ip.af=AF_INET;
+                                       dst_ip.len=4;
+                                       dst_ip.u.addr32[0] = hg->hep_dst_ip4->data.s_addr;
+                                        totelem++;
+
+                                        break;
+#ifdef USE_IPV6                                                     
+                                case 5:
+                                        hg->hep_src_ip6  = (hep_chunk_ip6_t *) (tmp);
+                                        i+=chunk_length;
+                                        src_ip.af=AF_INET6;
+                                       src_ip.len=16;
+                                       memcpy(src_ip.u.addr, &hg->hep_src_ip6->data, 16);
+                                       totelem++;
+                                        break;
+                                case 6:
+                                        hg->hep_dst_ip6  = (hep_chunk_ip6_t *) (tmp);
+                                        i+=chunk_length;                                                     
+                                        dst_ip.af=AF_INET6;
+                                       dst_ip.len=16;
+                                       memcpy(dst_ip.u.addr, &hg->hep_dst_ip6->data, 16);
+                                       totelem++;
+                                        break;
+#endif                                             
+        
+                                case 7:
+                                        hg->src_port  = (hep_chunk_uint16_t *) (tmp);
+                                        ri.src_port = ntohs(hg->src_port->data);
+                                        i+=chunk_length;                      
+                                        totelem++;
+                                        break;
+
+                                case 8:
+                                        hg->dst_port  = (hep_chunk_uint16_t *) (tmp);
+                                        ri.dst_port = ntohs(hg->dst_port->data);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;
+                                case 9:
+                                        hg->time_sec  = (hep_chunk_uint32_t *) (tmp);
+                                        hg->time_sec->data = ntohl(hg->time_sec->data);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;                                                     
+                                                     
+                                case 10:
+                                        hg->time_usec  = (hep_chunk_uint32_t *) (tmp);
+                                        hg->time_usec->data = ntohl(hg->time_usec->data);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;      
+
+                                case 11:
+                                        hg->proto_t  = (hep_chunk_uint8_t *) (tmp);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;                                                                                                                                                         
+
+                                case 12:
+                                        hg->capt_id  = (hep_chunk_uint32_t *) (tmp);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;
+
+                                case 13:
+                                        hg->keep_tm  = (hep_chunk_uint16_t *) (tmp);
+                                        i+=chunk_length;
+                                        break;                                                     
+
+                                case 14:
+                                        hg->auth_key  = (hep_chunk_str_t *) (tmp);
+                                        i+=chunk_length;                                                                             
+                                        break;
+                                                     
+                                case 15:
+                                        hg->payload_chunk  = (hep_chunk_t *) (tmp);
+                                        payload = (char *) tmp+sizeof(hep_chunk_t);
+                                        i+=chunk_length;
+                                        totelem++;
+                                        break;
+                                                     
+                                default:
+                                        i+=chunk_length;
+                                        break;
+                        }                                        
+                }
+        }                                                                                                                
+                        
+        /* CHECK how much elements */
+        if(totelem < 9) {                        
+                LM_ERR("Not all elements [%d]\n", totelem);                        
+                goto done;
+        }                 
+
+        if ( dst_ip.af == 0 || src_ip.af == 0)  {
+                LM_ERR("NO IP's set\n");
+                goto done;
+        }
+
+                        
+        ip_addr2su(&to, &dst_ip, ri.dst_port);
+        ip_addr2su(&from, &src_ip, ri.src_port);
+                        
+        ri.src_su=from;
+        su2ip_addr(&ri.src_ip, &from);
+        su2ip_addr(&ri.dst_ip, &to);
+
+       if(hg->ip_proto->data == IPPROTO_TCP) ri.proto=PROTO_TCP;
+       else if(hg->ip_proto->data == IPPROTO_UDP) ri.proto=PROTO_UDP;
+
+       /* a little bit memory */
+        si=(struct socket_info*) pkg_malloc(sizeof(struct socket_info));
+        if (si==0) {
+                LOG(L_ERR, "ERROR: new_sock_info: memory allocation error\n");
+                goto error;
+        }
+
+       memset(si, 0, sizeof(struct socket_info));
+        si->address = ri.dst_ip;
+        si->socket=-1;
+
+        /* set port & proto */
+        si->port_no = ri.dst_port;
+
+        if(hg->ip_proto->data == IPPROTO_TCP) si->proto=PROTO_TCP;
+        else if(hg->ip_proto->data == IPPROTO_UDP) si->proto=PROTO_UDP;
+
+        si->flags=0;
+        si->addr_info_lst=0;
+
+        si->address_str.s = ip_addr2a(&si->address);;
+        si->address_str.len = strlen(si->address_str.s);                                                
+
+        si->port_no_str.s = int2str(si->port_no, &tmp_len);
+        si->port_no_str.len = tmp_len;
+       si->address_str.len = strlen(si->address_str.s);
+
+        si->name.len = si->address_str.len;
+        si->name.s = si->address_str.s;
+        ri.bind_address=si;
+
+
+       /*TIME*/ 
+        heptime->tv_sec = hg->time_sec->data;
+        heptime->tv_usec = hg->time_usec->data;
+        heptime->captid = ntohs(hg->capt_id->data);
+          
+        if(payload != NULL ) {
+                /* and now recieve message */
+                receive_msg(payload, ntohs(hg->payload_chunk->length), &ri);
+        }
+        
+done:
+        if(si) pkg_free(si);
+        if(hg) pkg_free(hg);                     
+
+        return 1;
+        
+error:
+
+        if(si) pkg_free(si);
+        if(hg) pkg_free(hg);
+                
+        return -1;           
+        
+}
+
index aef023d..a7cc35a 100644 (file)
@@ -97,3 +97,98 @@ struct hep_ip6hdr {
         struct in6_addr hp6_dst;        /* destination address */
 };
 #endif
+
+/* HEPv3 types */
+
+struct hep_chunk {
+       u_int16_t vendor_id;
+       u_int16_t type_id;
+       u_int16_t length;
+} __attribute__((packed));
+
+typedef struct hep_chunk hep_chunk_t;
+
+struct hep_chunk_uint8 {
+       hep_chunk_t chunk;
+       u_int8_t data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_uint8 hep_chunk_uint8_t;
+
+struct hep_chunk_uint16 {
+       hep_chunk_t chunk;
+       u_int16_t data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_uint16 hep_chunk_uint16_t;
+
+struct hep_chunk_uint32 {
+       hep_chunk_t chunk;
+       u_int32_t data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_uint32 hep_chunk_uint32_t;
+
+struct hep_chunk_str {
+       hep_chunk_t chunk;
+       char *data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_str hep_chunk_str_t;
+
+struct hep_chunk_ip4 {
+       hep_chunk_t chunk;
+       struct in_addr data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_ip4 hep_chunk_ip4_t;
+
+#ifdef USE_IPV6
+struct hep_chunk_ip6 {
+       hep_chunk_t chunk;
+       struct in6_addr data;
+} __attribute__((packed));
+#endif
+
+typedef struct hep_chunk_ip6 hep_chunk_ip6_t;
+
+struct hep_chunk_payload {
+    hep_chunk_t chunk;
+    char *data;
+} __attribute__((packed));
+
+typedef struct hep_chunk_payload hep_chunk_payload_t;
+
+
+struct hep_ctrl {
+    char id[4];
+    u_int16_t length;
+} __attribute__((packed));
+
+typedef struct hep_ctrl hep_ctrl_t;
+
+
+/* Structure of HEP */
+
+struct hep_generic_recv {
+        hep_ctrl_t         *header;
+        hep_chunk_uint8_t  *ip_family;
+        hep_chunk_uint8_t  *ip_proto;
+        hep_chunk_uint16_t *src_port;
+        hep_chunk_uint16_t *dst_port;
+        hep_chunk_uint32_t *time_sec;
+        hep_chunk_uint32_t *time_usec;
+        hep_chunk_ip4_t    *hep_src_ip4;
+        hep_chunk_ip4_t            *hep_dst_ip4;
+#ifdef USE_IPV6
+        hep_chunk_ip6_t    *hep_src_ip6;
+        hep_chunk_ip6_t    *hep_dst_ip6;
+#endif                
+        hep_chunk_uint8_t  *proto_t;
+        hep_chunk_uint32_t *capt_id;
+        hep_chunk_uint16_t *keep_tm;
+        hep_chunk_str_t    *auth_key;        
+        hep_chunk_t   *payload_chunk;
+} __attribute__((packed));
+
+typedef struct hep_generic_recv hep_generic_recv_t;