Merge pull request #3 from AndreyRybkin/master
authorDaniel-Constantin Mierla <miconda@gmail.com>
Sat, 3 Jan 2015 20:14:24 +0000 (21:14 +0100)
committerDaniel-Constantin Mierla <miconda@gmail.com>
Sat, 3 Jan 2015 20:14:24 +0000 (21:14 +0100)
usrloc_dmq: add module for usrloc sync via dmq

modules/usrloc_dmq/Makefile [new file with mode: 0644]
modules/usrloc_dmq/doc/Makefile [new file with mode: 0644]
modules/usrloc_dmq/doc/usrloc_dmq.xml [new file with mode: 0644]
modules/usrloc_dmq/doc/usrloc_dmq_admin.xml [new file with mode: 0644]
modules/usrloc_dmq/usrloc_dmq.c [new file with mode: 0644]
modules/usrloc_dmq/usrloc_sync.c [new file with mode: 0644]
modules/usrloc_dmq/usrloc_sync.h [new file with mode: 0644]

diff --git a/modules/usrloc_dmq/Makefile b/modules/usrloc_dmq/Makefile
new file mode 100644 (file)
index 0000000..443fe44
--- /dev/null
@@ -0,0 +1,9 @@
+include ../../Makefile.defs
+auto_gen=
+NAME=usrloc_dmq.so
+
+DEFS+=-DKAMAILIO_MOD_INTERFACE
+
+SERLIBPATH=../../lib
+SER_LIBS+=$(SERLIBPATH)/srutils/srutils
+include ../../Makefile.modules
diff --git a/modules/usrloc_dmq/doc/Makefile b/modules/usrloc_dmq/doc/Makefile
new file mode 100644 (file)
index 0000000..db2e055
--- /dev/null
@@ -0,0 +1,4 @@
+docs = usrloc_dmq.xml
+
+docbook_dir = ../../../docbook
+include $(docbook_dir)/Makefile.module
diff --git a/modules/usrloc_dmq/doc/usrloc_dmq.xml b/modules/usrloc_dmq/doc/usrloc_dmq.xml
new file mode 100644 (file)
index 0000000..565cc8a
--- /dev/null
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+
+<book xmlns:xi="http://www.w3.org/2001/XInclude">
+    <bookinfo>
+       <title>usrloc_dmq Module</title>
+       <productname class="trade">&kamailioname;</productname>
+       <authorgroup>
+           <author>
+               <firstname>Andrey</firstname>
+               <surname>Rybkin</surname>
+               <affiliation><orgname>bks.tv</orgname></affiliation>
+               <email>rybkin.a@bks.tv</email>
+               <address>
+               <otheraddr>
+               <ulink></ulink>
+               </otheraddr>
+               </address>
+           </author>
+           <editor>
+               <firstname>Andrey</firstname>
+               <surname>Rybkin</surname>
+               <affiliation><orgname>bks.tv</orgname></affiliation>
+               <email>rybkin.a@bks.tv</email>
+               <address>
+               <otheraddr>
+               <ulink></ulink>
+               </otheraddr>
+               </address>
+           </editor>
+       </authorgroup>
+       <copyright>
+           <year>2014</year>
+       </copyright>
+    </bookinfo>
+    <toc></toc>
+    
+    <xi:include href="usrloc_dmq_admin.xml"/>
+    
+    
+</book>
diff --git a/modules/usrloc_dmq/doc/usrloc_dmq_admin.xml b/modules/usrloc_dmq/doc/usrloc_dmq_admin.xml
new file mode 100644 (file)
index 0000000..29564b5
--- /dev/null
@@ -0,0 +1,74 @@
+<?xml version="1.0" encoding='ISO-8859-1'?>
+<!DOCTYPE book PUBLIC "-//OASIS//DTD DocBook XML V4.4//EN"
+"http://www.oasis-open.org/docbook/xml/4.4/docbookx.dtd" [
+
+<!-- Include general documentation entities -->
+<!ENTITY % docentities SYSTEM "../../../docbook/entities.xml">
+%docentities;
+
+]>
+<!-- Module User's Guide -->
+
+<chapter>
+       
+       <title>&adminguide;</title>
+       
+       <section>
+       <title>Overview</title>
+       <para>
+               The module add usrloc contacts replication between multiple servers via DMQ module.
+       </para>
+       </section>
+       <section>
+       <title>Dependencies</title>
+       <section>
+               <title>&kamailio; Modules</title>
+               <para>
+               The following modules must be loaded before this module:
+                       <itemizedlist>
+                       <listitem>
+                       <para>
+                               <emphasis>DMQ module must be loaded first.</emphasis>.
+                               <emphasis>USRLOC module must be loaded first.</emphasis>.
+                       </para>
+                       </listitem>
+                       </itemizedlist>
+               </para>
+       </section>
+       <section>
+       <title>Parameters</title>
+       <section id="usrloc_dmq.p.enable">
+               <title><varname>enable</varname> (int)</title>
+               <para>
+               USRLOC replication
+                       0 - disabled
+                       1 - enabled
+               </para>
+               <emphasis>
+                       Default value is 0.
+               </emphasis>
+               </para>
+       </section>
+       <section id="usrloc_dmq.p.flag">
+               <title><varname>flag</varname> (int)</title>
+               <para>
+                       Flag to be used for marking if a contact should be constructed for the DMQ
+               </para>
+               <para>
+               <emphasis>
+                       Default value is 2.
+               </emphasis>
+               </para>
+               <example>
+               <title>Set <varname>flag</varname> parameter</title>
+               <programlisting format="linespecific">
+...
+modparam("usrloc_dmq", "flag", 2)
+...
+</programlisting>
+               </example>
+       </section>
+       </section>
+
+</chapter>
+
diff --git a/modules/usrloc_dmq/usrloc_dmq.c b/modules/usrloc_dmq/usrloc_dmq.c
new file mode 100644 (file)
index 0000000..8291664
--- /dev/null
@@ -0,0 +1,87 @@
+#include <stdio.h>
+#include "../../sr_module.h"
+#include "../../dprint.h"
+#include "../../error.h"
+#include "../../modules/usrloc/usrloc.h"
+#include "../usrloc/ul_callback.h"
+#include "../../modules/sl/sl.h"
+#include "../../mod_fix.h"
+
+#include "usrloc_sync.h"
+
+static int mod_init(void); 
+static int child_init(int);
+
+int enable_usrloc = 0;
+int usrloc_syncflag = 2;
+
+MODULE_VERSION
+
+static param_export_t params[] = {
+       {"enable", INT_PARAM, &enable_usrloc},
+       {"flag", INT_PARAM, &usrloc_syncflag},
+       {0, 0, 0}
+};
+
+struct module_exports exports = {
+       "usrloc_dmq",                           /* module name */
+       DEFAULT_DLFLAGS,                /* dlopen flags */
+       0,                                              /* exported functions */
+       params,                                 /* exported parameters */
+       0,                                              /* exported statistics */
+       0,                                      /* exported MI functions */
+       0,                                              /* exported pseudo-variables */
+       0,                                              /* extra processes */
+       mod_init,                               /* module initialization function */
+       0,                                      /* response handling function */
+       0,                                              /* destroy function */
+       child_init              /* per-child init function */
+};
+
+
+static int mod_init(void)
+{
+               LM_ERR("dmq_sync loaded: usrloc=%d\n", enable_usrloc);
+               
+               if (enable_usrloc) {
+                       usrloc_dmq_flag = 1 << usrloc_syncflag;
+                       bind_usrloc_t bind_usrloc;
+                       
+                       bind_usrloc = (bind_usrloc_t)find_export("ul_bind_usrloc", 1, 0);
+                       if (!bind_usrloc) {
+                               LM_ERR("can't bind usrloc\n");
+                               return -1;
+                       }
+                       if (bind_usrloc(&ul) < 0) {
+                               LM_ERR("Can't bind ul\n");
+                return -1;
+                       }                       
+                       if(ul.register_ulcb != NULL) {
+                               if(ul.register_ulcb(ULCB_MAX, ul_cb_contact, 0)< 0)
+                               {
+                                       LM_ERR("can not register callback for expired contacts\n");
+                                       return -1;
+                               }
+                       }                                       
+                       if (!usrloc_dmq_initialize()){
+                               LM_DBG("usrloc_dmq initialized\n");
+                       } else {
+                               LM_ERR("Error in usrloc_dmq_initialize()\n");
+                       }
+               }
+               return 0;
+}
+
+static int child_init(int rank)
+{
+
+       if (rank == PROC_MAIN) {
+               LM_ERR("child_init PROC_MAIN\n");
+               return 0;
+       }
+       if(rank == PROC_INIT || rank == PROC_TCP_MAIN) {
+               LM_ERR("child_init PROC_INIT\n");
+               return 0;
+       }
+       return 0;
+}
diff --git a/modules/usrloc_dmq/usrloc_sync.c b/modules/usrloc_dmq/usrloc_sync.c
new file mode 100644 (file)
index 0000000..3f049ff
--- /dev/null
@@ -0,0 +1,541 @@
+#include "usrloc_sync.h"
+#include "../usrloc/usrloc.h"
+#include "../usrloc/ul_callback.h"
+#include "../usrloc/dlist.h"
+#include "../../dprint.h"
+#include "../../parser/parse_from.h"
+#include "../../parser/parse_addr_spec.h"
+
+static str usrloc_dmq_content_type = str_init("application/json");
+static str dmq_200_rpl  = str_init("OK");
+static str dmq_400_rpl  = str_init("Bad Request");
+static str dmq_500_rpl  = str_init("Server Internal Error");
+
+dmq_api_t usrloc_dmqb;
+dmq_peer_t* usrloc_dmq_peer = NULL;
+dmq_resp_cback_t usrloc_dmq_resp_callback = {&usrloc_dmq_resp_callback_f, 0};
+
+int usrloc_dmq_send_all();
+int usrloc_dmq_request_sync();
+int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node);
+usrloc_api_t ul;
+
+#define MAX_AOR_LEN 256
+int extract_aor(str* _uri, str* _a, sip_uri_t *_pu)
+{
+       static char aor_buf[MAX_AOR_LEN];
+       sip_uri_t turi;
+       sip_uri_t *puri;
+       str *uri;
+       
+       memset(aor_buf, 0, MAX_AOR_LEN);
+       uri=_uri;
+
+       if(_pu!=NULL)
+               puri = _pu;
+       else
+               puri = &turi;
+
+       if (parse_uri(uri->s, uri->len, puri) < 0) {
+               LM_ERR("failed to parse AoR [%.*s]\n", uri->len, uri->s);
+               return -1;
+       }
+       
+       if ( (puri->user.len + puri->host.len + 1) > MAX_AOR_LEN) {
+               LM_ERR("Address Of Record too long\n");
+               return -2;
+       }
+
+       _a->s = aor_buf;
+       _a->len = puri->user.len;
+
+       if (un_escape(&puri->user, _a) < 0) {
+               LM_ERR("failed to unescape username\n");
+               return -3;
+       }
+
+       strlower(_a);
+
+       return 0;
+}
+
+int add_contact(str aor, ucontact_info_t* ci)
+{
+       urecord_t* r;
+       udomain_t* _d;  
+       ucontact_t* c;
+       str contact;
+       int res;
+       
+       ul.get_udomain("location", &_d);
+       ul.lock_udomain(_d, &aor);
+       res = ul.get_urecord(_d, &aor, &r);
+       if (res < 0) {
+               LM_ERR("failed to retrieve record from usrloc\n");
+               goto error;
+       } else if ( res == 0) {
+               LM_DBG("'%.*s' found in usrloc\n", aor.len, ZSW(aor.s));
+               res = ul.get_ucontact_by_instance(r, &aor, ci, &c);
+               LM_DBG("get_ucontact_by_instance = %d\n", res);
+               if (res==-1) {
+                       LM_ERR("Invalid cseq\n");
+                       goto error;
+               } else if (res > 0 ) {
+                       LM_DBG("Not found contact\n");
+                       ul.insert_ucontact(r, &contact, ci, &c);
+               } else if (res == 0) {
+                       LM_DBG("Found contact\n");
+                       ul.update_ucontact(r, c, ci);
+               }
+       } else {
+               LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s));
+               ul.insert_urecord(_d, &aor, &r);
+               LM_DBG("Insert record\n");
+               contact.s = ci->c->s;
+               contact.len = ci->c->len;
+               ul.insert_ucontact(r, &contact, ci, &c);
+               LM_DBG("Insert ucontact\n");
+       }       
+       
+               LM_DBG("Release record\n");
+               ul.release_urecord(r);
+               LM_DBG("Unlock udomain\n");
+               ul.unlock_udomain(_d, &aor);
+               return 0;       
+       error:
+               ul.unlock_udomain(_d, &aor);
+               return -1;
+}
+
+void usrloc_get_all_ucontact(dmq_node_t* node)
+{
+       int rval, len=0;
+       void *buf, *cp;
+       str c;
+       str path;
+       str ruid;
+       unsigned int aorhash;
+       struct socket_info* send_sock;
+       unsigned int flags;
+       
+       len = 0;
+       buf = NULL;
+
+    if (ul.get_all_ucontacts == NULL){
+        LM_ERR("ul.get_all_ucontacts is NULL\n");
+        goto done;
+    }
+       rval = ul.get_all_ucontacts(buf, len, 0, 0, 1);
+       if (rval<0) {
+               LM_ERR("failed to fetch contacts\n");
+               goto done;
+       }
+       if (rval > 0) {
+               if (buf != NULL)
+                       pkg_free(buf);
+               len = rval * 2;
+               buf = pkg_malloc(len);
+               if (buf == NULL) {
+                       LM_ERR("out of pkg memory\n");
+                       goto done;
+               }
+               rval = ul.get_all_ucontacts(buf, len, 0, 0, 1);
+               if (rval != 0) {
+                       pkg_free(buf);
+                       goto done;
+               }
+       }
+       if (buf == NULL)
+               goto done;      
+       cp = buf;
+    while (1) {
+        memcpy(&(c.len), cp, sizeof(c.len));
+        if (c.len == 0)
+            break;
+        c.s = (char*)cp + sizeof(c.len);
+        cp =  (char*)cp + sizeof(c.len) + c.len;
+        memcpy( &send_sock, cp, sizeof(send_sock));
+        cp = (char*)cp + sizeof(send_sock);
+        memcpy( &flags, cp, sizeof(flags));
+        cp = (char*)cp + sizeof(flags);
+        memcpy( &(path.len), cp, sizeof(path.len));
+        path.s = path.len ? ((char*)cp + sizeof(path.len)) : NULL ;
+        cp =  (char*)cp + sizeof(path.len) + path.len;
+        memcpy( &(ruid.len), cp, sizeof(ruid.len));
+        ruid.s = ruid.len ? ((char*)cp + sizeof(ruid.len)) : NULL ;
+        cp =  (char*)cp + sizeof(ruid.len) + ruid.len;
+        memcpy( &aorhash, cp, sizeof(aorhash));
+        cp = (char*)cp + sizeof(aorhash);
+
+
+        str aor;
+        sip_uri_t puri;
+        urecord_t* r;
+        udomain_t* _d;
+        ucontact_t* ptr = 0;
+
+        int res;
+
+        if (extract_aor(&c, &aor, &puri) < 0) {
+            LM_ERR("failed to extract address of record\n");
+            continue;
+        }
+        ul.get_udomain("location", &_d);
+
+        res = ul.get_urecord_by_ruid(_d, aorhash, &ruid, &r, &ptr);
+        aor = r->aor;
+        if (res > 0) {
+            LM_DBG("'%.*s' Not found in usrloc\n", aor.len, ZSW(aor.s));
+            ul.release_urecord(r);
+            ul.unlock_udomain(_d, &aor);
+            continue;
+        }
+        LM_DBG("- AoR: %.*s  AoRhash=%d  Flags=%d\n", aor.len, aor.s, aorhash, flags);
+
+        while (ptr) {
+            usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, node);
+            ptr = ptr->next;
+        }
+        ul.release_urecord(r);
+        ul.unlock_udomain(_d, &aor);
+    }
+       pkg_free(buf);
+       
+done:
+       c.s = ""; c.len = 0;
+}
+
+
+int usrloc_dmq_initialize()
+{
+       dmq_peer_t not_peer;
+       
+       /* load the DMQ API */
+       if (dmq_load_api(&usrloc_dmqb)!=0) {
+               LM_ERR("cannot load dmq api\n");
+               return -1;
+       } else {
+               LM_DBG("loaded dmq api\n");
+       }
+       not_peer.callback = usrloc_dmq_handle_msg;
+       not_peer.init_callback = usrloc_dmq_request_sync;
+       not_peer.description.s = "usrloc";
+       not_peer.description.len = 6;
+       not_peer.peer_id.s = "usrloc";
+       not_peer.peer_id.len = 6;
+       usrloc_dmq_peer = usrloc_dmqb.register_dmq_peer(&not_peer);
+       if(!usrloc_dmq_peer) {
+               LM_ERR("error in register_dmq_peer\n");
+               goto error;
+       } else {
+               LM_DBG("dmq peer registered\n");
+       }
+       return 0;
+error:
+       return -1;
+}
+
+
+int usrloc_dmq_send(str* body, dmq_node_t* node) {
+       if (!usrloc_dmq_peer) {
+               LM_ERR("dlg_dmq_peer is null!\n");
+               return -1;
+       }
+       if (node) {
+               LM_DBG("sending dmq message ...\n");
+               usrloc_dmqb.send_message(usrloc_dmq_peer, body, node, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type);
+       } else {
+               LM_DBG("sending dmq broadcast...\n");
+               usrloc_dmqb.bcast_message(usrloc_dmq_peer, body, 0, &usrloc_dmq_resp_callback, 1, &usrloc_dmq_content_type);
+       }
+       return 0;
+}
+
+
+/**
+* @brief ht dmq callback
+*/
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
+{
+       int content_length;
+       str body;
+       srjson_doc_t jdoc;
+       srjson_t *it = NULL;
+       static ucontact_info_t ci;
+
+       int action, expires, cseq, flags, cflags, q, last_modified, methods, reg_id;
+       str aor, ruid, c, received, path, callid, user_agent, instance;
+
+       parse_from_header(msg);
+       body = ((struct to_body*)msg->from->parsed)->uri;
+       
+       LM_DBG("dmq message received from %.*s\n", body.len, body.s);
+
+       if(!msg->content_length) {
+               LM_ERR("no content length header found\n");
+               goto invalid;
+       }
+       content_length = get_content_length(msg);
+       if(!content_length) {
+               LM_DBG("content length is 0\n");
+               goto invalid;
+       }
+
+       body.s = get_body(msg);
+       body.len = content_length;
+
+       if (!body.s) {
+               LM_ERR("unable to get body\n");
+               goto error;
+       }
+
+       srjson_InitDoc(&jdoc, NULL);
+       jdoc.buf = body;
+       if(jdoc.root == NULL) {
+               jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
+               if(jdoc.root == NULL)
+               {
+                       LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
+                       goto invalid;
+               }
+       }
+       
+       for(it=jdoc.root->child; it; it = it->next)
+       {
+               if (it->string == NULL) continue;
+               
+               if (strcmp(it->string, "action")==0) {
+                       action = it->valueint;
+               } else if (strcmp(it->string, "aor")==0) {
+                       aor.s = it->valuestring;
+                       aor.len = strlen(aor.s);
+               } else if (strcmp(it->string, "ruid")==0) {
+                       ruid.s = it->valuestring;
+                       ruid.len = strlen(ruid.s);
+               } else if (strcmp(it->string, "c")==0) {
+                       c.s = it->valuestring;
+                       c.len = strlen(c.s);
+               } else if (strcmp(it->string, "received")==0) {
+                       received.s = it->valuestring;
+                       received.len = strlen(received.s);
+               } else if (strcmp(it->string, "path")==0) {
+                       path.s = it->valuestring;
+                       path.len = strlen(path.s);
+               } else if (strcmp(it->string, "callid")==0) {
+                       callid.s = it->valuestring;
+                       callid.len = strlen(callid.s);
+               } else if (strcmp(it->string, "user_agent")==0) {
+                       user_agent.s = it->valuestring;
+                       user_agent.len = strlen(user_agent.s);
+               } else if (strcmp(it->string, "instance")==0) {
+                       instance.s = it->valuestring;
+                       instance.len = strlen(instance.s);
+               } else if (strcmp(it->string, "expires")==0) { //
+                       expires = it->valueint;
+               } else if (strcmp(it->string, "cseq")==0) {
+                       cseq = it->valueint;
+               } else if (strcmp(it->string, "flags")==0) {
+                       flags = it->valueint;
+               } else if (strcmp(it->string, "cflags")==0) {
+                       cflags = it->valueint;
+               } else if (strcmp(it->string, "q")==0) {
+                       q = it->valueint;
+               } else if (strcmp(it->string, "last_modified")==0) {
+                       last_modified = it->valueint;
+               } else if (strcmp(it->string, "methods")==0) {
+                       methods = it->valueint;
+               } else if (strcmp(it->string, "reg_id")==0) {
+                       reg_id = it->valueint;
+               } else {
+                       LM_ERR("unrecognized field in json object\n");
+               }               
+       } 
+       srjson_DestroyDoc(&jdoc);
+       memset( &ci, 0, sizeof(ucontact_info_t));
+       ci.ruid = ruid;
+       ci.c = &c;
+       ci.received = received;
+       ci.path = &path;
+       ci.expires = expires;
+       ci.q = q;
+       ci.callid = &callid;
+       ci.cseq = cseq;
+       ci.flags = flags;
+       ci.flags |= usrloc_dmq_flag;
+       ci.cflags = cflags;
+       ci.user_agent = &user_agent;
+       ci.methods = methods;
+       ci.instance = instance;
+       ci.reg_id = reg_id;
+       ci.tcpconn_id = -1;
+       ci.last_modified = last_modified;
+
+       switch(action) {
+               case DMQ_UPDATE:
+                                               LM_DBG("Received DMQ_UPDATE. Update contact info...\n");
+                                               add_contact(aor, &ci);
+                                               break;
+               case DMQ_RM:
+                                               LM_DBG("Received DMQ_RM. Delete contact info...\n");
+                                               break;
+               case DMQ_SYNC:
+                                               LM_DBG("Received DMQ_SYNC. Sending all contacts...\n");
+                                               usrloc_get_all_ucontact(node);
+                                               break;
+               case DMQ_NONE:
+                                               LM_DBG("Received DMQ_NONE. Not used...\n");
+                                               break;
+                                               
+               default:  goto invalid;
+       }
+       
+       resp->reason = dmq_200_rpl;
+       resp->resp_code = 200;
+       return 0;
+
+invalid:
+       resp->reason = dmq_400_rpl;
+       resp->resp_code = 400;
+       return 0;
+
+error:
+       resp->reason = dmq_500_rpl;
+       resp->resp_code = 500;
+       return 0;
+}
+
+
+int usrloc_dmq_request_sync() {
+       srjson_doc_t jdoc;
+       LM_DBG("requesting sync from dmq peers\n");
+       srjson_InitDoc(&jdoc, NULL);
+
+       jdoc.root = srjson_CreateObject(&jdoc);
+       if(jdoc.root==NULL) {
+               LM_ERR("cannot create json root\n");
+               goto error;
+       }
+
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DMQ_SYNC);
+       jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+       if(jdoc.buf.s==NULL) {
+               LM_ERR("unable to serialize data\n");
+               goto error;
+       }
+       jdoc.buf.len = strlen(jdoc.buf.s);
+       LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+       if (usrloc_dmq_send(&jdoc.buf, 0)!=0) {
+               goto error;
+       }
+
+       jdoc.free_fn(jdoc.buf.s);
+       jdoc.buf.s = NULL;
+       srjson_DestroyDoc(&jdoc);
+       return 0;
+
+error:
+       if(jdoc.buf.s!=NULL) {
+               jdoc.free_fn(jdoc.buf.s);
+               jdoc.buf.s = NULL;
+       }
+       srjson_DestroyDoc(&jdoc);
+       return -1;
+}
+
+int usrloc_dmq_send_contact(ucontact_t* ptr, str aor, int action, dmq_node_t* node) {
+       srjson_doc_t jdoc;
+       srjson_InitDoc(&jdoc, NULL);
+       
+       int flags;
+
+       jdoc.root = srjson_CreateObject(&jdoc);
+       if(jdoc.root==NULL) {
+               LM_ERR("cannot create json root\n");
+               goto error;
+       }
+
+       flags = ptr->flags;
+       flags &= ~usrloc_dmq_flag;
+
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
+       
+       srjson_AddStrToObject(&jdoc, jdoc.root, "aor", aor.s, aor.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "ruid", ptr->ruid.s, ptr->ruid.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "c", ptr->c.s, ptr->c.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "received", ptr->received.s, ptr->received.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "path", ptr->path.s, ptr->path.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "callid", ptr->callid.s, ptr->callid.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "user_agent", ptr->user_agent.s, ptr->user_agent.len);
+       srjson_AddStrToObject(&jdoc, jdoc.root, "instance", ptr->instance.s, ptr->instance.len);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "expires", ptr->expires);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "cseq", ptr->cseq);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "flags", flags);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "cflags", ptr->cflags);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "q", ptr->q);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "last_modified", ptr->last_modified);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "methods", ptr->methods);
+       srjson_AddNumberToObject(&jdoc, jdoc.root, "reg_id", ptr->reg_id);
+
+       jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
+       if(jdoc.buf.s==NULL) {
+               LM_ERR("unable to serialize data\n");
+               goto error;
+       }
+       jdoc.buf.len = strlen(jdoc.buf.s);
+       
+       LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
+       if (usrloc_dmq_send(&jdoc.buf, node)!=0) {
+               goto error;
+       }
+
+       jdoc.free_fn(jdoc.buf.s);
+       jdoc.buf.s = NULL;
+       srjson_DestroyDoc(&jdoc);
+       return 0;
+
+error:
+       if(jdoc.buf.s!=NULL) {
+               jdoc.free_fn(jdoc.buf.s);
+               jdoc.buf.s = NULL;
+       }
+       srjson_DestroyDoc(&jdoc);
+       return -1;
+}
+
+int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code,
+                            dmq_node_t* node, void* param)
+{
+       LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
+       return 0;
+}
+
+void ul_cb_contact(ucontact_t* ptr, int type, void* param)
+{
+       str aor;
+
+               LM_DBG("Callback from usrloc with type=%d\n", type);
+               aor.s = ptr->aor->s;
+               aor.len = ptr->aor->len;
+               
+               if (!(ptr->flags & usrloc_dmq_flag)) {
+               
+                       switch(type){
+                               case UL_CONTACT_INSERT:
+                                                                                       usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0);
+                                                                               break;
+                               case UL_CONTACT_UPDATE:
+                                                                                       usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE, 0);
+                                                                               break;
+                               case UL_CONTACT_DELETE:
+                                                                                       //usrloc_dmq_send_contact(ptr, aor, DMQ_RM);
+                                                                                       LM_DBG("Contact <%.*s> deleted\n", aor.len, aor.s);
+                                                                               break;
+                               case UL_CONTACT_EXPIRE:
+                                                                                       //usrloc_dmq_send_contact(ptr, aor, DMQ_UPDATE);
+                                                                                       LM_DBG("Contact <%.*s> expired\n", aor.len, aor.s);
+                                                                               break;
+                       }
+               } else {
+                       LM_DBG("Contact recieved from DMQ... skip\n");
+               }
+}
diff --git a/modules/usrloc_dmq/usrloc_sync.h b/modules/usrloc_dmq/usrloc_sync.h
new file mode 100644 (file)
index 0000000..383a62d
--- /dev/null
@@ -0,0 +1,32 @@
+#ifndef _DMQ_SYNC_USRLOC_H_
+#define _DMQ_SYNC_USRLOC_H_
+
+#include "../dmq/bind_dmq.h"
+#include "../../lib/srutils/srjson.h"
+#include "../../parser/msg_parser.h"
+#include "../../parser/parse_content.h"
+#include "../usrloc/usrloc.h"
+
+int usrloc_dmq_flag;
+
+extern dmq_api_t usrloc_dmqb;
+extern dmq_peer_t* usrloc_dmq_peer;
+extern dmq_resp_cback_t usrloc_dmq_resp_callback;
+extern rpc_export_t ul_rpc[];
+
+usrloc_api_t ul;
+
+typedef enum {
+       DMQ_NONE,
+       DMQ_UPDATE,
+       DMQ_RM,
+       DMQ_SYNC,
+} usrloc_dmq_action_t;
+
+int usrloc_dmq_resp_callback_f(struct sip_msg* msg, int code, dmq_node_t* node, void* param);
+int usrloc_dmq_initialize();
+int usrloc_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node);
+int usrloc_dmq_request_sync();
+void ul_cb_contact(ucontact_t* c, int type, void* param);
+
+#endif