0d47056f27342141b1849f952b1260c6ad775c94
[sip-router] / modules / dialog / dlg_dmq.c
1 /**
2 *
3 * Copyright (C) 2014 Alex Hermann (SpeakUp BV)
4 * Based on ht_dmq.c Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
5 *
6 * This file is part of Kamailio, a free SIP server.
7 *
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
12 *
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21 *
22 */
23
24 #include "dlg_dmq.h"
25 #include "dlg_hash.h"
26 #include "dlg_profile.h"
27 #include "dlg_var.h"
28
29 static str dlg_dmq_content_type = str_init("application/json");
30 static str dmq_200_rpl  = str_init("OK");
31 static str dmq_400_rpl  = str_init("Bad Request");
32 static str dmq_500_rpl  = str_init("Server Internal Error");
33
34 dmq_api_t dlg_dmqb;
35 dmq_peer_t* dlg_dmq_peer = NULL;
36 dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
37
38 int dmq_send_all_dlgs();
39 int dlg_dmq_request_sync();
40
41
42 /**
43 * @brief add notification peer
44 */
45 int dlg_dmq_initialize()
46 {
47         dmq_peer_t not_peer;
48
49         /* load the DMQ API */
50         if (dmq_load_api(&dlg_dmqb)!=0) {
51                 LM_ERR("cannot load dmq api\n");
52                 return -1;
53         } else {
54                 LM_DBG("loaded dmq api\n");
55         }
56
57         not_peer.callback = dlg_dmq_handle_msg;
58         not_peer.init_callback = dlg_dmq_request_sync;
59         not_peer.description.s = "dialog";
60         not_peer.description.len = 6;
61         not_peer.peer_id.s = "dialog";
62         not_peer.peer_id.len = 6;
63         dlg_dmq_peer = dlg_dmqb.register_dmq_peer(&not_peer);
64         if(!dlg_dmq_peer) {
65                 LM_ERR("error in register_dmq_peer\n");
66                 goto error;
67         } else {
68                 LM_DBG("dmq peer registered\n");
69         }
70         return 0;
71 error:
72         return -1;
73 }
74
75
76 int dlg_dmq_send(str* body, dmq_node_t* node) {
77         if (!dlg_dmq_peer) {
78                 LM_ERR("dlg_dmq_peer is null!\n");
79                 return -1;
80         }
81         if (node) {
82                 LM_DBG("sending dmq message ...\n");
83                 dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback,
84                                 1, &dlg_dmq_content_type);
85         } else {
86                 LM_DBG("sending dmq broadcast...\n");
87                 dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback,
88                                 1, &dlg_dmq_content_type);
89         }
90         return 0;
91 }
92
93
94 /**
95 * @brief ht dmq callback
96 */
97 int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
98 {
99         int content_length;
100         str body;
101         dlg_cell_t *dlg;
102         int unref = 0;
103         int ret;
104         srjson_doc_t jdoc, prof_jdoc;
105         srjson_t *it = NULL;
106
107         dlg_dmq_action_t action = DLG_DMQ_NONE;
108         dlg_iuid_t iuid;
109         str profiles = {0, 0}, callid = {0, 0}, tag1 = {0,0}, tag2 = {0,0},
110                 contact1 = {0,0}, contact2 = {0,0}, k={0,0}, v={0,0};
111         str cseq1 = {0,0}, cseq2 = {0,0}, route_set1 = {0,0}, route_set2 = {0,0},
112                 from_uri = {0,0}, to_uri = {0,0}, req_uri = {0,0};
113         unsigned int init_ts = 0, start_ts = 0, lifetime = 0;
114         unsigned int state = 1;
115
116         /* received dmq message */
117         LM_DBG("dmq message received\n");
118
119         if(!msg->content_length) {
120                 LM_ERR("no content length header found\n");
121                 goto invalid2;
122         }
123         content_length = get_content_length(msg);
124         if(!content_length) {
125                 LM_DBG("content length is 0\n");
126                 goto invalid2;
127         }
128
129         body.s = get_body(msg);
130         body.len = content_length;
131
132         if (!body.s) {
133                 LM_ERR("unable to get body\n");
134                 goto error;
135         }
136
137         /* parse body */
138         LM_DBG("body: %.*s\n", body.len, body.s);
139
140         srjson_InitDoc(&jdoc, NULL);
141         jdoc.buf = body;
142
143         if(jdoc.root == NULL) {
144                 jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
145                 if(jdoc.root == NULL)
146                 {
147                         LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
148                         goto invalid;
149                 }
150         }
151
152         for(it=jdoc.root->child; it; it = it->next)
153         {
154                 if ((it->string == NULL) || (strcmp(it->string, "vars")==0)) continue;
155
156                 LM_DBG("found field: %s\n", it->string);
157
158                 if (strcmp(it->string, "action")==0) {
159                         action = SRJSON_GET_UINT(it);
160                 } else if (strcmp(it->string, "h_entry")==0) {
161                         iuid.h_entry = SRJSON_GET_UINT(it);
162                 } else if (strcmp(it->string, "h_id")==0) {
163                         iuid.h_id = SRJSON_GET_UINT(it);
164                 } else if (strcmp(it->string, "init_ts")==0) {
165                         init_ts = SRJSON_GET_UINT(it);
166                 } else if (strcmp(it->string, "start_ts")==0) {
167                         start_ts = SRJSON_GET_UINT(it);
168                 } else if (strcmp(it->string, "state")==0) {
169                         state = SRJSON_GET_UINT(it);
170                 } else if (strcmp(it->string, "lifetime")==0) {
171                         lifetime = SRJSON_GET_UINT(it);
172                 } else if (strcmp(it->string, "callid")==0) {
173                         callid.s = it->valuestring;
174                         callid.len = strlen(callid.s);
175                 } else if (strcmp(it->string, "profiles")==0) {
176                         profiles.s = it->valuestring;
177                         profiles.len = strlen(profiles.s);
178                 } else if (strcmp(it->string, "tag1")==0) {
179                         tag1.s = it->valuestring;
180                         tag1.len = strlen(tag1.s);
181                 } else if (strcmp(it->string, "tag2")==0) {
182                         tag2.s = it->valuestring;
183                         tag2.len = strlen(tag2.s);
184                 } else if (strcmp(it->string, "cseq1")==0) {
185                         cseq1.s = it->valuestring;
186                         cseq1.len = strlen(cseq1.s);
187                 } else if (strcmp(it->string, "cseq2")==0) {
188                         cseq2.s = it->valuestring;
189                         cseq2.len = strlen(cseq2.s);
190                 } else if (strcmp(it->string, "route_set1")==0) {
191                         route_set1.s = it->valuestring;
192                         route_set1.len = strlen(route_set1.s);
193                 } else if (strcmp(it->string, "route_set2")==0) {
194                         route_set2.s = it->valuestring;
195                         route_set2.len = strlen(route_set2.s);
196                 } else if (strcmp(it->string, "contact1")==0) {
197                         contact1.s = it->valuestring;
198                         contact1.len = strlen(contact1.s);
199                 } else if (strcmp(it->string, "contact2")==0) {
200                         contact2.s = it->valuestring;
201                         contact2.len = strlen(contact2.s);
202                 } else if (strcmp(it->string, "from_uri")==0) {
203                         from_uri.s = it->valuestring;
204                         from_uri.len = strlen(from_uri.s);
205                 } else if (strcmp(it->string, "to_uri")==0) {
206                         to_uri.s = it->valuestring;
207                         to_uri.len = strlen(to_uri.s);
208                 } else if (strcmp(it->string, "req_uri")==0) {
209                         req_uri.s = it->valuestring;
210                         req_uri.len = strlen(req_uri.s);
211                 } else {
212                         LM_ERR("unrecognized field in json object\n");
213                 }
214         }
215
216         dlg = dlg_get_by_iuid(&iuid);
217         if (dlg) {
218                 LM_DBG("found dialog [%u:%u] at %p\n", iuid.h_entry, iuid.h_id, dlg);
219                 unref++;
220         }
221
222         switch(action) {
223                 case DLG_DMQ_UPDATE:
224                         LM_DBG("Updating dlg [%u:%u] with callid [%.*s]\n", iuid.h_entry, iuid.h_id,
225                                         callid.len, callid.s);
226                         if (!dlg) {
227                                 dlg = build_new_dlg(&callid, &from_uri, &to_uri, &tag1, &req_uri);
228                                 if (!dlg) {
229                                         LM_ERR("failed to build new dialog\n");
230                                         goto error;
231                                 }
232
233                                 if(dlg->h_entry != iuid.h_entry){
234                                         LM_ERR("inconsistent hash data from peer: "
235                                                 "make sure all Kamailio's use the same hash size\n");
236                                         shm_free(dlg);
237                                         goto error;
238                                 }
239
240                                 /* link the dialog */
241                                 link_dlg(dlg, 0, 0);
242                                 dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
243                                 /* override generated h_id */
244                                 dlg->h_id = iuid.h_id;
245                                 /* prevent DB sync */
246                                 dlg->dflags &= ~(DLG_FLAG_NEW|DLG_FLAG_CHANGED);
247                                 dlg->iflags |= DLG_IFLAG_DMQ_SYNC;
248                         } else {
249                                 /* remove existing profiles */
250                                 if (dlg->profile_links!=NULL) {
251                                         destroy_linkers(dlg->profile_links);
252                                         dlg->profile_links = NULL;
253                                 }
254                         }
255
256                         dlg->init_ts = init_ts;
257                         dlg->start_ts = start_ts;
258
259                         srjson_t *vj;
260                         vj = srjson_GetObjectItem(&jdoc, jdoc.root, "vars");
261                         for(it=vj->child; it; it = it->next)
262                         {
263                                 k.s = it->string;        k.len = strlen(k.s);
264                                 v.s = it->valuestring;   v.len = strlen(v.s);
265                                 set_dlg_variable(dlg, &k, &v);
266                         }
267                         /* add profiles */
268                         if(profiles.s!=NULL) {
269                                 srjson_InitDoc(&prof_jdoc, NULL);
270                                 prof_jdoc.buf = profiles;
271                                 dlg_json_to_profiles(dlg, &prof_jdoc);
272                                 srjson_DestroyDoc(&prof_jdoc);
273                         }
274                         if (state == dlg->state) {
275                                 break;
276                         }
277                         /* intentional fallthrough */
278
279                 case DLG_DMQ_STATE:
280                         if (!dlg) {
281                                 LM_ERR("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
282                                 goto error;
283                         }
284                         if (state < dlg->state) {
285                                 LM_NOTICE("Ignoring backwards state change on dlg [%u:%u]"
286                                                 " with callid [%.*s] from state [%u] to state [%u]\n",
287                                         iuid.h_entry, iuid.h_id,
288                                         dlg->callid.len, dlg->callid.s, dlg->state, state);
289                                 break;
290                         }
291                         LM_DBG("State update dlg [%u:%u] with callid [%.*s] from state [%u]"
292                                         " to state [%u]\n", iuid.h_entry, iuid.h_id,
293                                         dlg->callid.len, dlg->callid.s, dlg->state, state);
294                         switch (state) {
295                                 case DLG_STATE_EARLY:
296                                         dlg->start_ts = start_ts;
297                                         dlg->lifetime = lifetime;
298                                         dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
299                                         break;
300                                 case DLG_STATE_CONFIRMED:
301                                         dlg->start_ts = start_ts;
302                                         dlg->lifetime = lifetime;
303                                         dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
304                                         dlg_set_leg_info(dlg, &tag2, &route_set2, &contact2, &cseq2, 1);
305                                         if (insert_dlg_timer( &dlg->tl, dlg->lifetime ) != 0) {
306                                                 LM_CRIT("Unable to insert dlg timer %p [%u:%u]\n",
307                                                         dlg, dlg->h_entry, dlg->h_id);
308                                         } else {
309                                                 /* dialog pointer inserted in timer list */
310                                                 dlg_ref(dlg, 1);
311                                         }
312                                         break;
313                                 case DLG_STATE_DELETED:
314                                         if (dlg->state == DLG_STATE_CONFIRMED) {
315                                                 ret = remove_dialog_timer(&dlg->tl);
316                                                 if (ret == 0) {
317                                                         /* one extra unref due to removal from timer list */
318                                                         unref++;
319                                                 } else if (ret < 0) {
320                                                         LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
321                                                                 dlg, dlg->h_entry, dlg->h_id);
322                                                 }
323                                         }
324                                         /* prevent DB sync */
325                                         dlg->dflags |= DLG_FLAG_NEW;
326                                         /* keep dialog around for a bit, to prevent out-of-order
327                                          * syncs to reestablish the dlg */
328                                         dlg->init_ts = time(NULL);
329                                         break;
330                                 default:
331                                         LM_ERR("unhandled state update to state %u\n", state);
332                                         dlg_unref(dlg, unref);
333                                         goto error;
334                         }
335                         dlg->state = state;
336                         break;
337
338                 case DLG_DMQ_RM:
339                         if (!dlg) {
340                                 LM_DBG("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
341                                 goto error;
342                         }
343                         LM_DBG("Removed dlg [%u:%u] with callid [%.*s] int state [%u]\n",
344                                         iuid.h_entry, iuid.h_id,
345                                         dlg->callid.len, dlg->callid.s, dlg->state);
346                         if (dlg->state==DLG_STATE_CONFIRMED
347                                         || dlg->state==DLG_STATE_EARLY) {
348                                 ret = remove_dialog_timer(&dlg->tl);
349                                 if (ret == 0) {
350                                         /* one extra unref due to removal from timer list */
351                                         unref++;
352                                 } else if (ret < 0) {
353                                         LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
354                                                 dlg, dlg->h_entry, dlg->h_id);
355                                 }
356                         }
357                         /* prevent DB sync */
358                         dlg->dflags |= DLG_FLAG_NEW;
359                         unref++;
360                         break;
361
362                 case DLG_DMQ_SYNC:
363                         dmq_send_all_dlgs(0);
364                         break;
365
366                 case DLG_DMQ_NONE:
367                         break;
368         }
369         if (dlg && unref)
370                 dlg_unref(dlg, unref);
371
372         srjson_DestroyDoc(&jdoc);
373         resp->reason = dmq_200_rpl;
374         resp->resp_code = 200;
375         return 0;
376
377 invalid:
378         srjson_DestroyDoc(&jdoc);
379 invalid2:
380         resp->reason = dmq_400_rpl;
381         resp->resp_code = 400;
382         return 0;
383
384 error:
385         srjson_DestroyDoc(&jdoc);
386         resp->reason = dmq_500_rpl;
387         resp->resp_code = 500;
388         return 0;
389 }
390
391
392 int dlg_dmq_request_sync() {
393         srjson_doc_t jdoc;
394
395         LM_DBG("requesting sync from dmq peers\n");
396
397         srjson_InitDoc(&jdoc, NULL);
398
399         jdoc.root = srjson_CreateObject(&jdoc);
400         if(jdoc.root==NULL) {
401                 LM_ERR("cannot create json root\n");
402                 goto error;
403         }
404
405         srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
406         jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
407         if(jdoc.buf.s==NULL) {
408                 LM_ERR("unable to serialize data\n");
409                 goto error;
410         }
411         jdoc.buf.len = strlen(jdoc.buf.s);
412         LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
413         if (dlg_dmq_send(&jdoc.buf, 0)!=0) {
414                 goto error;
415         }
416
417         jdoc.free_fn(jdoc.buf.s);
418         jdoc.buf.s = NULL;
419         srjson_DestroyDoc(&jdoc);
420         return 0;
421
422 error:
423         if(jdoc.buf.s!=NULL) {
424                 jdoc.free_fn(jdoc.buf.s);
425                 jdoc.buf.s = NULL;
426         }
427         srjson_DestroyDoc(&jdoc);
428         return -1;
429 }
430
431
432 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg,
433                 int needlock, dmq_node_t *node ) {
434
435         srjson_doc_t jdoc, prof_jdoc;
436         dlg_var_t *var;
437
438         LM_DBG("replicating action [%d] on [%u:%u] to dmq peers\n", action,
439                         dlg->h_entry, dlg->h_id);
440
441         if (action == DLG_DMQ_UPDATE) {
442                 if (!node && (dlg->iflags & DLG_IFLAG_DMQ_SYNC)
443                                 && ((dlg->dflags & DLG_FLAG_CHANGED_PROF) == 0)) {
444                         LM_DBG("dlg not changed, no sync\n");
445                         return 1;
446                 }
447         } else if ( (dlg->iflags & DLG_IFLAG_DMQ_SYNC) == 0 ) {
448                 LM_DBG("dlg not synced, no sync\n");
449                 return 1;
450         }
451         if (action == DLG_DMQ_STATE && (dlg->state != DLG_STATE_CONFIRMED
452                                 && dlg->state != DLG_STATE_DELETED
453                                 && dlg->state != DLG_STATE_EARLY)) {
454                 LM_DBG("not syncing state %u\n", dlg->state);
455                 return 1;
456         }
457
458         srjson_InitDoc(&jdoc, NULL);
459
460         jdoc.root = srjson_CreateObject(&jdoc);
461         if(jdoc.root==NULL) {
462                 LM_ERR("cannot create json root\n");
463                 goto error;
464         }
465
466         if (needlock)
467                 dlg_lock(d_table, &(d_table->entries[dlg->h_entry]));
468
469         srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
470         srjson_AddNumberToObject(&jdoc, jdoc.root, "h_entry", dlg->h_entry);
471         srjson_AddNumberToObject(&jdoc, jdoc.root, "h_id", dlg->h_id);
472
473         switch(action) {
474                 case DLG_DMQ_UPDATE:
475                         dlg->iflags |= DLG_IFLAG_DMQ_SYNC;
476                         dlg->dflags &= ~DLG_FLAG_CHANGED_PROF;
477                         srjson_AddNumberToObject(&jdoc, jdoc.root, "init_ts",
478                                         dlg->init_ts);
479                         srjson_AddStrToObject(&jdoc, jdoc.root, "callid",
480                                         dlg->callid.s, dlg->callid.len);
481
482                         srjson_AddStrToObject(&jdoc, jdoc.root, "from_uri",
483                                         dlg->from_uri.s, dlg->from_uri.len);
484                         srjson_AddStrToObject(&jdoc, jdoc.root, "to_uri",
485                                         dlg->to_uri.s, dlg->to_uri.len);
486                         srjson_AddStrToObject(&jdoc, jdoc.root, "req_uri",
487                                         dlg->req_uri.s, dlg->req_uri.len);
488                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
489                                         dlg->tag[0].s, dlg->tag[0].len);
490                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
491                                         dlg->cseq[0].s, dlg->cseq[0].len);
492                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
493                                         dlg->route_set[0].s, dlg->route_set[0].len);
494                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
495                                         dlg->contact[0].s, dlg->contact[0].len);
496
497                         if (dlg->vars != NULL) {
498                                 srjson_t *pj = NULL;
499                                 pj = srjson_CreateObject(&jdoc);
500                 for(var=dlg->vars ; var ; var=var->next) {
501                                         srjson_AddStrToObject(&jdoc, pj, var->key.s,
502                                                         var->value.s, var->value.len);
503                 }
504                 srjson_AddItemToObject(&jdoc, jdoc.root, "vars", pj);
505                         }
506
507                         if (dlg->profile_links) {
508                                 srjson_InitDoc(&prof_jdoc, NULL);
509                                 dlg_profiles_to_json(dlg, &prof_jdoc);
510                                 if(prof_jdoc.buf.s!=NULL) {
511                                         LM_DBG("adding profiles: [%.*s]\n",
512                                                         prof_jdoc.buf.len, prof_jdoc.buf.s);
513                                         srjson_AddStrToObject(&jdoc, jdoc.root, "profiles",
514                                                         prof_jdoc.buf.s, prof_jdoc.buf.len);
515                                         prof_jdoc.free_fn(prof_jdoc.buf.s);
516                                         prof_jdoc.buf.s = NULL;
517                                 }
518                                 srjson_DestroyDoc(&prof_jdoc);
519                         }
520                         /* intentional fallthrough */
521
522                 case DLG_DMQ_STATE:
523                         srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
524                         switch (dlg->state) {
525                                 case DLG_STATE_EARLY:
526                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "start_ts",
527                                                         dlg->start_ts);
528                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "lifetime",
529                                                         dlg->lifetime);
530
531                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
532                                                         dlg->tag[0].s, dlg->tag[0].len);
533                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
534                                                         dlg->cseq[0].s, dlg->cseq[0].len);
535                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
536                                                         dlg->route_set[0].s, dlg->route_set[0].len);
537                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
538                                                         dlg->contact[0].s, dlg->contact[0].len);
539                                         break;
540                                 case DLG_STATE_CONFIRMED:
541                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "start_ts",
542                                                         dlg->start_ts);
543                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "lifetime",
544                                                         dlg->lifetime);
545
546                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
547                                                         dlg->tag[0].s, dlg->tag[0].len);
548                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag2",
549                                                         dlg->tag[1].s, dlg->tag[1].len);
550                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
551                                                         dlg->cseq[0].s, dlg->cseq[0].len);
552                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq2",
553                                                         dlg->cseq[1].s, dlg->cseq[1].len);
554                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
555                                                         dlg->route_set[0].s, dlg->route_set[0].len);
556                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set2",
557                                                         dlg->route_set[1].s, dlg->route_set[1].len);
558                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
559                                                         dlg->contact[0].s, dlg->contact[0].len);
560                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact2",
561                                                         dlg->contact[1].s, dlg->contact[1].len);
562
563                                         break;
564                                 case DLG_STATE_DELETED:
565                                         //dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
566                                         break;
567                                 default:
568                                         LM_DBG("not syncing state %u\n", dlg->state);
569                         }
570                         break;
571
572                 case DLG_DMQ_RM:
573                         srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
574                         dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
575                         break;
576
577                 case DLG_DMQ_NONE:
578                 case DLG_DMQ_SYNC:
579                         break;
580         }
581         if (needlock)
582                 dlg_unlock(d_table, &(d_table->entries[dlg->h_entry]));
583
584         jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
585         if(jdoc.buf.s==NULL) {
586                 LM_ERR("unable to serialize data\n");
587                 goto error;
588         }
589         jdoc.buf.len = strlen(jdoc.buf.s);
590         LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
591         if (dlg_dmq_send(&jdoc.buf, node)!=0) {
592                 goto error;
593         }
594
595         jdoc.free_fn(jdoc.buf.s);
596         jdoc.buf.s = NULL;
597         srjson_DestroyDoc(&jdoc);
598         return 0;
599
600 error:
601         if(jdoc.buf.s!=NULL) {
602                 jdoc.free_fn(jdoc.buf.s);
603                 jdoc.buf.s = NULL;
604         }
605         srjson_DestroyDoc(&jdoc);
606         return -1;
607 }
608
609
610 int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
611         int index;
612         dlg_entry_t entry;
613         dlg_cell_t *dlg;
614
615         LM_DBG("sending all dialogs \n");
616
617         for(index = 0; index< d_table->size; index++){
618                 /* lock the whole entry */
619                 entry = (d_table->entries)[index];
620                 dlg_lock( d_table, &entry);
621
622                 for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
623                                 dlg->dflags |= DLG_FLAG_CHANGED_PROF;
624                                 dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
625                 }
626
627                 dlg_unlock( d_table, &entry);
628         }
629
630         return 0;
631 }
632
633
634 /**
635 * @brief dmq response callback
636 */
637 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code,
638                             dmq_node_t* node, void* param)
639 {
640         LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
641         return 0;
642 }