dialog: safety check of return pointer to dmq vars field
[sip-router] / modules / dialog / dlg_dmq.c
1 /**
2 *
3 * Copyright (C) 2014 Alex Hermann (SpeakUp BV)
4 * Based on ht_dmq.c Copyright (C) 2013 Charles Chance (Sipcentric Ltd)
5 *
6 * This file is part of Kamailio, a free SIP server.
7 *
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
12 *
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
21 *
22 */
23
24 #include "dlg_dmq.h"
25 #include "dlg_hash.h"
26 #include "dlg_profile.h"
27 #include "dlg_var.h"
28
29 static str dlg_dmq_content_type = str_init("application/json");
30 static str dmq_200_rpl  = str_init("OK");
31 static str dmq_400_rpl  = str_init("Bad Request");
32 static str dmq_500_rpl  = str_init("Server Internal Error");
33
34 dmq_api_t dlg_dmqb;
35 dmq_peer_t* dlg_dmq_peer = NULL;
36 dmq_resp_cback_t dlg_dmq_resp_callback = {&dlg_dmq_resp_callback_f, 0};
37
38 int dmq_send_all_dlgs();
39 int dlg_dmq_request_sync();
40
41
42 /**
43 * @brief add notification peer
44 */
45 int dlg_dmq_initialize()
46 {
47         dmq_peer_t not_peer;
48
49         /* load the DMQ API */
50         if (dmq_load_api(&dlg_dmqb)!=0) {
51                 LM_ERR("cannot load dmq api\n");
52                 return -1;
53         } else {
54                 LM_DBG("loaded dmq api\n");
55         }
56
57         not_peer.callback = dlg_dmq_handle_msg;
58         not_peer.init_callback = dlg_dmq_request_sync;
59         not_peer.description.s = "dialog";
60         not_peer.description.len = 6;
61         not_peer.peer_id.s = "dialog";
62         not_peer.peer_id.len = 6;
63         dlg_dmq_peer = dlg_dmqb.register_dmq_peer(&not_peer);
64         if(!dlg_dmq_peer) {
65                 LM_ERR("error in register_dmq_peer\n");
66                 goto error;
67         } else {
68                 LM_DBG("dmq peer registered\n");
69         }
70         return 0;
71 error:
72         return -1;
73 }
74
75
76 int dlg_dmq_send(str* body, dmq_node_t* node) {
77         if (!dlg_dmq_peer) {
78                 LM_ERR("dlg_dmq_peer is null!\n");
79                 return -1;
80         }
81         if (node) {
82                 LM_DBG("sending dmq message ...\n");
83                 dlg_dmqb.send_message(dlg_dmq_peer, body, node, &dlg_dmq_resp_callback,
84                                 1, &dlg_dmq_content_type);
85         } else {
86                 LM_DBG("sending dmq broadcast...\n");
87                 dlg_dmqb.bcast_message(dlg_dmq_peer, body, 0, &dlg_dmq_resp_callback,
88                                 1, &dlg_dmq_content_type);
89         }
90         return 0;
91 }
92
93
94 /**
95 * @brief ht dmq callback
96 */
97 int dlg_dmq_handle_msg(struct sip_msg* msg, peer_reponse_t* resp, dmq_node_t* node)
98 {
99         int content_length;
100         str body;
101         dlg_cell_t *dlg;
102         int unref = 0;
103         int ret;
104         srjson_doc_t jdoc, prof_jdoc;
105         srjson_t *it = NULL;
106
107         dlg_dmq_action_t action = DLG_DMQ_NONE;
108         dlg_iuid_t iuid;
109         str profiles = {0, 0}, callid = {0, 0}, tag1 = {0,0}, tag2 = {0,0},
110                 contact1 = {0,0}, contact2 = {0,0}, k={0,0}, v={0,0};
111         str cseq1 = {0,0}, cseq2 = {0,0}, route_set1 = {0,0}, route_set2 = {0,0},
112                 from_uri = {0,0}, to_uri = {0,0}, req_uri = {0,0};
113         unsigned int init_ts = 0, start_ts = 0, lifetime = 0;
114         unsigned int state = 1;
115         srjson_t *vj;
116
117         /* received dmq message */
118         LM_DBG("dmq message received\n");
119
120         if(!msg->content_length) {
121                 LM_ERR("no content length header found\n");
122                 goto invalid2;
123         }
124         content_length = get_content_length(msg);
125         if(!content_length) {
126                 LM_DBG("content length is 0\n");
127                 goto invalid2;
128         }
129
130         body.s = get_body(msg);
131         body.len = content_length;
132
133         if (!body.s) {
134                 LM_ERR("unable to get body\n");
135                 goto error;
136         }
137
138         /* parse body */
139         LM_DBG("body: %.*s\n", body.len, body.s);
140
141         srjson_InitDoc(&jdoc, NULL);
142         jdoc.buf = body;
143
144         if(jdoc.root == NULL) {
145                 jdoc.root = srjson_Parse(&jdoc, jdoc.buf.s);
146                 if(jdoc.root == NULL)
147                 {
148                         LM_ERR("invalid json doc [[%s]]\n", jdoc.buf.s);
149                         goto invalid;
150                 }
151         }
152
153         for(it=jdoc.root->child; it; it = it->next)
154         {
155                 if ((it->string == NULL) || (strcmp(it->string, "vars")==0)) continue;
156
157                 LM_DBG("found field: %s\n", it->string);
158
159                 if (strcmp(it->string, "action")==0) {
160                         action = SRJSON_GET_UINT(it);
161                 } else if (strcmp(it->string, "h_entry")==0) {
162                         iuid.h_entry = SRJSON_GET_UINT(it);
163                 } else if (strcmp(it->string, "h_id")==0) {
164                         iuid.h_id = SRJSON_GET_UINT(it);
165                 } else if (strcmp(it->string, "init_ts")==0) {
166                         init_ts = SRJSON_GET_UINT(it);
167                 } else if (strcmp(it->string, "start_ts")==0) {
168                         start_ts = SRJSON_GET_UINT(it);
169                 } else if (strcmp(it->string, "state")==0) {
170                         state = SRJSON_GET_UINT(it);
171                 } else if (strcmp(it->string, "lifetime")==0) {
172                         lifetime = SRJSON_GET_UINT(it);
173                 } else if (strcmp(it->string, "callid")==0) {
174                         callid.s = it->valuestring;
175                         callid.len = strlen(callid.s);
176                 } else if (strcmp(it->string, "profiles")==0) {
177                         profiles.s = it->valuestring;
178                         profiles.len = strlen(profiles.s);
179                 } else if (strcmp(it->string, "tag1")==0) {
180                         tag1.s = it->valuestring;
181                         tag1.len = strlen(tag1.s);
182                 } else if (strcmp(it->string, "tag2")==0) {
183                         tag2.s = it->valuestring;
184                         tag2.len = strlen(tag2.s);
185                 } else if (strcmp(it->string, "cseq1")==0) {
186                         cseq1.s = it->valuestring;
187                         cseq1.len = strlen(cseq1.s);
188                 } else if (strcmp(it->string, "cseq2")==0) {
189                         cseq2.s = it->valuestring;
190                         cseq2.len = strlen(cseq2.s);
191                 } else if (strcmp(it->string, "route_set1")==0) {
192                         route_set1.s = it->valuestring;
193                         route_set1.len = strlen(route_set1.s);
194                 } else if (strcmp(it->string, "route_set2")==0) {
195                         route_set2.s = it->valuestring;
196                         route_set2.len = strlen(route_set2.s);
197                 } else if (strcmp(it->string, "contact1")==0) {
198                         contact1.s = it->valuestring;
199                         contact1.len = strlen(contact1.s);
200                 } else if (strcmp(it->string, "contact2")==0) {
201                         contact2.s = it->valuestring;
202                         contact2.len = strlen(contact2.s);
203                 } else if (strcmp(it->string, "from_uri")==0) {
204                         from_uri.s = it->valuestring;
205                         from_uri.len = strlen(from_uri.s);
206                 } else if (strcmp(it->string, "to_uri")==0) {
207                         to_uri.s = it->valuestring;
208                         to_uri.len = strlen(to_uri.s);
209                 } else if (strcmp(it->string, "req_uri")==0) {
210                         req_uri.s = it->valuestring;
211                         req_uri.len = strlen(req_uri.s);
212                 } else {
213                         LM_ERR("unrecognized field in json object\n");
214                 }
215         }
216
217         dlg = dlg_get_by_iuid(&iuid);
218         if (dlg) {
219                 LM_DBG("found dialog [%u:%u] at %p\n", iuid.h_entry, iuid.h_id, dlg);
220                 unref++;
221         }
222
223         switch(action) {
224                 case DLG_DMQ_UPDATE:
225                         LM_DBG("Updating dlg [%u:%u] with callid [%.*s]\n", iuid.h_entry, iuid.h_id,
226                                         callid.len, callid.s);
227                         if (!dlg) {
228                                 dlg = build_new_dlg(&callid, &from_uri, &to_uri, &tag1, &req_uri);
229                                 if (!dlg) {
230                                         LM_ERR("failed to build new dialog\n");
231                                         goto error;
232                                 }
233
234                                 if(dlg->h_entry != iuid.h_entry){
235                                         LM_ERR("inconsistent hash data from peer: "
236                                                 "make sure all Kamailio's use the same hash size\n");
237                                         shm_free(dlg);
238                                         goto error;
239                                 }
240
241                                 /* link the dialog */
242                                 link_dlg(dlg, 0, 0);
243                                 dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
244                                 /* override generated h_id */
245                                 dlg->h_id = iuid.h_id;
246                                 /* prevent DB sync */
247                                 dlg->dflags &= ~(DLG_FLAG_NEW|DLG_FLAG_CHANGED);
248                                 dlg->iflags |= DLG_IFLAG_DMQ_SYNC;
249                         } else {
250                                 /* remove existing profiles */
251                                 if (dlg->profile_links!=NULL) {
252                                         destroy_linkers(dlg->profile_links);
253                                         dlg->profile_links = NULL;
254                                 }
255                         }
256
257                         dlg->init_ts = init_ts;
258                         dlg->start_ts = start_ts;
259
260                         vj = srjson_GetObjectItem(&jdoc, jdoc.root, "vars");
261                         if(vj!=NULL) {
262                                 for(it=vj->child; it; it = it->next)
263                                 {
264                                         k.s = it->string;        k.len = strlen(k.s);
265                                         v.s = it->valuestring;   v.len = strlen(v.s);
266                                         set_dlg_variable(dlg, &k, &v);
267                                 }
268                         }
269                         /* add profiles */
270                         if(profiles.s!=NULL) {
271                                 srjson_InitDoc(&prof_jdoc, NULL);
272                                 prof_jdoc.buf = profiles;
273                                 dlg_json_to_profiles(dlg, &prof_jdoc);
274                                 srjson_DestroyDoc(&prof_jdoc);
275                         }
276                         if (state == dlg->state) {
277                                 break;
278                         }
279                         /* intentional fallthrough */
280
281                 case DLG_DMQ_STATE:
282                         if (!dlg) {
283                                 LM_ERR("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
284                                 goto error;
285                         }
286                         if (state < dlg->state) {
287                                 LM_NOTICE("Ignoring backwards state change on dlg [%u:%u]"
288                                                 " with callid [%.*s] from state [%u] to state [%u]\n",
289                                         iuid.h_entry, iuid.h_id,
290                                         dlg->callid.len, dlg->callid.s, dlg->state, state);
291                                 break;
292                         }
293                         LM_DBG("State update dlg [%u:%u] with callid [%.*s] from state [%u]"
294                                         " to state [%u]\n", iuid.h_entry, iuid.h_id,
295                                         dlg->callid.len, dlg->callid.s, dlg->state, state);
296                         switch (state) {
297                                 case DLG_STATE_EARLY:
298                                         dlg->start_ts = start_ts;
299                                         dlg->lifetime = lifetime;
300                                         dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
301                                         break;
302                                 case DLG_STATE_CONFIRMED:
303                                         dlg->start_ts = start_ts;
304                                         dlg->lifetime = lifetime;
305                                         dlg_set_leg_info(dlg, &tag1, &route_set1, &contact1, &cseq1, 0);
306                                         dlg_set_leg_info(dlg, &tag2, &route_set2, &contact2, &cseq2, 1);
307                                         if (insert_dlg_timer( &dlg->tl, dlg->lifetime ) != 0) {
308                                                 LM_CRIT("Unable to insert dlg timer %p [%u:%u]\n",
309                                                         dlg, dlg->h_entry, dlg->h_id);
310                                         } else {
311                                                 /* dialog pointer inserted in timer list */
312                                                 dlg_ref(dlg, 1);
313                                         }
314                                         break;
315                                 case DLG_STATE_DELETED:
316                                         if (dlg->state == DLG_STATE_CONFIRMED) {
317                                                 ret = remove_dialog_timer(&dlg->tl);
318                                                 if (ret == 0) {
319                                                         /* one extra unref due to removal from timer list */
320                                                         unref++;
321                                                 } else if (ret < 0) {
322                                                         LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
323                                                                 dlg, dlg->h_entry, dlg->h_id);
324                                                 }
325                                         }
326                                         /* prevent DB sync */
327                                         dlg->dflags |= DLG_FLAG_NEW;
328                                         /* keep dialog around for a bit, to prevent out-of-order
329                                          * syncs to reestablish the dlg */
330                                         dlg->init_ts = time(NULL);
331                                         break;
332                                 default:
333                                         LM_ERR("unhandled state update to state %u\n", state);
334                                         dlg_unref(dlg, unref);
335                                         goto error;
336                         }
337                         dlg->state = state;
338                         break;
339
340                 case DLG_DMQ_RM:
341                         if (!dlg) {
342                                 LM_DBG("dialog [%u:%u] not found\n", iuid.h_entry, iuid.h_id);
343                                 goto error;
344                         }
345                         LM_DBG("Removed dlg [%u:%u] with callid [%.*s] int state [%u]\n",
346                                         iuid.h_entry, iuid.h_id,
347                                         dlg->callid.len, dlg->callid.s, dlg->state);
348                         if (dlg->state==DLG_STATE_CONFIRMED
349                                         || dlg->state==DLG_STATE_EARLY) {
350                                 ret = remove_dialog_timer(&dlg->tl);
351                                 if (ret == 0) {
352                                         /* one extra unref due to removal from timer list */
353                                         unref++;
354                                 } else if (ret < 0) {
355                                         LM_CRIT("unable to unlink the timer on dlg %p [%u:%u]\n",
356                                                 dlg, dlg->h_entry, dlg->h_id);
357                                 }
358                         }
359                         /* prevent DB sync */
360                         dlg->dflags |= DLG_FLAG_NEW;
361                         unref++;
362                         break;
363
364                 case DLG_DMQ_SYNC:
365                         dmq_send_all_dlgs(0);
366                         break;
367
368                 case DLG_DMQ_NONE:
369                         break;
370         }
371         if (dlg && unref)
372                 dlg_unref(dlg, unref);
373
374         srjson_DestroyDoc(&jdoc);
375         resp->reason = dmq_200_rpl;
376         resp->resp_code = 200;
377         return 0;
378
379 invalid:
380         srjson_DestroyDoc(&jdoc);
381 invalid2:
382         resp->reason = dmq_400_rpl;
383         resp->resp_code = 400;
384         return 0;
385
386 error:
387         srjson_DestroyDoc(&jdoc);
388         resp->reason = dmq_500_rpl;
389         resp->resp_code = 500;
390         return 0;
391 }
392
393
394 int dlg_dmq_request_sync() {
395         srjson_doc_t jdoc;
396
397         LM_DBG("requesting sync from dmq peers\n");
398
399         srjson_InitDoc(&jdoc, NULL);
400
401         jdoc.root = srjson_CreateObject(&jdoc);
402         if(jdoc.root==NULL) {
403                 LM_ERR("cannot create json root\n");
404                 goto error;
405         }
406
407         srjson_AddNumberToObject(&jdoc, jdoc.root, "action", DLG_DMQ_SYNC);
408         jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
409         if(jdoc.buf.s==NULL) {
410                 LM_ERR("unable to serialize data\n");
411                 goto error;
412         }
413         jdoc.buf.len = strlen(jdoc.buf.s);
414         LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
415         if (dlg_dmq_send(&jdoc.buf, 0)!=0) {
416                 goto error;
417         }
418
419         jdoc.free_fn(jdoc.buf.s);
420         jdoc.buf.s = NULL;
421         srjson_DestroyDoc(&jdoc);
422         return 0;
423
424 error:
425         if(jdoc.buf.s!=NULL) {
426                 jdoc.free_fn(jdoc.buf.s);
427                 jdoc.buf.s = NULL;
428         }
429         srjson_DestroyDoc(&jdoc);
430         return -1;
431 }
432
433
434 int dlg_dmq_replicate_action(dlg_dmq_action_t action, dlg_cell_t* dlg,
435                 int needlock, dmq_node_t *node ) {
436
437         srjson_doc_t jdoc, prof_jdoc;
438         dlg_var_t *var;
439
440         LM_DBG("replicating action [%d] on [%u:%u] to dmq peers\n", action,
441                         dlg->h_entry, dlg->h_id);
442
443         if (action == DLG_DMQ_UPDATE) {
444                 if (!node && (dlg->iflags & DLG_IFLAG_DMQ_SYNC)
445                                 && ((dlg->dflags & DLG_FLAG_CHANGED_PROF) == 0)) {
446                         LM_DBG("dlg not changed, no sync\n");
447                         return 1;
448                 }
449         } else if ( (dlg->iflags & DLG_IFLAG_DMQ_SYNC) == 0 ) {
450                 LM_DBG("dlg not synced, no sync\n");
451                 return 1;
452         }
453         if (action == DLG_DMQ_STATE && (dlg->state != DLG_STATE_CONFIRMED
454                                 && dlg->state != DLG_STATE_DELETED
455                                 && dlg->state != DLG_STATE_EARLY)) {
456                 LM_DBG("not syncing state %u\n", dlg->state);
457                 return 1;
458         }
459
460         srjson_InitDoc(&jdoc, NULL);
461
462         jdoc.root = srjson_CreateObject(&jdoc);
463         if(jdoc.root==NULL) {
464                 LM_ERR("cannot create json root\n");
465                 goto error;
466         }
467
468         if (needlock)
469                 dlg_lock(d_table, &(d_table->entries[dlg->h_entry]));
470
471         srjson_AddNumberToObject(&jdoc, jdoc.root, "action", action);
472         srjson_AddNumberToObject(&jdoc, jdoc.root, "h_entry", dlg->h_entry);
473         srjson_AddNumberToObject(&jdoc, jdoc.root, "h_id", dlg->h_id);
474
475         switch(action) {
476                 case DLG_DMQ_UPDATE:
477                         dlg->iflags |= DLG_IFLAG_DMQ_SYNC;
478                         dlg->dflags &= ~DLG_FLAG_CHANGED_PROF;
479                         srjson_AddNumberToObject(&jdoc, jdoc.root, "init_ts",
480                                         dlg->init_ts);
481                         srjson_AddStrToObject(&jdoc, jdoc.root, "callid",
482                                         dlg->callid.s, dlg->callid.len);
483
484                         srjson_AddStrToObject(&jdoc, jdoc.root, "from_uri",
485                                         dlg->from_uri.s, dlg->from_uri.len);
486                         srjson_AddStrToObject(&jdoc, jdoc.root, "to_uri",
487                                         dlg->to_uri.s, dlg->to_uri.len);
488                         srjson_AddStrToObject(&jdoc, jdoc.root, "req_uri",
489                                         dlg->req_uri.s, dlg->req_uri.len);
490                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
491                                         dlg->tag[0].s, dlg->tag[0].len);
492                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
493                                         dlg->cseq[0].s, dlg->cseq[0].len);
494                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
495                                         dlg->route_set[0].s, dlg->route_set[0].len);
496                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
497                                         dlg->contact[0].s, dlg->contact[0].len);
498
499                         if (dlg->vars != NULL) {
500                                 srjson_t *pj = NULL;
501                                 pj = srjson_CreateObject(&jdoc);
502                 for(var=dlg->vars ; var ; var=var->next) {
503                                         srjson_AddStrToObject(&jdoc, pj, var->key.s,
504                                                         var->value.s, var->value.len);
505                 }
506                 srjson_AddItemToObject(&jdoc, jdoc.root, "vars", pj);
507                         }
508
509                         if (dlg->profile_links) {
510                                 srjson_InitDoc(&prof_jdoc, NULL);
511                                 dlg_profiles_to_json(dlg, &prof_jdoc);
512                                 if(prof_jdoc.buf.s!=NULL) {
513                                         LM_DBG("adding profiles: [%.*s]\n",
514                                                         prof_jdoc.buf.len, prof_jdoc.buf.s);
515                                         srjson_AddStrToObject(&jdoc, jdoc.root, "profiles",
516                                                         prof_jdoc.buf.s, prof_jdoc.buf.len);
517                                         prof_jdoc.free_fn(prof_jdoc.buf.s);
518                                         prof_jdoc.buf.s = NULL;
519                                 }
520                                 srjson_DestroyDoc(&prof_jdoc);
521                         }
522                         /* intentional fallthrough */
523
524                 case DLG_DMQ_STATE:
525                         srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
526                         switch (dlg->state) {
527                                 case DLG_STATE_EARLY:
528                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "start_ts",
529                                                         dlg->start_ts);
530                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "lifetime",
531                                                         dlg->lifetime);
532
533                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
534                                                         dlg->tag[0].s, dlg->tag[0].len);
535                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
536                                                         dlg->cseq[0].s, dlg->cseq[0].len);
537                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
538                                                         dlg->route_set[0].s, dlg->route_set[0].len);
539                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
540                                                         dlg->contact[0].s, dlg->contact[0].len);
541                                         break;
542                                 case DLG_STATE_CONFIRMED:
543                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "start_ts",
544                                                         dlg->start_ts);
545                                         srjson_AddNumberToObject(&jdoc, jdoc.root, "lifetime",
546                                                         dlg->lifetime);
547
548                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag1",
549                                                         dlg->tag[0].s, dlg->tag[0].len);
550                                         srjson_AddStrToObject(&jdoc, jdoc.root, "tag2",
551                                                         dlg->tag[1].s, dlg->tag[1].len);
552                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq1",
553                                                         dlg->cseq[0].s, dlg->cseq[0].len);
554                                         srjson_AddStrToObject(&jdoc, jdoc.root, "cseq2",
555                                                         dlg->cseq[1].s, dlg->cseq[1].len);
556                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set1",
557                                                         dlg->route_set[0].s, dlg->route_set[0].len);
558                                         srjson_AddStrToObject(&jdoc, jdoc.root, "route_set2",
559                                                         dlg->route_set[1].s, dlg->route_set[1].len);
560                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact1",
561                                                         dlg->contact[0].s, dlg->contact[0].len);
562                                         srjson_AddStrToObject(&jdoc, jdoc.root, "contact2",
563                                                         dlg->contact[1].s, dlg->contact[1].len);
564
565                                         break;
566                                 case DLG_STATE_DELETED:
567                                         //dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
568                                         break;
569                                 default:
570                                         LM_DBG("not syncing state %u\n", dlg->state);
571                         }
572                         break;
573
574                 case DLG_DMQ_RM:
575                         srjson_AddNumberToObject(&jdoc, jdoc.root, "state", dlg->state);
576                         dlg->iflags &= ~DLG_IFLAG_DMQ_SYNC;
577                         break;
578
579                 case DLG_DMQ_NONE:
580                 case DLG_DMQ_SYNC:
581                         break;
582         }
583         if (needlock)
584                 dlg_unlock(d_table, &(d_table->entries[dlg->h_entry]));
585
586         jdoc.buf.s = srjson_PrintUnformatted(&jdoc, jdoc.root);
587         if(jdoc.buf.s==NULL) {
588                 LM_ERR("unable to serialize data\n");
589                 goto error;
590         }
591         jdoc.buf.len = strlen(jdoc.buf.s);
592         LM_DBG("sending serialized data %.*s\n", jdoc.buf.len, jdoc.buf.s);
593         if (dlg_dmq_send(&jdoc.buf, node)!=0) {
594                 goto error;
595         }
596
597         jdoc.free_fn(jdoc.buf.s);
598         jdoc.buf.s = NULL;
599         srjson_DestroyDoc(&jdoc);
600         return 0;
601
602 error:
603         if(jdoc.buf.s!=NULL) {
604                 jdoc.free_fn(jdoc.buf.s);
605                 jdoc.buf.s = NULL;
606         }
607         srjson_DestroyDoc(&jdoc);
608         return -1;
609 }
610
611
612 int dmq_send_all_dlgs(dmq_node_t* dmq_node) {
613         int index;
614         dlg_entry_t entry;
615         dlg_cell_t *dlg;
616
617         LM_DBG("sending all dialogs \n");
618
619         for(index = 0; index< d_table->size; index++){
620                 /* lock the whole entry */
621                 entry = (d_table->entries)[index];
622                 dlg_lock( d_table, &entry);
623
624                 for(dlg = entry.first; dlg != NULL; dlg = dlg->next){
625                                 dlg->dflags |= DLG_FLAG_CHANGED_PROF;
626                                 dlg_dmq_replicate_action(DLG_DMQ_UPDATE, dlg, 0, dmq_node);
627                 }
628
629                 dlg_unlock( d_table, &entry);
630         }
631
632         return 0;
633 }
634
635
636 /**
637 * @brief dmq response callback
638 */
639 int dlg_dmq_resp_callback_f(struct sip_msg* msg, int code,
640                             dmq_node_t* node, void* param)
641 {
642         LM_DBG("dmq response callback triggered [%p %d %p]\n", msg, code, param);
643         return 0;
644 }