2 * presence module- presence server implementation
4 * Copyright (C) 2006 Voice Sistem S.R.L.
6 * This file is part of Kamailio, a free SIP server.
8 * Kamailio is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version
13 * Kamailio is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
25 * \brief Kamailio presence module :: Notification with SIP NOTIFY
33 #include <libxml/parser.h>
35 #include "../../core/trim.h"
36 #include "../../core/ut.h"
37 #include "../../core/globals.h"
38 #include "../../core/str.h"
39 #include "../../lib/srdb1/db.h"
40 #include "../../lib/srdb1/db_val.h"
41 #include "../../core/hashes.h"
42 #include "../../core/socket_info.h"
43 #include "../../modules/tm/tm_load.h"
44 #include "../pua/hash.h"
45 #include "presentity.h"
48 #include "utils_func.h"
49 #include "../../core/receive.h"
/* NOTE(review): ALLOC_SIZE is not referenced in the visible part of this file. */
51 #define ALLOC_SIZE 3000
/* Value that build_str_hdr() writes into the Max-Forwards header. */
52 #define MAX_FORWARD 70
/* NOTE(review): not referenced in the visible code; -1 presumably means
 * "unset" - confirm against the module parameters. */
54 int goto_on_notify_reply = -1;
/* Settings and state that are defined in other translation units. */
56 extern int pres_local_log_level;
57 extern int pres_local_log_facility;
58 extern subs_t *_pres_subs_last_sub;
59 extern int _pres_subs_mode;
/* Helpers for the callback parameter type; their definitions are not part
 * of this excerpt. */
61 c_back_param *shm_dup_cbparam(subs_t *);
62 void free_cbparam(c_back_param *cb_param);
/* Forward declarations. The watcher-list helpers below are called by
 * get_wi_notify_body() ahead of their definitions in this file. */
64 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps);
65 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event);
66 int add_watcher_list(subs_t *s, watcher_t *watchers);
67 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
68 str event, int STATE_FLAG);
69 void free_watcher_list(watcher_t *watchers);
/*
 * Database column names, each wrapped as a str via str_init().
 * The functions in this file place pointers to these objects in the
 * query_cols[] / result_cols[] arrays handed to pa_dbf.query().
 * NOTE(review): not every name listed here is referenced in the visible
 * part of the file.
 */
71 str str_to_user_col = str_init("to_user");
72 str str_username_col = str_init("username");
73 str str_domain_col = str_init("domain");
74 str str_body_col = str_init("body");
75 str str_to_domain_col = str_init("to_domain");
76 str str_from_user_col = str_init("from_user");
77 str str_from_domain_col = str_init("from_domain");
78 str str_watcher_username_col = str_init("watcher_username");
79 str str_watcher_domain_col = str_init("watcher_domain");
80 str str_event_id_col = str_init("event_id");
81 str str_event_col = str_init("event");
82 str str_etag_col = str_init("etag");
83 str str_ruid_col = str_init("ruid");
84 str str_from_tag_col = str_init("from_tag");
85 str str_to_tag_col = str_init("to_tag");
86 str str_callid_col = str_init("callid");
87 str str_local_cseq_col = str_init("local_cseq");
88 str str_remote_cseq_col = str_init("remote_cseq");
89 str str_record_route_col = str_init("record_route");
90 str str_contact_col = str_init("contact");
91 str str_expires_col = str_init("expires");
92 str str_status_col = str_init("status");
93 str str_reason_col = str_init("reason");
94 str str_socket_info_col = str_init("socket_info");
95 str str_local_contact_col = str_init("local_contact");
96 str str_version_col = str_init("version");
97 str str_presentity_uri_col = str_init("presentity_uri");
98 str str_inserted_time_col = str_init("inserted_time");
99 str str_received_time_col = str_init("received_time");
100 str str_id_col = str_init("id");
101 str str_sender_col = str_init("sender");
102 str str_updated_col = str_init("updated");
103 str str_updated_winfo_col = str_init("updated_winfo");
104 str str_priority_col = str_init("priority");
105 str str_flags_col = str_init("flags");
106 str str_user_agent_col = str_init("user_agent");
/*
 * Map a subscription status flag to a C string.
 * printf_subs() prints the result with %s and build_str_hdr() measures it
 * with strlen(); build_str_hdr() also treats a NULL result as a bad status.
 * NOTE(review): only the TERMINATED_STATUS case label is visible in this
 * excerpt; the other cases and the returned strings must be checked in the
 * full file.
 */
110 char *get_status_str(int status_flag)
112 switch(status_flag) {
117 case TERMINATED_STATUS:
/*
 * Write the fields of a subscription to the log with LM_DBG(), one group of
 * related fields per message.
 *
 * subs - subscription to dump; subs->event is dereferenced, so it has to be
 *        set by the caller.
 */
125 void printf_subs(subs_t *subs)
/* identity of the presentity and of the watcher */
127 LM_DBG("pres_uri: %.*s\n", subs->pres_uri.len, subs->pres_uri.s);
128 LM_DBG("watcher_user@watcher_domain: %.*s@%.*s\n", subs->watcher_user.len,
129 subs->watcher_user.s, subs->watcher_domain.len,
130 subs->watcher_domain.s);
131 LM_DBG("to_user@to_domain: %.*s@%.*s\n", subs->to_user.len, subs->to_user.s,
132 subs->to_domain.len, subs->to_domain.s);
133 LM_DBG("from_user@from_domain: %.*s@%.*s\n", subs->from_user.len,
134 subs->from_user.s, subs->from_domain.len, subs->from_domain.s);
/* dialog identifiers and sequence numbers */
135 LM_DBG("callid/from_tag/to_tag: %.*s/%.*s/%.*s\n", subs->callid.len,
136 subs->callid.s, subs->from_tag.len, subs->from_tag.s,
137 subs->to_tag.len, subs->to_tag.s);
138 LM_DBG("local_cseq/remote_cseq: %u/%u\n", subs->local_cseq,
/* routing information */
140 LM_DBG("local_contact/contact: %.*s/%.*s\n", subs->local_contact.len,
141 subs->local_contact.s, subs->contact.len, subs->contact.s);
142 LM_DBG("record_route: %.*s\n", subs->record_route.len,
143 subs->record_route.s);
144 LM_DBG("sockinfo_str: %.*s\n", subs->sockinfo_str.len,
145 subs->sockinfo_str.s);
/* event and subscription state */
147 LM_DBG("event: %.*s\n", subs->event->name.len, subs->event->name.s);
148 LM_DBG("status: %s\n", get_status_str(subs->status));
149 LM_DBG("reason: %.*s\n", subs->reason.len, subs->reason.s);
150 LM_DBG("version: %u\n", subs->version);
151 LM_DBG("expires: %u\n", subs->expires);
153 LM_DBG("updated/updated_winfo: %d/%d\n", subs->updated,
154 subs->updated_winfo);
/*
 * Build the header text for a NOTIFY in a buffer obtained from pkg_malloc().
 *
 * The visible code writes, in this order: Max-Forwards, Event (with an
 * optional ;id= parameter), Contact (with an optional ;transport=
 * parameter), Subscription-State (with ;reason= or ;expires=) and
 * Content-Type.
 *
 * subs    - subscription supplying event, contact, status, reason, expires
 * is_body - when non-zero, the size estimate reserves room for the
 *           Content-Type header
 * hdr     - output; hdr->s receives the buffer and hdr->len the number of
 *           bytes written (p - hdr->s)
 *
 * NOTE(review): the return statements, the parameter check and several
 * branch lines are not visible in this excerpt; confirm the return
 * convention in the full file.
 */
157 int build_str_hdr(subs_t *subs, int is_body, str *hdr)
159 pres_ev_t *event = subs->event;
160 str expires = {0, 0};
/* token searched for in local_contact before a transport is appended */
163 str trans = {";transport=", 11};
166 LM_ERR("bad parameter\n");
/* decimal text of subs->expires, used for the ;expires= parameter */
169 expires.s = int2str(subs->expires, &expires.len);
171 status.s = get_status_str(subs->status);
172 if(status.s == NULL) {
173 LM_ERR("bad status %d\n", subs->status);
176 status.len = strlen(status.s);
/* size estimate for the buffer: fixed header names plus the variable
 * parts; only the longer of reason / expires is counted because only one
 * of the two parameters is written */
179 18 /*Max-Forwards: + val*/ + CRLF_LEN
180 + 7 /*Event: */ + subs->event->name.len
181 + 4 /*;id=*/ + subs->event_id.len + CRLF_LEN
182 + 10 /*Contact: <*/ + subs->local_contact.len
183 + 1 /*>*/ + 15 /*";transport=xxxx"*/ + CRLF_LEN
184 + 20 /*Subscription-State: */ + status.len
185 + 10 /*reason/expires params*/
186 + (subs->reason.len > expires.len ? subs->reason.len : expires.len)
188 + (is_body ? (14 /*Content-Type: */ + subs->event->content_type.len
193 hdr->s = (char *)pkg_malloc(hdr->len);
195 LM_ERR("no more pkg memory\n");
200 p += sprintf(p, "Max-Forwards: %d\r\n", MAX_FORWARD);
202 p += sprintf(p, "Event: %.*s", event->name.len, event->name.s);
/* the ;id= parameter is written only for a non-empty event id */
203 if(subs->event_id.len && subs->event_id.s) {
204 p += sprintf(p, ";id=%.*s", subs->event_id.len, subs->event_id.s);
206 memcpy(p, CRLF, CRLF_LEN);
209 p += sprintf(p, "Contact: <%.*s", subs->local_contact.len,
210 subs->local_contact.s);
/* A transport parameter is appended only when socket info is available
 * and str_search() returns 0 for ";transport=" in local_contact
 * (presumably meaning "not found" - confirm). The protocol is selected
 * from the first characters of sockinfo_str. */
211 if(subs->sockinfo_str.s != NULL
212 && str_search(&subs->local_contact, &trans) == 0) {
214 switch(subs->sockinfo_str.s[0]) {
217 memcpy(p, ";transport=sctp", 15);
222 switch(subs->sockinfo_str.s[1]) {
225 memcpy(p, ";transport=tcp", 14);
230 memcpy(p, ";transport=tls", 14);
239 memcpy(p, CRLF, CRLF_LEN);
242 p += sprintf(p, "Subscription-State: %.*s", status.len, status.s);
/* a terminated subscription carries ;reason=, the other visible branch
 * writes ;expires= (the line introducing that branch is not visible) */
244 if(subs->status == TERMINATED_STATUS) {
245 LM_DBG("state = terminated\n");
246 p += sprintf(p, ";reason=%.*s", subs->reason.len, subs->reason.s);
248 p += sprintf(p, ";expires=%.*s", expires.len, expires.s);
250 memcpy(p, CRLF, CRLF_LEN);
254 p += sprintf(p, "Content-Type: %.*s\r\n", event->content_type.len,
255 event->content_type.s);
/* replace the estimate with the number of bytes actually written */
259 hdr->len = p - hdr->s;
/*
 * Read watcher records from the active_watchers table and add them to a
 * watcher list.
 *
 * The query selects rows whose presentity_uri equals subs->pres_uri, whose
 * event equals the name of subs->event->wipeer, and whose expires value is
 * greater than the current time plus pres_expires_offset. For every row a
 * temporary subscription is filled and passed to add_watcher_list().
 *
 * subs     - subscription supplying pres_uri and event->wipeer
 * watchers - list head that receives the new entries
 *
 * get_wi_notify_body() treats a negative return as failure.
 * NOTE(review): the return statements and the condition guarding the
 * "returned no result" branch are not visible in this excerpt.
 */
264 int get_wi_subs_db(subs_t *subs, watcher_t *watchers)
267 db_key_t query_cols[3];
268 db_op_t query_ops[3];
269 db_val_t query_vals[3];
270 db_key_t result_cols[5];
271 db1_res_t *result = NULL;
272 db_row_t *row = NULL;
273 db_val_t *row_vals = NULL;
274 int n_result_cols = 0;
275 int n_query_cols = 0;
277 int status_col, watcher_user_col, watcher_domain_col, callid_col;
/* condition 1: presentity_uri == subs->pres_uri */
279 query_cols[n_query_cols] = &str_presentity_uri_col;
280 query_ops[n_query_cols] = OP_EQ;
281 query_vals[n_query_cols].type = DB1_STR;
282 query_vals[n_query_cols].nul = 0;
283 query_vals[n_query_cols].val.str_val = subs->pres_uri;
/* condition 2: event == name of the peer event */
286 query_cols[n_query_cols] = &str_event_col;
287 query_ops[n_query_cols] = OP_EQ;
288 query_vals[n_query_cols].type = DB1_STR;
289 query_vals[n_query_cols].nul = 0;
290 query_vals[n_query_cols].val.str_val = subs->event->wipeer->name;
/* condition 3: expires > now + pres_expires_offset */
293 query_cols[n_query_cols] = &str_expires_col;
294 query_ops[n_query_cols] = OP_GT;
295 query_vals[n_query_cols].type = DB1_INT;
296 query_vals[n_query_cols].nul = 0;
297 query_vals[n_query_cols].val.int_val = (int)time(NULL) + pres_expires_offset;
/* requested columns; each *_col variable remembers its index in the row */
300 result_cols[status_col = n_result_cols++] = &str_status_col;
301 result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
302 result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
303 result_cols[callid_col = n_result_cols++] = &str_callid_col;
305 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
306 LM_ERR("in use_table\n");
310 if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
311 n_query_cols, n_result_cols, 0, &result)
313 LM_ERR("querying active_watchers db table\n");
322 LM_DBG("The query in db table for active subscription"
323 " returned no result\n");
324 pa_dbf.free_result(pa_db, result);
/* turn each row into a watcher entry; sb points into the result set, and
 * add_watcher_list() is the one that makes copies */
328 for(i = 0; i < result->n; i++) {
329 row = &result->rows[i];
330 row_vals = ROW_VALUES(row);
332 sb.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
333 sb.watcher_user.len = strlen(sb.watcher_user.s);
335 sb.watcher_domain.s =
336 (char *)row_vals[watcher_domain_col].val.string_val;
337 sb.watcher_domain.len = strlen(sb.watcher_domain.s);
339 sb.callid.s = (char *)row_vals[callid_col].val.string_val;
340 sb.callid.len = strlen(sb.callid.s);
342 sb.event = subs->event->wipeer;
343 sb.status = row_vals[status_col].val.int_val;
345 if(add_watcher_list(&sb, watchers) < 0)
349 pa_dbf.free_result(pa_db, result);
354 pa_dbf.free_result(pa_db, result);
/*
 * Build the watcher-info body for a NOTIFY.
 *
 * A list of watchers is collected and handed to create_winfo_xml() together
 * with the subscription version, the presentity URI and the peer event name.
 *
 * subs         - the watcher-info subscription being notified
 * watcher_subs - when not NULL, this single subscription is added to the
 *                list and the document state becomes PARTIAL_STATE_FLAG;
 *                the initial state is FULL_STATE_FLAG
 *
 * Returns the body produced by create_winfo_xml(); send_notify_request()
 * checks the result against NULL.
 * NOTE(review): jump statements, labels and some loop lines are not visible
 * in this excerpt, so the exact flow between the sections below should be
 * confirmed in the full file.
 */
358 str *get_wi_notify_body(subs_t *subs, subs_t *watcher_subs)
360 str *notify_body = NULL;
362 watcher_t *watchers = NULL;
364 unsigned int hash_code;
366 int state = FULL_STATE_FLAG;
367 unsigned int now = (int)time(NULL);
370 version_str = int2str(subs->version, &len);
371 if(version_str == NULL) {
372 LM_ERR("converting int to str\n ");
/* zeroed list head; add_watcher_list() links entries behind it */
376 watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
377 if(watchers == NULL) {
378 ERR_MEM(PKG_MEM_STR);
380 memset(watchers, 0, sizeof(watcher_t));
382 if(watcher_subs != NULL) {
383 if(add_watcher_list(watcher_subs, watchers) < 0)
385 state = PARTIAL_STATE_FLAG;
/* in DB_ONLY mode the watchers come from the database */
390 if(pres_subs_dbmode == DB_ONLY) {
391 if(get_wi_subs_db(subs, watchers) < 0) {
392 LM_ERR("getting watchers from database\n");
/* the visible code below walks the subscription hash table slot for
 * (pres_uri, peer event name) while holding that slot's lock */
396 hash_code = core_case_hash(
397 &subs->pres_uri, &subs->event->wipeer->name, shtable_size);
398 lock_get(&subs_htable[hash_code].lock);
399 s = subs_htable[hash_code].entries;
403 if(s->expires < now) {
404 LM_DBG("expired record\n");
408 if(s->event == subs->event->wipeer
409 && s->pres_uri.len == subs->pres_uri.len
410 && presence_sip_uri_match(&s->pres_uri, &subs->pres_uri)
/* the lock is released before leaving on a failed insertion */
412 if(add_watcher_list(s, watchers) < 0) {
413 lock_release(&subs_htable[hash_code].lock);
418 lock_release(&subs_htable[hash_code].lock);
420 if(add_waiting_watchers(
421 watchers, subs->pres_uri, subs->event->wipeer->name)
423 LM_ERR("failed to add waiting watchers\n");
429 notify_body = create_winfo_xml(watchers, version_str, subs->pres_uri,
430 subs->event->wipeer->name, state);
431 if(notify_body == NULL) {
432 LM_ERR("in function create_winfo_xml\n");
/* the watcher list is released on both visible exits */
435 free_watcher_list(watchers);
439 free_watcher_list(watchers);
/*
 * Release a watcher list; called by get_wi_notify_body() when it is done
 * with the list.
 * NOTE(review): nearly all of the body is missing from this excerpt; the
 * one visible statement advances along the ->next links.
 */
443 void free_watcher_list(watcher_t *watchers)
452 watchers = watchers->next;
/*
 * Create a watcher entry from a subscription and link it into a list.
 *
 * The entry is allocated with pkg_malloc(); it receives the subscription
 * status, a URI built by uandd_to_uri() from watcher_user and
 * watcher_domain, and a NUL-terminated copy of the Call-ID as its id.
 *
 * s        - subscription to take the data from
 * watchers - list head; the new entry takes over watchers->next
 *
 * Callers treat a negative return as failure.
 * NOTE(review): the return statements, cleanup on failure and the statement
 * that stores the new entry in the head are not visible in this excerpt.
 */
459 int add_watcher_list(subs_t *s, watcher_t *watchers)
463 w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
465 LM_ERR("No more private memory\n");
468 w->status = s->status;
469 if(uandd_to_uri(s->watcher_user, s->watcher_domain, &w->uri) < 0) {
470 LM_ERR("failed to create uri\n");
/* one extra byte so the id can be used as a C string */
473 w->id.s = (char *)pkg_malloc(s->callid.len + 1);
474 if(w->id.s == NULL) {
475 LM_ERR("no more memory\n");
478 memcpy(w->id.s, s->callid.s, s->callid.len);
479 w->id.len = s->callid.len;
480 w->id.s[w->id.len] = '\0';
482 w->next = watchers->next;
/*
 * Build an XML document that contains only a root "dialog-info" element.
 *
 * The root element gets the attributes xmlns
 * ("urn:ietf:params:xml:ns:dialog-info"), version ("1"), state ("full") and
 * entity (the presentity URI). The document is serialized with
 * xmlDocDumpFormatMemory() and the text is copied into pkg memory.
 *
 * pres_uri - presentity URI used as the value of the entity attribute
 *
 * Returns a str allocated with pkg_malloc().
 * NOTE(review): the error exits, the release of the libxml2 objects and of
 * the temporary entity buffer, and the assignment of body->len are not
 * visible in this excerpt.
 */
496 str *build_empty_bla_body(str pres_uri)
506 doc = xmlNewDoc(BAD_CAST "1.0");
508 LM_ERR("failed to construct xml document\n");
512 node = xmlNewNode(NULL, BAD_CAST "dialog-info");
514 LM_ERR("failed to initialize node\n");
517 xmlDocSetRootElement(doc, node);
519 attr = xmlNewProp(node, BAD_CAST "xmlns",
520 BAD_CAST "urn:ietf:params:xml:ns:dialog-info");
522 LM_ERR("failed to initialize node attribute\n");
525 attr = xmlNewProp(node, BAD_CAST "version", BAD_CAST "1");
527 LM_ERR("failed to initialize node attribute\n");
531 attr = xmlNewProp(node, BAD_CAST "state", BAD_CAST "full");
533 LM_ERR("failed to initialize node attribute\n");
/* xmlNewProp() is given a C string, so a NUL-terminated copy of pres_uri
 * is made first */
537 entity = (char *)pkg_malloc(pres_uri.len + 1);
539 LM_ERR("no more memory\n");
542 memcpy(entity, pres_uri.s, pres_uri.len);
543 entity[pres_uri.len] = '\0';
545 attr = xmlNewProp(node, BAD_CAST "entity", BAD_CAST entity);
547 LM_ERR("failed to initialize node attribute\n");
552 body = (str *)pkg_malloc(sizeof(str));
554 LM_ERR("no more private memory");
/* serialize the document, then copy len bytes of the text into pkg
 * memory (no terminating NUL is copied) */
559 xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&text, &len, 1);
560 body->s = (char *)pkg_malloc(len);
561 if(body->s == NULL) {
562 LM_ERR("no more private memory");
567 memcpy(body->s, text, len);
/*
 * Build the NOTIFY body for a presentity from the rows stored in the
 * presentity table.
 *
 * pres_uri - presentity URI; parsed with parse_uri() to obtain the user and
 *            host parts used as query keys
 * event    - event descriptor; when event->agg_nbody is set, the stored
 *            bodies are collected in an array and passed to it, otherwise
 *            the body of one row is copied
 * etag     - compared against the etag column of each row in the
 *            aggregation path
 * contact  - for dialog/SLA events, compared against the stored sender; on
 *            a match an empty dialog-info document is produced instead
 *
 * Returns a str body; callers check the result against NULL.
 * NOTE(review): many lines of this function (returns, labels, else
 * introducers, some conditions and assignments) are not visible in this
 * excerpt. Comments below describe only what the visible lines show.
 */
582 str *get_p_notify_body(str pres_uri, pres_ev_t *event, str *etag, str *contact)
584 db_key_t query_cols[4];
585 db_val_t query_vals[4];
586 db_op_t query_ops[4];
587 db_key_t result_cols[3];
588 db1_res_t *result = NULL;
589 int body_col, etag_col = 0, sender_col;
590 str **body_array = NULL;
591 str *notify_body = NULL;
592 db_row_t *row = NULL;
594 int n_result_cols = 0;
595 int n_query_cols = 0;
597 int build_off_n = -1;
602 unsigned int hash_code;
604 static str query_str;
606 if(parse_uri(pres_uri.s, pres_uri.len, &uri) < 0) {
607 LM_ERR("while parsing uri\n");
611 /* in hybrid cache mode, look the presentity up in the hash table before querying the database */
612 if(publ_cache_mode == PS_PCACHE_HYBRID) {
613 /* search in hash table if any record exists */
614 hash_code = core_case_hash(&pres_uri, NULL, phtable_size);
615 if(search_phtable(&pres_uri, event->evp->type, hash_code) == NULL) {
616 LM_DBG("No record exists in hash_table\n");
618 /* for pidf manipulation */
619 if(event->agg_nbody) {
621 event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
/* query conditions: domain == host, username == user, event == name */
629 query_cols[n_query_cols] = &str_domain_col;
630 query_vals[n_query_cols].type = DB1_STR;
631 query_vals[n_query_cols].nul = 0;
632 query_vals[n_query_cols].val.str_val = uri.host;
633 query_ops[n_query_cols] = OP_EQ;
636 query_cols[n_query_cols] = &str_username_col;
637 query_vals[n_query_cols].type = DB1_STR;
638 query_vals[n_query_cols].nul = 0;
639 query_vals[n_query_cols].val.str_val = uri.user;
640 query_ops[n_query_cols] = OP_EQ;
643 query_cols[n_query_cols] = &str_event_col;
644 query_vals[n_query_cols].type = DB1_STR;
645 query_vals[n_query_cols].nul = 0;
646 query_vals[n_query_cols].val.str_val = event->name;
647 query_ops[n_query_cols] = OP_EQ;
/* with pres_startup_mode == 1 a fourth condition is added:
 * expires > current time */
650 if(pres_startup_mode == 1) {
651 query_cols[n_query_cols] = &str_expires_col;
652 query_vals[n_query_cols].type = DB1_INT;
653 query_vals[n_query_cols].nul = 0;
654 query_vals[n_query_cols].val.int_val = (int)time(NULL);
655 query_ops[n_query_cols] = OP_GT;
/* requested columns; each *_col variable remembers its index in the row */
659 result_cols[body_col = n_result_cols++] = &str_body_col;
660 result_cols[etag_col = n_result_cols++] = &str_etag_col;
661 result_cols[sender_col = n_result_cols++] = &str_sender_col;
663 if(pa_dbf.use_table(pa_db, &presentity_table) < 0) {
664 LM_ERR("in use_table\n");
/* query_str is handed to pa_dbf.query() as the ordering argument: either
 * the configured pres_retrieve_order_by or the received_time column */
668 if(pres_retrieve_order == 1) {
669 query_str = pres_retrieve_order_by;
671 query_str = str_received_time_col;
/* two query forms: the first passes the operator array, the second
 * passes 0 in its place */
673 if(pres_startup_mode == 1) {
674 if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
675 n_query_cols, n_result_cols, &query_str, &result)
677 LM_ERR("failed to query %.*s table\n", presentity_table.len,
680 pa_dbf.free_result(pa_db, result);
684 if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols,
685 n_query_cols, n_result_cols, &query_str, &result)
687 LM_ERR("failed to query %.*s table\n", presentity_table.len,
690 pa_dbf.free_result(pa_db, result);
/* empty result: for aggregating events agg_nbody() is still invoked, with
 * no bodies */
699 LM_DBG("The query returned no result\n[username]= %.*s"
700 "\t[domain]= %.*s\t[event]= %.*s\n",
701 uri.user.len, uri.user.s, uri.host.len, uri.host.s,
702 event->name.len, event->name.s);
704 pa_dbf.free_result(pa_db, result);
707 if(event->agg_nbody) {
708 notify_body = event->agg_nbody(&uri.user, &uri.host, NULL, 0, -1);
/* no aggregation: the row at index n - 1 is used (n is presumably the
 * number of rows - its assignment is not visible here) */
715 if(event->agg_nbody == NULL) {
716 LM_DBG("Event does not require aggregation\n");
717 row = &result->rows[n - 1];
718 row_vals = ROW_VALUES(row);
720 /* if event BLA - check if sender is the same as contact */
721 /* if so, send an empty dialog info document */
722 if(EVENT_DIALOG_SLA(event->evp) && contact) {
723 sender.s = (char *)row_vals[sender_col].val.string_val;
724 if(sender.s == NULL || strlen(sender.s) == 0)
725 goto after_sender_check;
726 sender.len = strlen(sender.s);
728 if(sender.len == contact->len
729 && presence_sip_uri_match(&sender, contact) == 0) {
730 notify_body = build_empty_bla_body(pres_uri);
731 pa_dbf.free_result(pa_db, result);
737 if(row_vals[body_col].val.string_val == NULL) {
738 LM_ERR("NULL notify body record\n");
741 len = strlen(row_vals[body_col].val.string_val);
743 LM_ERR("Empty notify body record\n");
/* copy the stored body into a str whose descriptor and data are two
 * separate pkg allocations */
746 notify_body = (str *)pkg_malloc(sizeof(str));
747 if(notify_body == NULL) {
748 ERR_MEM(PKG_MEM_STR);
750 memset(notify_body, 0, sizeof(str));
751 notify_body->s = (char *)pkg_malloc(len * sizeof(char));
752 if(notify_body->s == NULL) {
753 pkg_free(notify_body);
754 ERR_MEM(PKG_MEM_STR);
756 memcpy(notify_body->s, row_vals[body_col].val.string_val, len);
757 notify_body->len = len;
758 pa_dbf.free_result(pa_db, result);
763 LM_DBG("Event requires aggregation\n");
/* array of body pointers, zero-filled, with two spare slots beyond n */
765 body_array = (str **)pkg_malloc((n + 2) * sizeof(str *));
766 if(body_array == NULL) {
767 ERR_MEM(PKG_MEM_STR);
769 memset(body_array, 0, (n + 2) * sizeof(str *));
/* first loop: each row's etag is compared with the requested one while
 * the bodies are collected (what happens on "found etag" is not visible
 * in this excerpt) */
772 LM_DBG("searched etag = %.*s len= %d\n", etag->len, etag->s,
774 LM_DBG("etag not NULL\n");
775 for(i = 0; i < n; i++) {
776 row = &result->rows[i];
777 row_vals = ROW_VALUES(row);
778 etags.s = (char *)row_vals[etag_col].val.string_val;
779 etags.len = strlen(etags.s);
781 LM_DBG("etag = %.*s len= %d\n", etags.len, etags.s, etags.len);
782 if((etags.len == etag->len)
783 && (strncmp(etags.s, etag->s, etags.len) == 0)) {
784 LM_DBG("found etag\n");
787 len = strlen((char *)row_vals[body_col].val.string_val);
789 LM_ERR("Empty notify body record\n");
/* one allocation holds the str descriptor plus len bytes of data.
 * NOTE(review): as visible here, size still equals the full allocation
 * size when body->s is computed; original line 799 is absent from this
 * excerpt, so verify in the full file that size is reset before use. */
793 size = sizeof(str) + len * sizeof(char);
794 body = (str *)pkg_malloc(size);
796 ERR_MEM(PKG_MEM_STR);
798 memset(body, 0, size);
800 body->s = (char *)body + size;
801 memcpy(body->s, (char *)row_vals[body_col].val.string_val, len);
804 body_array[i] = body;
/* second loop: collects every body without any etag comparison */
807 for(i = 0; i < n; i++) {
808 row = &result->rows[i];
809 row_vals = ROW_VALUES(row);
811 len = strlen((char *)row_vals[body_col].val.string_val);
813 LM_ERR("Empty notify body record\n");
/* NOTE(review): same layout as in the first loop; original line 823 is
 * absent here, so the same check of size applies */
817 size = sizeof(str) + len * sizeof(char);
818 body = (str *)pkg_malloc(size);
820 ERR_MEM(PKG_MEM_STR);
822 memset(body, 0, size);
824 body->s = (char *)body + size;
825 memcpy(body->s, row_vals[body_col].val.string_val, len);
828 body_array[i] = body;
/* the result set is released before the aggregation callback runs */
831 pa_dbf.free_result(pa_db, result);
834 notify_body = event->agg_nbody(
835 &uri.user, &uri.host, body_array, n, build_off_n);
/* release the collected bodies and the array itself */
839 if(body_array != NULL) {
840 for(i = 0; i < n; i++) {
842 pkg_free(body_array[i]);
844 pkg_free(body_array);
/* second cleanup sequence: result set, collected bodies and the array
 * (the label introducing it is not visible in this excerpt) */
850 pa_dbf.free_result(pa_db, result);
852 if(body_array != NULL) {
853 for(i = 0; i < n; i++) {
855 pkg_free(body_array[i]);
860 pkg_free(body_array);
/*
 * Release a NOTIFY body.
 *
 * When body->s is set, the way it is released depends on the event: the
 * visible branches distinguish events with the WINFO_TYPE flag, events that
 * have neither agg_nbody nor apply_auth_nbody, and the remaining case, which
 * calls ev->free_body().
 *
 * body - body to release
 * ev   - event the body belongs to
 *
 * NOTE(review): the statements of the first two branches, any NULL check on
 * body and the release of the str itself are not visible in this excerpt.
 */
865 void free_notify_body(str *body, pres_ev_t *ev)
868 if(body->s != NULL) {
869 if(ev->type & WINFO_TYPE)
871 else if(ev->agg_nbody == NULL && ev->apply_auth_nbody == NULL)
874 ev->free_body(body->s);
/*
 * Release what a dlg_t owns: the local URI buffer, the remote URI buffer
 * and the route set.
 * NOTE(review): the conditions guarding each release, the release of td
 * itself and the return value are not visible in this excerpt.
 */
880 static int ps_free_tm_dlg(dlg_t *td)
884 pkg_free(td->loc_uri.s);
886 pkg_free(td->rem_uri.s);
889 free_rr(&td->route_set);
/*
 * Build a dlg_t in pkg memory from a subscription.
 *
 * Filled from subs: local CSeq, Call-ID, remote tag (from_tag), local tag
 * (to_tag), local URI (to_user/to_domain), remote URI
 * (from_user/from_domain), remote target, route set (parsed from
 * record_route) and, when sockinfo_str is set, the sending socket.
 * The Call-ID, tags and contact are str values copied by assignment, so
 * they share the subscription's buffers; the two URIs are created by
 * uandd_to_uri().
 *
 * subs - subscription describing the dialog
 *
 * NOTE(review): the return statements and error cleanup are not visible in
 * this excerpt.
 */
895 dlg_t *ps_build_dlg_t(subs_t *subs)
898 int found_contact = 1;
900 td = (dlg_t *)pkg_malloc(sizeof(dlg_t));
902 ERR_MEM(PKG_MEM_STR);
904 memset(td, 0, sizeof(dlg_t));
906 td->loc_seq.value = subs->local_cseq;
907 td->loc_seq.is_set = 1;
909 td->id.call_id = subs->callid;
910 td->id.rem_tag = subs->from_tag;
911 td->id.loc_tag = subs->to_tag;
/* uandd_to_uri() failure is detected through a NULL loc_uri.s */
913 uandd_to_uri(subs->to_user, subs->to_domain, &td->loc_uri);
914 if(td->loc_uri.s == NULL) {
915 LM_ERR("while creating uri\n");
/* an empty contact is handled separately (the statement of this branch
 * is not visible; presumably it clears found_contact - confirm) */
919 if(subs->contact.len == 0 || subs->contact.s == NULL) {
922 LM_DBG("CONTACT = %.*s\n", subs->contact.len, subs->contact.s);
923 td->rem_target = subs->contact;
926 uandd_to_uri(subs->from_user, subs->from_domain, &td->rem_uri);
927 if(td->rem_uri.s == NULL) {
928 LM_ERR("while creating uri\n");
/* without a contact the remote URI doubles as the remote target */
932 if(found_contact == 0) {
933 td->rem_target = td->rem_uri;
935 if(subs->record_route.s && subs->record_route.len) {
937 subs->record_route.s, subs->record_route.len, &td->route_set)
939 LM_ERR("in function parse_rr_body\n");
943 td->state = DLG_CONFIRMED;
/* resolve the stored socket string to a listening socket:
 * as_asciiz() makes a C string, parse_phostport() splits it and
 * grep_sock_info() looks the socket up */
945 if(subs->sockinfo_str.len) {
949 if((tmp = as_asciiz(&subs->sockinfo_str)) == NULL) {
950 LM_ERR("no pkg memory left\n");
953 if(parse_phostport(tmp, &host.s, &host.len, &port, &proto)) {
954 LM_ERR("bad sockinfo string\n");
958 td->send_sock = grep_sock_info(
959 &host, (unsigned short)port, (unsigned short)proto);
/*
 * Load the subscriptions for a presentity and event from the
 * active_watchers table (called as get_subs_db() by get_subs_dialog()).
 *
 * The query selects rows with presentity_uri == *pres_uri,
 * event == event->name, status == ACTIVE_STATUS and contact != the sender
 * (or != "" when the other visible branch is taken). Each row is turned
 * into a temporary subs_t, copied with mem_copy_subs() into pkg memory and
 * linked in front of *s_array.
 *
 * pres_uri - presentity URI to match
 * event    - event whose name is matched
 * sender   - contact that must not be notified
 * s_array  - in/out list head receiving the copies
 * n        - output parameter; its assignment is not visible in this excerpt
 *
 * get_subs_dialog() treats a negative return as failure.
 * NOTE(review): the line carrying the function name and return type, the
 * return statements and several branch lines are not visible here.
 */
971 str *pres_uri, pres_ev_t *event, str *sender, subs_t **s_array, int *n)
973 db_key_t query_cols[7];
974 db_op_t query_ops[7];
975 db_val_t query_vals[7];
976 db_key_t result_cols[21];
977 int n_result_cols = 0, n_query_cols = 0;
980 db1_res_t *result = NULL;
981 int from_user_col, from_domain_col, from_tag_col;
982 int to_user_col, to_domain_col, to_tag_col;
983 int expires_col = 0, callid_col, cseq_col, i, reason_col;
984 int version_col = 0, record_route_col = 0, contact_col = 0;
985 int sockinfo_col = 0, local_contact_col = 0, event_id_col = 0;
986 int watcher_user_col = 0, watcher_domain_col = 0;
987 int flags_col = 0, user_agent_col = 0;
991 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
992 LM_ERR("in use_table\n");
996 LM_DBG("querying database table = active_watchers\n");
/* condition 1: presentity_uri == *pres_uri */
997 query_cols[n_query_cols] = &str_presentity_uri_col;
998 query_ops[n_query_cols] = OP_EQ;
999 query_vals[n_query_cols].type = DB1_STR;
1000 query_vals[n_query_cols].nul = 0;
1001 query_vals[n_query_cols].val.str_val = *pres_uri;
/* condition 2: event == event->name */
1004 query_cols[n_query_cols] = &str_event_col;
1005 query_ops[n_query_cols] = OP_EQ;
1006 query_vals[n_query_cols].type = DB1_STR;
1007 query_vals[n_query_cols].nul = 0;
1008 query_vals[n_query_cols].val.str_val = event->name;
/* condition 3: status == ACTIVE_STATUS */
1011 query_cols[n_query_cols] = &str_status_col;
1012 query_ops[n_query_cols] = OP_EQ;
1013 query_vals[n_query_cols].type = DB1_INT;
1014 query_vals[n_query_cols].nul = 0;
1015 query_vals[n_query_cols].val.int_val = ACTIVE_STATUS;
/* condition 4: contact != sender, or != "" in the other visible branch
 * (the test choosing between the two is not visible in this excerpt) */
1018 query_cols[n_query_cols] = &str_contact_col;
1019 query_ops[n_query_cols] = OP_NEQ;
1020 query_vals[n_query_cols].type = DB1_STR;
1021 query_vals[n_query_cols].nul = 0;
1023 LM_DBG("Do not send Notify to:[uri]= %.*s\n", sender->len, sender->s);
1024 query_vals[n_query_cols].val.str_val = *sender;
1026 query_vals[n_query_cols].val.str_val.s = "";
1027 query_vals[n_query_cols].val.str_val.len = 0;
/* requested columns; each *_col variable remembers its index in the row */
1031 result_cols[to_user_col = n_result_cols++] = &str_to_user_col;
1032 result_cols[to_domain_col = n_result_cols++] = &str_to_domain_col;
1033 result_cols[from_user_col = n_result_cols++] = &str_from_user_col;
1034 result_cols[from_domain_col = n_result_cols++] = &str_from_domain_col;
1035 result_cols[watcher_user_col = n_result_cols++] = &str_watcher_username_col;
1036 result_cols[watcher_domain_col = n_result_cols++] = &str_watcher_domain_col;
1037 result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
1038 result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
1039 result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
1040 result_cols[callid_col = n_result_cols++] = &str_callid_col;
1041 result_cols[cseq_col = n_result_cols++] = &str_local_cseq_col;
1042 result_cols[record_route_col = n_result_cols++] = &str_record_route_col;
1043 result_cols[contact_col = n_result_cols++] = &str_contact_col;
1044 result_cols[expires_col = n_result_cols++] = &str_expires_col;
1045 result_cols[reason_col = n_result_cols++] = &str_reason_col;
1046 result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
1047 result_cols[local_contact_col = n_result_cols++] = &str_local_contact_col;
1048 result_cols[version_col = n_result_cols++] = &str_version_col;
1049 result_cols[flags_col = n_result_cols++] = &str_flags_col;
1050 result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
1052 if(pa_dbf.query(pa_db, query_cols, query_ops, query_vals, result_cols,
1053 n_query_cols, n_result_cols, 0, &result)
1055 LM_ERR("while querying database\n");
1057 pa_dbf.free_result(pa_db, result);
1065 if(result->n <= 0) {
1066 LM_DBG("The query for subscribtion for [uri]= %.*s for [event]= %.*s"
1067 " returned no result\n",
1068 pres_uri->len, pres_uri->s, event->name.len, event->name.s);
1069 pa_dbf.free_result(pa_db, result);
1072 LM_DBG("found %d dialogs\n", result->n);
1074 for(i = 0; i < result->n; i++) {
1075 row = &result->rows[i];
1076 row_vals = ROW_VALUES(row);
/* rows with a non-empty reason are singled out here; the action taken
 * for them is not visible in this excerpt */
1078 if(row_vals[reason_col].val.string_val) {
1079 if(strlen(row_vals[reason_col].val.string_val) != 0)
1083 // s.reason.len= strlen(s.reason.s);
/* fill a temporary subscription whose strings point into the result
 * set; columns read without a NULL test are passed to strlen() directly */
1085 memset(&s, 0, sizeof(subs_t));
1086 s.status = ACTIVE_STATUS;
1088 s.pres_uri = *pres_uri;
1089 s.to_user.s = (char *)row_vals[to_user_col].val.string_val;
1090 s.to_user.len = strlen(s.to_user.s);
1092 s.to_domain.s = (char *)row_vals[to_domain_col].val.string_val;
1093 s.to_domain.len = strlen(s.to_domain.s);
1095 s.from_user.s = (char *)row_vals[from_user_col].val.string_val;
1096 s.from_user.len = strlen(s.from_user.s);
1098 s.from_domain.s = (char *)row_vals[from_domain_col].val.string_val;
1099 s.from_domain.len = strlen(s.from_domain.s);
1101 s.watcher_user.s = (char *)row_vals[watcher_user_col].val.string_val;
1102 s.watcher_user.len = strlen(s.watcher_user.s);
1104 s.watcher_domain.s =
1105 (char *)row_vals[watcher_domain_col].val.string_val;
1106 s.watcher_domain.len = strlen(s.watcher_domain.s);
1108 s.event_id.s = (char *)row_vals[event_id_col].val.string_val;
1109 s.event_id.len = (s.event_id.s) ? strlen(s.event_id.s) : 0;
1111 s.to_tag.s = (char *)row_vals[to_tag_col].val.string_val;
1112 s.to_tag.len = strlen(s.to_tag.s);
1114 s.from_tag.s = (char *)row_vals[from_tag_col].val.string_val;
1115 s.from_tag.len = strlen(s.from_tag.s);
1117 s.callid.s = (char *)row_vals[callid_col].val.string_val;
1118 s.callid.len = strlen(s.callid.s);
1120 s.record_route.s = (char *)row_vals[record_route_col].val.string_val;
1121 s.record_route.len = (s.record_route.s) ? strlen(s.record_route.s) : 0;
1123 s.contact.s = (char *)row_vals[contact_col].val.string_val;
1124 s.contact.len = strlen(s.contact.s);
1126 s.sockinfo_str.s = (char *)row_vals[sockinfo_col].val.string_val;
1127 s.sockinfo_str.len = s.sockinfo_str.s ? strlen(s.sockinfo_str.s) : 0;
1129 s.local_contact.s = (char *)row_vals[local_contact_col].val.string_val;
1130 s.local_contact.len = s.local_contact.s ? strlen(s.local_contact.s) : 0;
/* the stored CSeq and version are each advanced by one */
1133 s.local_cseq = row_vals[cseq_col].val.int_val + 1;
/* rows whose expiry lies before now + pres_expires_offset are singled
 * out (the action is not visible); expires becomes a remaining lifetime */
1134 if(row_vals[expires_col].val.int_val < (int)time(NULL) + pres_expires_offset)
1137 s.expires = row_vals[expires_col].val.int_val - (int)time(NULL);
1138 s.version = row_vals[version_col].val.int_val + 1;
1139 s.flags = row_vals[flags_col].val.int_val;
1140 s.user_agent.s = (char *)row_vals[user_agent_col].val.string_val;
1141 s.user_agent.len = (s.user_agent.s) ? strlen(s.user_agent.s) : 0;
/* copy into pkg memory and link in front of the output list */
1143 s_new = mem_copy_subs(&s, PKG_MEM_TYPE);
1145 LM_ERR("while copying subs_t structure\n");
1148 s_new->next = (*s_array);
1153 pa_dbf.free_result(pa_db, result);
1160 pa_dbf.free_result(pa_db, result);
/*
 * Collect the subscriptions that should receive a NOTIFY for a presentity
 * and event.
 *
 * In DB_ONLY mode the list comes from get_subs_db(). The other visible code
 * walks one slot of the subscription hash table under its lock and copies
 * matching entries into pkg memory with mem_copy_subs().
 *
 * pres_uri - presentity URI
 * event    - event descriptor
 * sender   - may be NULL; when set, it is compared with each entry's contact
 *
 * Returns the list head; callers check it against NULL and release the list
 * with free_subs_list(..., PKG_MEM_TYPE, 0).
 * NOTE(review): loop control, the statements following the filter
 * conditions and the return statements are not visible in this excerpt.
 */
1165 subs_t *get_subs_dialog(str *pres_uri, pres_ev_t *event, str *sender)
1167 unsigned int hash_code;
1168 subs_t *s = NULL, *s_new;
1169 subs_t *s_array = NULL;
1172 /* if pres_subs_dbmode!=DB_ONLY, should take the subscriptions from the
1173 hashtable only in DB_ONLY mode should take all dialogs from db */
1175 if(pres_subs_dbmode == DB_ONLY) {
1176 if(get_subs_db(pres_uri, event, sender, &s_array, &n) < 0) {
1177 LM_ERR("getting dialogs from database\n");
1181 hash_code = core_case_hash(pres_uri, &event->name, shtable_size);
1183 lock_get(&subs_htable[hash_code].lock);
1185 s = subs_htable[hash_code].entries;
1192 if(s->expires < (int)time(NULL)) {
1193 LM_DBG("expired subs\n");
/* filter condition: true when the entry is NOT an active, reason-less
 * subscription for this event and presentity, or when sender is set and
 * is compared against the entry's contact (the end of this condition
 * and the statement it guards are not visible in this excerpt) */
1197 if((!(s->status == ACTIVE_STATUS && s->reason.len == 0
1198 && s->event == event && s->pres_uri.len == pres_uri->len
1199 && presence_sip_uri_match(&s->pres_uri, pres_uri) == 0))
1200 || (sender && sender->len == s->contact.len
1201 && presence_sip_uri_match(sender, &s->contact)
1205 s_new = mem_copy_subs(s, PKG_MEM_TYPE);
1207 LM_ERR("copying subs_t structure\n");
1208 lock_release(&subs_htable[hash_code].lock);
/* the copy's expires is reduced by the current time, turning it into a
 * remaining lifetime, and the copy is linked in front of the list */
1211 s_new->expires -= (int)time(NULL);
1212 s_new->next = s_array;
1215 lock_release(&subs_htable[hash_code].lock);
1221 free_subs_list(s_array, PKG_MEM_TYPE, 0);
/*
 * Send a NOTIFY to the subscriptions of a presentity.
 *
 * The subscriptions come from get_subs_dialog() for p->event and p->sender.
 * When the event aggregates bodies (p->event->agg_nbody is set), the body
 * is rebuilt with get_p_notify_body(); notify() is then called with that
 * body when it exists and with the supplied body otherwise.
 *
 * p            - presentity that was published
 * pres_uri     - presentity URI
 * body         - body supplied by the caller
 * offline_etag - passed on to get_p_notify_body() as the etag argument
 *
 * NOTE(review): the remainder of the parameter list, the loop over the
 * subscriptions and the return statements are not visible in this excerpt.
 */
1225 int publ_notify(presentity_t *p, str pres_uri, str *body, str *offline_etag,
1228 str *notify_body = NULL;
1229 subs_t *subs_array = NULL, *s = NULL;
1232 subs_array = get_subs_dialog(&pres_uri, p->event, p->sender);
1233 if(subs_array == NULL) {
1234 LM_DBG("Could not find subs_dialog\n");
1239 /* if the event does not require aggregation - we have the final body */
1240 if(p->event->agg_nbody) {
1241 notify_body = get_p_notify_body(pres_uri, p->event, offline_etag, NULL);
1242 if(notify_body == NULL) {
1243 LM_DBG("Could not get the notify_body\n");
/* the rules document is attached to the subscription before notify() */
1250 s->auth_rules_doc = rules_doc;
1252 if(notify(s, NULL, notify_body ? notify_body : body, 0,
1253 p->event->aux_body_processing)
1255 LM_ERR("Could not send notify for %.*s\n", p->event->name.len,
1264 free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1265 free_notify_body(notify_body, p->event);
/*
 * Look up the dialogs subscribed to a presentity and event in the
 * active_watchers table and process each of them.
 *
 * The query matches presentity_uri and event and returns callid, to_tag and
 * from_tag. For every row those three values are stored in a local
 * subscription.
 *
 * pres_uri - presentity URI to match
 * event    - event whose name is matched
 *
 * The visible success path stores the number of rows in ret.
 * NOTE(review): the per-row action performed with the filled subscription
 * and the return statements are not visible in this excerpt.
 */
1269 int publ_notify_notifier(str pres_uri, pres_ev_t *event)
1271 db_key_t query_cols[2], result_cols[3];
1272 db_val_t query_vals[2], *values;
1274 db1_res_t *result = NULL;
1275 int n_query_cols = 0, n_result_cols = 0;
1276 int r_callid_col = 0, r_to_tag_col = 0, r_from_tag_col = 0;
/* the locking variant of the query function is used when the database
 * module provides one */
1280 db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
1283 LM_ERR("null database connection\n");
1287 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
1288 LM_ERR("use table failed\n");
/* query keys: presentity_uri and event (no operator array is passed) */
1292 query_cols[n_query_cols] = &str_presentity_uri_col;
1293 query_vals[n_query_cols].type = DB1_STR;
1294 query_vals[n_query_cols].nul = 0;
1295 query_vals[n_query_cols].val.str_val = pres_uri;
1298 query_cols[n_query_cols] = &str_event_col;
1299 query_vals[n_query_cols].type = DB1_STR;
1300 query_vals[n_query_cols].nul = 0;
1301 query_vals[n_query_cols].val.str_val = event->name;
1304 result_cols[r_callid_col = n_result_cols++] = &str_callid_col;
1305 result_cols[r_to_tag_col = n_result_cols++] = &str_to_tag_col;
1306 result_cols[r_from_tag_col = n_result_cols++] = &str_from_tag_col;
1308 if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
1309 n_result_cols, 0, &result)
1311 LM_ERR("Can't query db\n");
1315 if(result == NULL) {
1316 LM_ERR("bad result\n");
/* the dialog identifiers point into the result set */
1320 rows = RES_ROWS(result);
1321 for(i = 0; i < RES_ROW_N(result); i++) {
1322 values = ROW_VALUES(&rows[i]);
1324 subs.callid.s = (char *)VAL_STRING(&values[r_callid_col]);
1325 subs.callid.len = strlen(subs.callid.s);
1326 subs.to_tag.s = (char *)VAL_STRING(&values[r_to_tag_col]);
1327 subs.to_tag.len = strlen(subs.to_tag.s);
1328 subs.from_tag.s = (char *)VAL_STRING(&values[r_from_tag_col]);
1329 subs.from_tag.len = strlen(subs.from_tag.s);
1334 ret = RES_ROW_N(result);
1338 pa_dbf.free_result(pa_db, result);
/*
 * Notify the subscriptions of a presentity and event.
 *
 * The subscriptions come from get_subs_dialog() with no sender filter. For
 * events with the PUBL_TYPE flag the body is built by get_p_notify_body().
 * When the event defines aux_body_processing, its result is sent in place
 * of that body whenever it is not NULL.
 *
 * pres_uri     - presentity URI
 * event        - event descriptor
 * watcher_subs - passed through to notify() as its second argument
 *
 * NOTE(review): the loop over the subscriptions, the code under the
 * pres_notifier_processes test, the release of the aux_body str itself and
 * the return statements are not visible in this excerpt.
 */
1343 int query_db_notify(str *pres_uri, pres_ev_t *event, subs_t *watcher_subs)
1345 subs_t *subs_array = NULL, *s = NULL;
1346 str *notify_body = NULL, *aux_body = NULL;
1349 subs_array = get_subs_dialog(pres_uri, event, NULL);
1350 if(subs_array == NULL) {
1351 LM_DBG("Could not get subscription dialog\n");
1358 if(pres_notifier_processes > 0) {
1364 if(event->type & PUBL_TYPE)
1365 notify_body = get_p_notify_body(*pres_uri, event, NULL, NULL);
/* optional per-subscription post-processing of the body */
1369 if(event->aux_body_processing) {
1370 aux_body = event->aux_body_processing(s, notify_body);
1373 if(notify(s, watcher_subs, aux_body ? aux_body : notify_body, 0, 0)
1375 LM_ERR("Could not send notify for [event]=%.*s\n",
1376 event->name.len, event->name.s);
/* the post-processed body is released through the event's own hook */
1380 if(aux_body != NULL) {
1382 event->aux_free_body(aux_body->s);
1393 free_subs_list(subs_array, PKG_MEM_TYPE, 0);
1394 free_notify_body(notify_body, event);
/*
 * send_notify_request - choose the NOTIFY body for one subscription, build
 * the extra headers and dialog structure, and send the request in-dialog
 * through tmb.t_request_within().
 *
 * subs:            subscription being notified; its status/reason may be
 *                  rewritten to terminated/"timeout" when expires <= 0.
 * watcher_subs:    forwarded to get_wi_notify_body() for WINFO_TYPE events.
 * n_body:          caller-supplied body; used only when the subscription
 *                  status is ACTIVE_STATUS.
 * force_null_body: non-zero skips body selection (jump to jump_over_body).
 *
 * A shared-memory copy of `subs` is attached to the request as the
 * p_tm_callback parameter.
 *
 * NOTE(review): several lines (braces, else keywords, gotos, labels and the
 * return statements) are missing from this listing, so the exact nesting of
 * the branches below is inferred from statement order only.
 */
1399 int send_notify_request(
1400 subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body)
1403 str met = {"NOTIFY", 6};
1404 str str_hdr = {0, 0};
1405 str *notify_body = NULL;
1407 subs_t *cb_param = NULL;
1408 str *final_body = NULL;
1410 str *aux_body = NULL;
1411 subs_t *backup_subs = NULL;
1413 LM_DBG("dialog info:\n");
1416 /* getting the status of the subscription */
1418 if(force_null_body) {
1419 goto jump_over_body;
/* Caller-supplied body: only honoured for active subscriptions. */
1422 if(n_body != NULL && subs->status == ACTIVE_STATUS) {
1423 if(subs->event->req_auth) {
/*
 * Apply authorization rules to the supplied body when a rules document and
 * an apply hook are both present.
 * NOTE(review): the third argument on line 1426 reads `¬ify_body`, which
 * looks like a mis-encoded `&notify_body` (compare `&final_body` on
 * line 1468) - confirm against the real source file.
 */
1425 if(subs->auth_rules_doc && subs->event->apply_auth_nbody) {
1426 if(subs->event->apply_auth_nbody(n_body, subs, ¬ify_body)
1428 LM_ERR("in function apply_auth_nbody\n");
/* Fall back to the unmodified body when the hook produced nothing. */
1432 if(notify_body == NULL)
1433 notify_body = n_body;
1435 notify_body = n_body;
/* Terminated or pending subscriptions are notified without a body. */
1437 if(subs->status == TERMINATED_STATUS
1438 || subs->status == PENDING_STATUS) {
1439 LM_DBG("state terminated or pending- notify body NULL\n");
/* Watcher-info events build their body from the watcher list. */
1442 if(subs->event->type & WINFO_TYPE) {
1443 notify_body = get_wi_notify_body(subs, watcher_subs);
1444 if(notify_body == NULL) {
1445 LM_DBG("Could not get notify_body\n");
/* Other events fetch the presentity body, passing the contact if set. */
1449 notify_body = get_p_notify_body(subs->pres_uri, subs->event,
1450 NULL, (subs->contact.s) ? &subs->contact : NULL);
1451 if(notify_body == NULL || notify_body->s == NULL) {
1452 LM_DBG("Could not get the notify_body\n");
1454 /* call aux_body_processing if exists */
1455 if(subs->event->aux_body_processing) {
1456 aux_body = subs->event->aux_body_processing(
/* The processed body replaces (and frees) the fetched one. */
1459 free_notify_body(notify_body, subs->event);
1460 notify_body = aux_body;
1464 /* apply authorization rules if exists */
1465 if(subs->event->req_auth) {
1466 if(subs->auth_rules_doc && subs->event->apply_auth_nbody
1467 && subs->event->apply_auth_nbody(
1468 notify_body, subs, &final_body)
1470 LM_ERR("in function apply_auth\n");
/* The pre-authorization body is released and replaced by final_body. */
1474 xmlFree(notify_body->s);
1475 pkg_free(notify_body);
1476 notify_body = final_body;
/* An expired subscription is reported as terminated with reason "timeout". */
1486 if(subs->expires <= 0) {
1488 subs->status = TERMINATED_STATUS;
1489 subs->reason.s = "timeout";
1490 subs->reason.len = 7;
1493 /* build extra headers */
1494 if(build_str_hdr(subs, notify_body ? 1 : 0, &str_hdr) < 0) {
1495 LM_ERR("while building headers\n");
1498 LM_DBG("headers:\n%.*s\n", str_hdr.len, str_hdr.s);
1500 /* construct the dlg_t structure */
1501 td = ps_build_dlg_t(subs);
1503 LM_ERR("while building dlg_t structure\n");
1507 LM_DBG("expires %d status %d\n", subs->expires, subs->status);
/* Shared-memory copy handed to the transaction callback. */
1508 cb_param = mem_copy_subs(subs, SHM_MEM_TYPE);
/* In subs mode 1, expose `subs` as the "last subscription" while sending. */
1510 if(_pres_subs_mode==1) {
1511 backup_subs = _pres_subs_last_sub;
1512 _pres_subs_last_sub = subs;
1515 set_uac_req(&uac_r, &met, &str_hdr, notify_body, td, TMCB_LOCAL_COMPLETED,
1516 p_tm_callback, (void *)cb_param);
1517 result = tmb.t_request_within(&uac_r);
/* Restore the previous "last subscription" pointer. */
1518 if(_pres_subs_mode==1) {
1519 _pres_subs_last_sub = backup_subs;
1522 LM_ERR("in function tmb.t_request_within\n");
1528 LM_GEN2(pres_local_log_facility, pres_local_log_level,
1529 "NOTIFY %.*s via %.*s on behalf of %.*s for event %.*s : %.*s\n",
1530 td->rem_uri.len, td->rem_uri.s, td->hooks.next_hop->len,
1531 td->hooks.next_hop->s, td->loc_uri.len, td->loc_uri.s,
1532 subs->event->name.len, subs->event->name.s, subs->callid.len,
1538 pkg_free(str_hdr.s);
/*
 * Free the body only when it is not the caller's n_body.
 * NOTE(review): the pointers are compared after a cast to int, which drops
 * the upper bits on LP64 platforms; a direct pointer comparison would
 * express the same intent without truncation.
 */
1540 if((int)(long)n_body != (int)(long)notify_body)
1541 free_notify_body(notify_body, subs->event);
/* Second cleanup sequence (NOTE(review): presumably the error path). */
1547 if(str_hdr.s != NULL)
1548 pkg_free(str_hdr.s);
1549 if((int)(long)n_body != (int)(long)notify_body) {
1550 if(notify_body != NULL) {
1551 if(notify_body->s != NULL) {
/* The release function depends on how the body content was produced. */
1552 if(subs->event->type & WINFO_TYPE)
1553 xmlFree(notify_body->s);
1554 else if(subs->event->apply_auth_nbody == NULL
1555 && subs->event->agg_nbody == NULL)
1556 pkg_free(notify_body->s);
1558 subs->event->free_body(notify_body->s);
1560 pkg_free(notify_body);
/*
 * notify - persist the subscription state where required, then send the
 * NOTIFY through send_notify_request().
 *
 * subs:                subscription to update and notify.
 * watcher_subs:        forwarded to send_notify_request().
 * n_body:              body to send; may be replaced by the result of
 *                      aux_body_processing.
 * force_null_body:     non-zero requests a body-less NOTIFY; also forced
 *                      here for active subscriptions whose reason is
 *                      "polite-block".
 * aux_body_processing: optional hook producing a per-subscription body.
 *
 * NOTE(review): return statements and some closing braces are missing from
 * this listing, so return values are not documented here.
 */
1567 int notify(subs_t *subs, subs_t *watcher_subs, str *n_body, int force_null_body,
1568 aux_body_processing_t *aux_body_processing)
1571 str *aux_body = NULL;
1573 /* update first in hash table and then send Notify */
1574 if(subs->expires != 0 && subs->status != TERMINATED_STATUS) {
1575 unsigned int hash_code;
1576 hash_code = core_case_hash(
1577 &subs->pres_uri, &subs->event->name, shtable_size);
1579 /* if subscriptions are held also in memory, update the subscription hashtable */
1580 if(pres_subs_dbmode != DB_ONLY) {
1581 if(update_shtable(subs_htable, hash_code, subs, LOCAL_TYPE) < 0) {
1582 /* subscriptions are held only in memory, and hashtable update failed */
1583 LM_ERR("updating subscription record in hash table\n");
1587 /* if DB_ONLY mode or WRITE_THROUGH update in database */
/*
 * Skipped when the update was triggered by a received SUBSCRIBE, and in
 * DB_ONLY mode whenever notifier processes are configured.
 */
1588 if(subs->recv_event != PRES_SUBSCRIBE_RECV
1589 && ((pres_subs_dbmode == DB_ONLY && pres_notifier_processes == 0)
1590 || pres_subs_dbmode == WRITE_THROUGH)) {
1591 LM_DBG("updating subscription to database\n");
1592 if(update_subs_db(subs, LOCAL_TYPE) < 0) {
1593 LM_ERR("updating subscription in database\n");
/* A polite-blocked watcher stays active but must not receive a body. */
1599 if(subs->reason.s && subs->status == ACTIVE_STATUS && subs->reason.len == 12
1600 && strncmp(subs->reason.s, "polite-block", 12) == 0) {
1601 force_null_body = 1;
1604 if(!force_null_body && aux_body_processing) {
1605 aux_body = aux_body_processing(subs, n_body);
1608 if(send_notify_request(subs, watcher_subs, aux_body ? aux_body : n_body,
1611 LM_ERR("sending Notify not successful\n");
/* Failure path: release the aux body content via the event's free hook. */
1612 if(aux_body != NULL) {
1614 subs->event->aux_free_body(aux_body->s);
/* Same release after the failure block (success path, per statement order). */
1621 if(aux_body != NULL) {
1623 subs->event->aux_free_body(aux_body->s);
/*
 * Reply message and reply code of the NOTIFY transaction currently handled
 * by run_notify_reply_event(); both are reset (NULL / 0) once the event
 * route has run.
 */
1630 static sip_msg_t *_pres_subs_notify_reply_msg = NULL;
1631 static int _pres_subs_notify_reply_code = 0;
/*
 * pv_parse_notify_reply_var_name - parse the inner pseudo-variable name
 * given in `in` and attach the resulting spec to `sp`.
 *
 * sp: pseudo-variable spec being configured; receives the parsed inner
 *     spec in pvp.pvn.u.dname with type PV_NAME_PVAR.
 * in: inner pseudo-variable name; must be non-NULL and non-empty.
 *
 * NOTE(review): return statements, the allocation-failure check and the
 * error label are missing from this listing.
 */
1633 int pv_parse_notify_reply_var_name(pv_spec_p sp, str *in)
1635 pv_spec_t *pv = NULL;
1636 if(in->s == NULL || in->len <= 0)
/* The inner spec is allocated in pkg memory and zeroed before parsing. */
1638 pv = (pv_spec_t *)pkg_malloc(sizeof(pv_spec_t));
1641 memset(pv, 0, sizeof(pv_spec_t));
1642 if(pv_parse_spec(in, pv) == NULL)
1644 sp->pvp.pvn.u.dname = (void *)pv;
1645 sp->pvp.pvn.type = PV_NAME_PVAR;
1649 LM_ERR("invalid pv name [%.*s]\n", in->len, in->s);
/*
 * pv_get_notify_reply - evaluate the inner pseudo-variable stored in
 * param->pvn.u.dname against the current NOTIFY reply message
 * (_pres_subs_notify_reply_msg) rather than against `msg`.
 *
 * NOTE(review): the conditions guarding the pv_get_null() fallback are not
 * visible in this listing.
 */
1655 int pv_get_notify_reply(struct sip_msg *msg, pv_param_t *param, pv_value_t *res)
1657 pv_spec_t *pv = NULL;
1662 pv = (pv_spec_t *)param->pvn.u.dname;
1664 return pv_get_null(msg, param, res);
1666 return pv_get_spec_value(_pres_subs_notify_reply_msg, pv, res);
/*
 * Minimal "408 TIMEOUT" reply text used to build a stand-in reply message;
 * run_notify_reply_event() uses it when the reply code is 408 or no reply
 * message is available.
 */
1669 #define FAKED_SIP_408_MSG_FORMAT \
1670 "SIP/2.0 408 TIMEOUT\r\nVia: SIP/2.0/UDP 127.0.0.1\r\nFrom: " \
1671 "invalid;\r\nTo: invalid\r\nCall-ID: invalid\r\nCSeq: 1 " \
1672 "TIMEOUT\r\nContent-Length: 0\r\n\r\n"
/* Lazily built, cached instance of the faked reply. */
1673 static sip_msg_t *_faked_msg = NULL;
/*
 * faked_msg - build the faked 408 reply on first use (pkg memory) from
 * FAKED_SIP_408_MSG_FORMAT.
 *
 * NOTE(review): no NULL check on the pkg_malloc() result is visible before
 * it is passed to build_sip_msg_from_buf(), and as listed _faked_msg keeps
 * its non-NULL value when parsing fails; the return statements are not part
 * of this listing - confirm both against the full source.
 */
1675 sip_msg_t *faked_msg()
1677 if(_faked_msg == NULL) {
1678 _faked_msg = pkg_malloc(sizeof(sip_msg_t));
1679 if(likely(build_sip_msg_from_buf(_faked_msg, FAKED_SIP_408_MSG_FORMAT,
1680 strlen(FAKED_SIP_408_MSG_FORMAT), inc_msg_no())
1682 LM_ERR("failed to parse msg buffer\n");
/*
 * run_notify_reply_event - run the configured event route
 * (event_rt.rlist[goto_on_notify_reply]) for a completed NOTIFY
 * transaction.
 *
 * t:  transaction cell; the sent request buffer (t->uac->request) is parsed
 *     into a local message on which the route is executed.
 * ps: callback parameters; ps->code and ps->rpl describe the reply and
 *     *ps->param is treated as the subscription (subs_t).
 *
 * NOTE(review): the declaration and release of `msg` and the early returns
 * are missing from this listing.
 */
1689 void run_notify_reply_event(struct cell *t, struct tmcb_params *ps)
1691 int backup_route_type;
1692 subs_t *backup_subs = NULL;
/* -1 is the initial value of goto_on_notify_reply (set at file scope). */
1695 if(goto_on_notify_reply == -1)
1698 if(likely(build_sip_msg_from_buf(&msg, t->uac->request.buffer,
1699 t->uac->request.buffer_len, inc_msg_no())
1701 LM_ERR("failed to parse msg buffer\n");
/* Publish reply code and message for pv_get_notify_reply(). */
1705 _pres_subs_notify_reply_code = ps->code;
1706 if(ps->code == 408 || ps->rpl == NULL) {
1707 _pres_subs_notify_reply_msg = faked_msg();
1709 _pres_subs_notify_reply_msg = ps->rpl;
/*
 * In subs mode 1 a pkg-memory copy of the subscription becomes the "last
 * subscription" for the duration of the route.
 * NOTE(review): the mem_copy_subs() result is not checked here.
 */
1712 if(_pres_subs_mode==1) {
1713 backup_subs = _pres_subs_last_sub;
1714 _pres_subs_last_sub = mem_copy_subs((subs_t *)(*ps->param), PKG_MEM_TYPE);
/* Run the route as LOCAL_ROUTE, then restore the previous route type. */
1717 backup_route_type = get_route_type();
1718 set_route_type(LOCAL_ROUTE);
1719 run_top_route(event_rt.rlist[goto_on_notify_reply], &msg, 0);
1720 set_route_type(backup_route_type);
/* Reset the published reply state and restore the previous subscription. */
1722 _pres_subs_notify_reply_msg = NULL;
1723 _pres_subs_notify_reply_code = 0;
1724 if(_pres_subs_mode==1) {
1725 pkg_free(_pres_subs_last_sub);
1726 _pres_subs_last_sub = backup_subs;
/*
 * pres_get_delete_sub - read the integer "delete_subscription" child of the
 * xavp named by pres_xavp_cfg.
 *
 * NOTE(review): the values returned when pres_xavp_cfg is unset or the
 * child is absent are not visible in this listing.
 */
1731 int pres_get_delete_sub(void)
1733 sr_xavp_t *vavp = NULL;
1734 str vname = str_init("delete_subscription");
1736 if(pres_xavp_cfg.s == NULL || pres_xavp_cfg.len <= 0) {
1740 vavp = xavp_get_child_with_ival(&pres_xavp_cfg, &vname);
1742 return (int)vavp->val.v.i;
/*
 * p_tm_callback - transaction callback registered by send_notify_request()
 * for TMCB_LOCAL_COMPLETED.
 *
 * t:    transaction cell, forwarded to run_notify_reply_event().
 * type: callback type (not referenced in the visible lines).
 * ps:   callback parameters; *ps->param is the shared-memory subs_t copy
 *       attached when the NOTIFY was sent.
 *
 * After running the reply event route, the subscription is deleted when the
 * reply code is 404 or 481, when it is 408 with pres_timeout_rm_subs set
 * and the subscription not already terminated, or when
 * pres_get_delete_sub() returns non-zero.
 *
 * NOTE(review): the lines following delete_subs() (including any release of
 * `subs`) are missing from this listing.
 */
1748 void p_tm_callback(struct cell *t, int type, struct tmcb_params *ps)
1752 if(ps->param == NULL || *ps->param == NULL) {
1753 LM_ERR("weird shit happening\n");
/*
 * NOTE(review): as listed, this check sits in the branch taken only when
 * ps->param or *ps->param is NULL, so it appears it can never be true.
 */
1754 if(ps->param != NULL && *ps->param != NULL)
1755 shm_free((subs_t *)(*ps->param));
1759 subs = (subs_t *)(*ps->param);
1760 LM_DBG("completed with status %d [to_tag:%.*s]\n", ps->code,
1761 subs->to_tag.len, subs->to_tag.s);
1763 run_notify_reply_event(t, ps);
1765 if(ps->code == 404 || ps->code == 481
1766 || (ps->code == 408 && pres_timeout_rm_subs
1767 && subs->status != TERMINATED_STATUS)
1768 || pres_get_delete_sub()) {
1769 delete_subs(&subs->pres_uri, &subs->event->name, &subs->to_tag,
1770 &subs->from_tag, &subs->callid);
/*
 * free_cbparam - release a callback parameter; a NULL cb_param is skipped.
 * NOTE(review): the release statement itself is not part of this listing.
 */
1776 void free_cbparam(c_back_param *cb_param)
1778 if(cb_param != NULL)
/*
 * shm_dup_cbparam - copy the dialog identifiers of `subs` into a single
 * shared-memory allocation.
 *
 * The c_back_param header is followed, in the same allocation, by the bytes
 * of pres_uri, event name, to_tag, from_tag and callid in that order; each
 * str in the header points into this trailing area. The strings are not
 * NUL-terminated (the size accounts for the raw lengths only).
 *
 * NOTE(review): the return statements are missing from this listing.
 */
1782 c_back_param *shm_dup_cbparam(subs_t *subs)
1785 c_back_param *cb_param = NULL;
1787 size = sizeof(c_back_param) + subs->pres_uri.len + subs->event->name.len
1788 + subs->to_tag.len + subs->from_tag.len + subs->callid.len;
1790 cb_param = (c_back_param *)shm_malloc(size);
1791 LM_DBG("=== %d/%d/%d\n", subs->pres_uri.len, subs->event->name.len,
1793 if(cb_param == NULL) {
1794 LM_ERR("no more shared memory\n");
1797 memset(cb_param, 0, size);
/* pres_uri starts right after the header. */
1799 cb_param->pres_uri.s = (char *)cb_param + sizeof(c_back_param);
1800 memcpy(cb_param->pres_uri.s, subs->pres_uri.s, subs->pres_uri.len);
1801 cb_param->pres_uri.len = subs->pres_uri.len;
/* Each following field starts where the previous one ends. */
1802 cb_param->ev_name.s =
1803 (char *)(cb_param->pres_uri.s) + cb_param->pres_uri.len;
1804 memcpy(cb_param->ev_name.s, subs->event->name.s, subs->event->name.len);
1805 cb_param->ev_name.len = subs->event->name.len;
1806 cb_param->to_tag.s = (char *)(cb_param->ev_name.s) + cb_param->ev_name.len;
1807 memcpy(cb_param->to_tag.s, subs->to_tag.s, subs->to_tag.len);
1808 cb_param->to_tag.len = subs->to_tag.len;
1810 cb_param->from_tag.s = (char *)(cb_param->to_tag.s) + cb_param->to_tag.len;
1811 memcpy(cb_param->from_tag.s, subs->from_tag.s, subs->from_tag.len);
1812 cb_param->from_tag.len = subs->from_tag.len;
1814 cb_param->callid.s =
1815 (char *)(cb_param->from_tag.s) + cb_param->from_tag.len;
1816 memcpy(cb_param->callid.s, subs->callid.s, subs->callid.len);
1817 cb_param->callid.len = subs->callid.len;
/*
 * create_winfo_xml - serialize a watcher list into a <watcherinfo> XML
 * document.
 *
 * watchers:   list whose entries become <watcher> elements.
 * version:    NUL-terminated value of the root "version" attribute.
 * resource:   value of the watcher-list "resource" attribute.
 * event:      value of the watcher-list "package" attribute.
 * STATE_FLAG: FULL_STATE_FLAG selects state="full", otherwise "partial".
 *
 * The result is a pkg-allocated str whose content is produced by
 * xmlDocDumpFormatMemory().
 *
 * NOTE(review): the watcher loop header, the declaration of `content`, the
 * cleanup code and the return statements are missing from this listing.
 */
1823 str *create_winfo_xml(watcher_t *watchers, char *version, str resource,
1824 str event, int STATE_FLAG)
1826 xmlDocPtr doc = NULL;
1827 xmlNodePtr root_node = NULL, node = NULL;
1828 xmlNodePtr w_list_node = NULL;
1834 LIBXML_TEST_VERSION;
/* NOTE(review): the results of these libxml2 calls are not checked here. */
1836 doc = xmlNewDoc(BAD_CAST "1.0");
1837 root_node = xmlNewNode(NULL, BAD_CAST "watcherinfo");
1838 xmlDocSetRootElement(doc, root_node);
1840 xmlNewProp(root_node, BAD_CAST "xmlns",
1841 BAD_CAST "urn:ietf:params:xml:ns:watcherinfo");
1842 xmlNewProp(root_node, BAD_CAST "version", BAD_CAST version);
1844 if(STATE_FLAG & FULL_STATE_FLAG) {
1845 if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "full") == NULL) {
1846 LM_ERR("while adding new attribute\n");
1850 if(xmlNewProp(root_node, BAD_CAST "state", BAD_CAST "partial")
1852 LM_ERR("while adding new attribute\n");
1857 w_list_node = xmlNewChild(root_node, NULL, BAD_CAST "watcher-list", NULL);
1858 if(w_list_node == NULL) {
1859 LM_ERR("while adding child\n");
/*
 * One scratch buffer, sized for the longer of the two values plus a NUL,
 * is reused to pass NUL-terminated copies of resource and event to libxml2.
 */
1862 res = (char *)pkg_malloc(MAX_unsigned(resource.len, event.len) + 1);
1864 ERR_MEM(PKG_MEM_STR);
1866 memcpy(res, resource.s, resource.len);
1867 res[resource.len] = '\0';
1868 xmlNewProp(w_list_node, BAD_CAST "resource", BAD_CAST res);
1869 memcpy(res, event.s, event.len);
1870 res[event.len] = '\0';
1871 xmlNewProp(w_list_node, BAD_CAST "package", BAD_CAST res);
/* Per watcher: the URI is copied to `content` and explicitly terminated. */
1877 strncpy(content, w->uri.s, w->uri.len);
1878 content[w->uri.len] = '\0';
1880 w_list_node, NULL, BAD_CAST "watcher", BAD_CAST content);
1882 LM_ERR("while adding child\n");
/* NOTE(review): w->id.s is passed as a C string - assumes NUL-termination. */
1885 if(xmlNewProp(node, BAD_CAST "id", BAD_CAST w->id.s) == NULL) {
1886 LM_ERR("while adding new attribute\n");
1890 if(xmlNewProp(node, BAD_CAST "event", BAD_CAST "subscribe") == NULL) {
1891 LM_ERR("while adding new attribute\n");
1896 node, BAD_CAST "status", BAD_CAST get_status_str(w->status))
1898 LM_ERR("while adding new attribute\n");
1903 body = (str *)pkg_malloc(sizeof(str));
1905 ERR_MEM(PKG_MEM_STR);
1907 memset(body, 0, sizeof(str));
/* Last argument 1 asks libxml2 for formatted (indented) output. */
1909 xmlDocDumpFormatMemory(doc, (xmlChar **)(void *)&body->s, &body->len, 1);
/*
 * watcher_found_in_list - search `watchers` for an entry whose URI matches
 * wuri: the lengths must be equal and presence_sip_uri_match() must
 * return 0.
 *
 * NOTE(review): the list traversal and the return values are missing from
 * this listing.
 */
1925 int watcher_found_in_list(watcher_t *watchers, str wuri)
1932 if(w->uri.len == wuri.len
1933 && presence_sip_uri_match(&w->uri, &wuri) == 0)
/*
 * add_waiting_watchers - add to `watchers` the users recorded in
 * watchers_table with PENDING_STATUS for pres_uri/event, skipping those
 * already present in the list. New entries get WAITING_STATUS and an id
 * derived from their URI with to64frombits().
 *
 * watchers: list head; new entries are linked in after it.
 * pres_uri: matched against the presentity_uri column.
 * event:    matched against the event column.
 *
 * NOTE(review): the return statements, error labels and the statements that
 * set w->uri and link the entry are only partly visible in this listing.
 */
1941 int add_waiting_watchers(watcher_t *watchers, str pres_uri, str event)
1944 db_key_t query_cols[3];
1945 db_val_t query_vals[3];
1946 db_key_t result_cols[2];
1947 db1_res_t *result = NULL;
1948 db_row_t *row = NULL;
1950 int n_result_cols = 0;
1951 int n_query_cols = 0;
1952 int wuser_col, wdomain_col;
1953 str wuser, wdomain, wuri;
1956 /* select from watchers table the users that have subscribed
1957 * to the presentity and have status pending */
1959 query_cols[n_query_cols] = &str_presentity_uri_col;
1960 query_vals[n_query_cols].type = DB1_STR;
1961 query_vals[n_query_cols].nul = 0;
1962 query_vals[n_query_cols].val.str_val = pres_uri;
1965 query_cols[n_query_cols] = &str_event_col;
1966 query_vals[n_query_cols].type = DB1_STR;
1967 query_vals[n_query_cols].nul = 0;
1968 query_vals[n_query_cols].val.str_val = event;
1971 query_cols[n_query_cols] = &str_status_col;
1972 query_vals[n_query_cols].type = DB1_INT;
1973 query_vals[n_query_cols].nul = 0;
1974 query_vals[n_query_cols].val.int_val = PENDING_STATUS;
1977 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
1978 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
1980 if(pa_dbf.use_table(pa_db, &watchers_table) < 0) {
1981 LM_ERR("sql use table 'watchers_table' failed\n");
1985 if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
1986 n_result_cols, 0, &result)
1988 LM_ERR("failed to query %.*s table\n", watchers_table.len,
/*
 * NOTE(review): free_result() is called on the query-failure path, where
 * `result` may still be NULL - confirm the driver tolerates that.
 */
1991 pa_dbf.free_result(pa_db, result);
1995 if(result == NULL) {
1996 LM_ERR("mysql query failed - null result\n");
2000 if(result->n <= 0) {
2001 LM_DBG("The query returned no result\n");
2002 pa_dbf.free_result(pa_db, result);
2006 for(i = 0; i < result->n; i++) {
2007 row = &result->rows[i];
2008 row_vals = ROW_VALUES(row);
/*
 * NOTE(review): strlen() is applied directly to the column strings; confirm
 * username/domain can never be NULL in this table.
 */
2010 wuser.s = (char *)row_vals[wuser_col].val.string_val;
2011 wuser.len = strlen(wuser.s);
2013 wdomain.s = (char *)row_vals[wdomain_col].val.string_val;
2014 wdomain.len = strlen(wdomain.s);
2016 if(uandd_to_uri(wuser, wdomain, &wuri) < 0) {
2017 LM_ERR("creating uri from username and domain\n");
/* Watchers already in the list are not added a second time. */
2021 if(watcher_found_in_list(watchers, wuri)) {
2026 w = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2029 ERR_MEM(PKG_MEM_STR);
2031 memset(w, 0, sizeof(watcher_t));
2033 w->status = WAITING_STATUS;
/* The id buffer is sized at twice the URI length plus a terminator. */
2035 w->id.s = (char *)pkg_malloc(w->uri.len * 2 + 1);
2036 if(w->id.s == NULL) {
2039 ERR_MEM(PKG_MEM_STR);
2042 to64frombits((unsigned char *)w->id.s, (const unsigned char *)w->uri.s,
2044 w->id.len = strlen(w->id.s);
/* New entry is linked in directly after the list head. */
2047 w->next = watchers->next;
2051 pa_dbf.free_result(pa_db, result);
2056 pa_dbf.free_result(pa_db, result);
/*
 * EXTRACT_STRING(strng, chars) - point the str `strng` at the C string
 * `chars` (no copy) and set its length; a NULL `chars` yields length 0.
 * NOTE(review): the wrapper lines of the macro body are missing from this
 * listing.
 */
2060 #define EXTRACT_STRING(strng, chars) \
2062 strng.s = (char *)chars; \
2063 strng.len = strng.s == NULL ? 0 : strlen(strng.s); \
/*
 * unset_watchers_updated_winfo - when no presence.winfo dialog of pres_uri
 * is left with updated == UPDATED_TYPE, set updated_winfo to NO_UPDATE_TYPE
 * on active_watchers rows of that presentity.
 *
 * pres_uri: presentity whose watcher rows are examined.
 *
 * `ret` takes the driver's affected-row count when affected_rows is
 * available.
 *
 * NOTE(review): the arguments completing the query/update calls, the error
 * exits and the return statement are missing from this listing.
 */
2066 static int unset_watchers_updated_winfo(str *pres_uri)
2068 db_key_t query_cols[3], result_cols[1], update_cols[1];
2069 db_val_t query_vals[3], update_vals[1];
/*
 * NOTE(review): only two operators are declared while three query columns
 * are filled below; presumably the update uses just the first two columns -
 * confirm against the missing argument lines.
 */
2070 db_op_t query_ops[2];
2071 db1_res_t *result = NULL;
2072 int n_query_cols = 0;
2074 str winfo = str_init("presence.winfo");
2075 db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2077 /* If this is the only presence.winfo dialog awaiting
2078 update for this presentity reset all of the watchers
2079 updated_winfo fields. */
2081 query_cols[n_query_cols] = &str_presentity_uri_col;
2082 query_vals[n_query_cols].type = DB1_STR;
2083 query_vals[n_query_cols].nul = 0;
2084 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2085 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2088 query_cols[n_query_cols] = &str_event_col;
2089 query_vals[n_query_cols].type = DB1_STR;
2090 query_vals[n_query_cols].nul = 0;
2091 query_vals[n_query_cols].val.str_val = winfo;
2094 query_cols[n_query_cols] = &str_updated_col;
2095 query_vals[n_query_cols].type = DB1_INT;
2096 query_vals[n_query_cols].nul = 0;
2097 query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2100 result_cols[0] = &str_id_col;
2102 update_cols[0] = &str_updated_winfo_col;
2103 update_vals[0].type = DB1_INT;
2104 update_vals[0].nul = 0;
2105 update_vals[0].val.int_val = NO_UPDATE_TYPE;
2107 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2108 LM_ERR("use table failed\n");
2112 if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols, 1,
2115 LM_ERR("in sql query\n");
2119 if(result == NULL) {
2120 LM_ERR("bad result\n");
/*
 * No matching winfo dialog is pending: run the update with presentity_uri
 * compared by equality and event compared by inequality.
 */
2124 if(RES_ROW_N(result) <= 0) {
2125 query_ops[0] = OP_EQ;
2126 query_ops[1] = OP_NEQ;
2128 if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2131 LM_ERR("in sql query\n");
2135 if(pa_dbf.affected_rows)
2136 ret = pa_dbf.affected_rows(pa_db);
2144 pa_dbf.free_result(pa_db, result);
/*
 * dialogs_awaiting_update - count the active_watchers rows for
 * pres_uri/event whose `updated` column differs from NO_UPDATE_TYPE.
 *
 * pres_uri: matched by equality against presentity_uri.
 * event:    matched by equality against the event column.
 *
 * `ret` is set to the number of rows returned; callers in this file treat a
 * negative result as an error.
 *
 * NOTE(review): the declaration of `ret`, the error exits and the return
 * statement are missing from this listing.
 */
2148 static int dialogs_awaiting_update(str *pres_uri, str event)
2150 db_key_t query_cols[3], result_cols[1];
2151 db_val_t query_vals[3];
2152 db_op_t query_ops[3];
2153 db1_res_t *result = NULL;
2154 int n_query_cols = 0;
2156 db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2158 query_cols[n_query_cols] = &str_presentity_uri_col;
2159 query_vals[n_query_cols].type = DB1_STR;
2160 query_vals[n_query_cols].nul = 0;
2161 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2162 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2163 query_ops[n_query_cols] = OP_EQ;
2166 query_cols[n_query_cols] = &str_event_col;
2167 query_vals[n_query_cols].type = DB1_STR;
2168 query_vals[n_query_cols].nul = 0;
2169 query_vals[n_query_cols].val.str_val = event;
2170 query_ops[n_query_cols] = OP_EQ;
/* "Awaiting update" means updated != NO_UPDATE_TYPE. */
2173 query_cols[n_query_cols] = &str_updated_col;
2174 query_vals[n_query_cols].type = DB1_INT;
2175 query_vals[n_query_cols].nul = 0;
2176 query_vals[n_query_cols].val.int_val = NO_UPDATE_TYPE;
2177 query_ops[n_query_cols] = OP_NEQ;
/* Only the id column is fetched; just the row count is used. */
2180 result_cols[0] = &str_id_col;
2182 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2183 LM_ERR("use table failed\n");
2187 if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2188 n_query_cols, 1, 0, &result)
2190 LM_ERR("in sql query\n");
2194 if(result == NULL) {
2195 LM_ERR("bad result\n");
2198 ret = RES_ROW_N(result);
2202 pa_dbf.free_result(pa_db, result);
/*
 * set_wipeer_subs_updated - for every active_watchers dialog of
 * pres_uri/event, schedule a notification by writing a hash-derived value
 * into `updated` and UPDATED_TYPE into `updated_winfo`.
 *
 * pres_uri: presentity whose dialogs are selected.
 * event:    event->name selects the dialogs.
 * full:     NOTE(review): its use is not visible in this listing.
 *
 * NOTE(review): the resets of n_query_cols/n_update_cols inside the loop,
 * the handling of `count`, the error exits and the return statement are
 * missing from this listing.
 */
2206 int set_wipeer_subs_updated(str *pres_uri, pres_ev_t *event, int full)
2208 db_key_t query_cols[3], result_cols[3], update_cols[2];
2209 db_val_t query_vals[3], update_vals[2], *values;
2211 db1_res_t *result = NULL;
2212 int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2213 int callid_col, from_tag_col, to_tag_col;
2214 int i, ret = -1, count;
2215 str callid, from_tag, to_tag;
2216 db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2218 query_cols[n_query_cols] = &str_presentity_uri_col;
2219 query_vals[n_query_cols].type = DB1_STR;
2220 query_vals[n_query_cols].nul = 0;
2221 query_vals[n_query_cols].val.str_val.s = pres_uri->s;
2222 query_vals[n_query_cols].val.str_val.len = pres_uri->len;
2225 query_cols[n_query_cols] = &str_event_col;
2226 query_vals[n_query_cols].type = DB1_STR;
2227 query_vals[n_query_cols].nul = 0;
2228 query_vals[n_query_cols].val.str_val = event->name;
2231 result_cols[callid_col = n_result_cols++] = &str_callid_col;
2232 result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2233 result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2235 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2236 LM_ERR("use table failed\n");
2240 if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2241 n_result_cols, 0, &result)
2243 LM_ERR("in sql query\n");
2247 if(result == NULL) {
2248 LM_ERR("bad result\n");
2252 if(RES_ROW_N(result) <= 0) {
2257 rows = RES_ROWS(result);
2258 count = RES_ROW_N(result);
2259 for(i = 0; i < RES_ROW_N(result); i++) {
2260 values = ROW_VALUES(&rows[i]);
/* NULL-safe extraction of the dialog identifiers (no copy). */
2262 EXTRACT_STRING(callid, VAL_STRING(&values[callid_col]));
2263 EXTRACT_STRING(from_tag, VAL_STRING(&values[from_tag_col]));
2264 EXTRACT_STRING(to_tag, VAL_STRING(&values[to_tag_col]));
/* The per-dialog update is keyed on callid, to_tag and from_tag. */
2269 query_cols[n_query_cols] = &str_callid_col;
2270 query_vals[n_query_cols].type = DB1_STR;
2271 query_vals[n_query_cols].nul = 0;
2272 query_vals[n_query_cols].val.str_val = callid;
2275 query_cols[n_query_cols] = &str_to_tag_col;
2276 query_vals[n_query_cols].type = DB1_STR;
2277 query_vals[n_query_cols].nul = 0;
2278 query_vals[n_query_cols].val.str_val = to_tag;
2281 query_cols[n_query_cols] = &str_from_tag_col;
2282 query_vals[n_query_cols].type = DB1_STR;
2283 query_vals[n_query_cols].nul = 0;
2284 query_vals[n_query_cols].val.str_val = from_tag;
/*
 * `updated` receives hash(callid, from_tag) reduced modulo the product of
 * wait time, poll rate and notifier process count.
 * NOTE(review): the modulo divisor is zero if any of the three settings is
 * zero - confirm callers only reach this with notifier processes enabled.
 */
2287 update_cols[n_update_cols] = &str_updated_col;
2288 update_vals[n_update_cols].type = DB1_INT;
2289 update_vals[n_update_cols].nul = 0;
2290 update_vals[n_update_cols].val.int_val =
2291 core_case_hash(&callid, &from_tag, 0)
2292 % (pres_waitn_time * pres_notifier_poll_rate
2293 * pres_notifier_processes);
2297 update_cols[n_update_cols] = &str_updated_winfo_col;
2298 update_vals[n_update_cols].type = DB1_INT;
2299 update_vals[n_update_cols].nul = 0;
2300 update_vals[n_update_cols].val.int_val = UPDATED_TYPE;
2304 if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
2305 update_vals, n_query_cols, n_update_cols)
2307 LM_ERR("in sql query\n");
/* NOTE(review): the action taken for zero affected rows is not visible. */
2311 if(pa_dbf.affected_rows)
2312 if(pa_dbf.affected_rows(pa_db) == 0)
2321 pa_dbf.free_result(pa_db, result);
/*
 * set_updated - mark one active_watchers dialog (identified by callid,
 * to_tag and from_tag of `sub`) as needing notification by writing a
 * hash-derived value into its `updated` column.
 *
 * Returns the driver's affected-row count when affected_rows is available.
 *
 * NOTE(review): the remaining update arguments, the error returns and the
 * fallback return value are missing from this listing.
 */
2326 int set_updated(subs_t *sub)
2328 db_key_t query_cols[3], update_cols[1];
2329 db_val_t query_vals[3], update_vals[1];
2330 int n_query_cols = 0;
2332 query_cols[n_query_cols] = &str_callid_col;
2333 query_vals[n_query_cols].type = DB1_STR;
2334 query_vals[n_query_cols].nul = 0;
2335 query_vals[n_query_cols].val.str_val = sub->callid;
2338 query_cols[n_query_cols] = &str_to_tag_col;
2339 query_vals[n_query_cols].type = DB1_STR;
2340 query_vals[n_query_cols].nul = 0;
2341 query_vals[n_query_cols].val.str_val = sub->to_tag;
2344 query_cols[n_query_cols] = &str_from_tag_col;
2345 query_vals[n_query_cols].type = DB1_STR;
2346 query_vals[n_query_cols].nul = 0;
2347 query_vals[n_query_cols].val.str_val = sub->from_tag;
/*
 * hash(callid, from_tag) reduced modulo the product of wait time, poll rate
 * and notifier process count.
 * NOTE(review): the divisor is zero if any of the three settings is zero.
 */
2350 update_cols[0] = &str_updated_col;
2351 update_vals[0].type = DB1_INT;
2352 update_vals[0].nul = 0;
2353 update_vals[0].val.int_val = core_case_hash(&sub->callid, &sub->from_tag, 0)
2354 % (pres_waitn_time * pres_notifier_poll_rate
2355 * pres_notifier_processes);
2357 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2358 LM_ERR("use table failed\n");
2362 if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols, update_vals,
2365 LM_ERR("in sql query\n");
2369 if(pa_dbf.affected_rows)
2370 return pa_dbf.affected_rows(pa_db);
/*
 * build_watchers_list - build the watcher list for a partial winfo NOTIFY
 * from the active_watchers rows of sub->pres_uri and the wipeer event whose
 * updated_winfo column equals UPDATED_TYPE.
 *
 * sub: winfo subscription; sub->event->wipeer must be set, since it is
 *      dereferenced without a check.
 *
 * The list starts with a zeroed pkg-allocated head entry; each row is
 * added through add_watcher_list().
 *
 * NOTE(review): the declarations of `rows`, `i` and `sb`, the error exits
 * and the return statements are missing from this listing.
 */
2375 static watcher_t *build_watchers_list(subs_t *sub)
2377 db_key_t query_cols[3], result_cols[4];
2378 db_val_t query_vals[3], *values;
2380 db1_res_t *result = NULL;
2381 int n_query_cols = 0, n_result_cols = 0;
2382 int wuser_col, wdomain_col, callid_col, status_col;
2385 watcher_t *watchers = NULL;
2387 watchers = (watcher_t *)pkg_malloc(sizeof(watcher_t));
2388 if(watchers == NULL) {
2389 ERR_MEM(PKG_MEM_STR);
2391 memset(watchers, 0, sizeof(watcher_t));
2393 query_cols[n_query_cols] = &str_presentity_uri_col;
2394 query_vals[n_query_cols].type = DB1_STR;
2395 query_vals[n_query_cols].nul = 0;
2396 query_vals[n_query_cols].val.str_val = sub->pres_uri;
/* The peer event of the winfo event is the one whose watchers are listed. */
2399 query_cols[n_query_cols] = &str_event_col;
2400 query_vals[n_query_cols].type = DB1_STR;
2401 query_vals[n_query_cols].nul = 0;
2402 query_vals[n_query_cols].val.str_val = sub->event->wipeer->name;
2405 query_cols[n_query_cols] = &str_updated_winfo_col;
2406 query_vals[n_query_cols].type = DB1_INT;
2407 query_vals[n_query_cols].nul = 0;
2408 query_vals[n_query_cols].val.int_val = UPDATED_TYPE;
2411 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2412 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2413 result_cols[callid_col = n_result_cols++] = &str_callid_col;
2414 result_cols[status_col = n_result_cols++] = &str_status_col;
2416 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2417 LM_ERR("use table failed\n");
2421 if(pa_dbf.query(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2422 n_result_cols, 0, &result)
2424 LM_ERR("in sql query\n");
2428 if(result == NULL) {
2429 LM_ERR("bad result\n");
2433 if(RES_ROW_N(result) <= 0)
2436 rows = RES_ROWS(result);
2437 for(i = 0; i < RES_ROW_N(result); i++) {
/* A scratch subscription is filled per row and handed to add_watcher_list. */
2438 memset(&sb, 0, sizeof(subs_t));
2439 values = ROW_VALUES(&rows[i]);
2441 EXTRACT_STRING(sb.watcher_user, VAL_STRING(&values[wuser_col]));
2442 EXTRACT_STRING(sb.watcher_domain, VAL_STRING(&values[wdomain_col]));
2443 EXTRACT_STRING(sb.callid, VAL_STRING(&values[callid_col]));
2444 sb.status = VAL_INT(&values[status_col]);
2446 sb.event = sub->event->wipeer;
2448 if(add_watcher_list(&sb, watchers) < 0)
2453 pa_dbf.free_result(pa_db, result);
/* Second cleanup sequence also frees the list being built. */
2458 pa_dbf.free_result(pa_db, result);
2459 free_watcher_list(watchers);
/*
 * cleanup_missing_dialog - housekeeping for a subscription, by event type:
 *  - WINFO_TYPE: reset the watchers' updated_winfo flags for the presentity;
 *  - PUBL_TYPE:  when no other dialog awaits an update, delete the offline
 *                presentities of sub->pres_uri.
 *
 * NOTE(review): the comparison completing the dialogs_awaiting_update()
 * check, the error exits and the return statement are missing from this
 * listing.
 */
2463 static int cleanup_missing_dialog(subs_t *sub)
2465 int ret = -1, num_other_watchers = 0;
2467 if(sub->event->type & WINFO_TYPE) {
2468 if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2469 LM_ERR("resetting updated_winfo flags\n");
2472 } else if(sub->event->type & PUBL_TYPE) {
2473 if((num_other_watchers = dialogs_awaiting_update(
2474 &sub->pres_uri, sub->event->name))
2476 LM_ERR("checking watchers\n");
2478 } else if(num_other_watchers == 0) {
2479 if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2480 LM_ERR("deleting presentity\n");
/*
 * notifier_notify - send the NOTIFY for one dialog picked up by a notifier
 * process and perform the related database housekeeping.
 *
 * sub:             dialog to notify; status and updated_winfo may be
 *                  modified here.
 * updated:         output flag. NOTE(review): its assignments are not
 *                  visible in this listing; the nearby "tried again next
 *                  time" comments suggest it requests a retry.
 * end_transaction: cleared (set to 0) after the DB transaction has been
 *                  aborted here.
 *
 * NOTE(review): declarations of `nbody`/`len`, several branch bodies, the
 * labels and the return statements are missing from this listing.
 */
2492 static int notifier_notify(subs_t *sub, int *updated, int *end_transaction)
2495 watcher_t *watchers = NULL;
2496 int ret = 0, attempt_delete_presentities = 0;
2500 /* Terminating dialog NOTIFY */
2501 if(sub->expires == 0 || sub->status == TERMINATED_STATUS) {
2502 sub->status = TERMINATED_STATUS;
2504 if(sub->event->type & WINFO_TYPE) {
/* Failure to reset the flags is a warning and aborts the transaction. */
2505 if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2506 LM_WARN("resetting updated_winfo flags\n");
2508 if(pa_dbf.abort_transaction) {
2509 if(pa_dbf.abort_transaction(pa_db) < 0) {
2510 LM_ERR("in abort_transaction\n");
2514 *end_transaction = 0;
2516 /* Make sure this gets tried again next time */
2521 str winfo = str_init("presence.winfo");
2522 int num_other_watchers, num_winfos;
/* Last dialog awaiting an update: offline presentities may be deleted. */
2524 if(sub->event->type & PUBL_TYPE) {
2525 if((num_other_watchers = dialogs_awaiting_update(
2526 &sub->pres_uri, sub->event->name))
2528 LM_ERR("checking watchers\n");
2530 } else if(num_other_watchers == 0)
2531 attempt_delete_presentities = 1;
/* Events with a winfo peer also check for pending presence.winfo dialogs. */
2534 if(sub->event->wipeer) {
2535 if((num_winfos = dialogs_awaiting_update(&sub->pres_uri, winfo))
2537 LM_ERR("checking winfos\n");
/* NOTE(review): the body of this branch is not visible in the listing. */
2541 if(sub->updated_winfo == UPDATED_TYPE && num_winfos > 0) {
2547 } else /* Non-terminating dialog */
2549 if(sub->event->type & WINFO_TYPE) /* presence.winfo dialog */
2551 if(sub->updated_winfo == NO_UPDATE_TYPE) {
2552 /* Partial notify if
2553 updated_winfo == NO_UPDATE_TYPE */
2555 char *version_str = int2str(sub->version, &len);
2556 if(version_str == NULL) {
2557 LM_ERR("converting int to str\n");
2561 watchers = build_watchers_list(sub);
2562 if(watchers == NULL) {
2563 LM_ERR("in build_watchers_list\n");
/* The partial document lists only the watchers collected above. */
2567 nbody = create_winfo_xml(watchers, version_str, sub->pres_uri,
2568 sub->event->wipeer->name, PARTIAL_STATE_FLAG);
2570 LM_ERR("in create_winfo_xml\n");
2574 } else /* Full presence.winfo NOTIFY */
2575 sub->updated_winfo = NO_UPDATE_TYPE;
2577 if(unset_watchers_updated_winfo(&sub->pres_uri) < 0) {
2578 LM_WARN("resetting updated_winfo flags\n");
2580 if(pa_dbf.abort_transaction) {
2581 if(pa_dbf.abort_transaction(pa_db) < 0) {
2582 LM_ERR("in abort_transaction\n");
2586 *end_transaction = 0;
2588 /* Make sure this gets tried again next time */
2593 } else if(sub->event->type & PUBL_TYPE) {
2594 int num_other_watchers;
2596 if((num_other_watchers = dialogs_awaiting_update(
2597 &sub->pres_uri, sub->event->name))
2599 LM_ERR("checking watchers\n");
2601 } else if(num_other_watchers == 0)
2602 attempt_delete_presentities = 1;
/* NOTE(review): the statement controlled by this branch is not visible. */
2603 } else if(!pres_send_fast_notify)
/* nbody is passed as built above; notify() handles the actual sending. */
2607 if(notify(sub, NULL, nbody, 0, 0) < 0) {
2608 LM_ERR("could not send notify\n");
2615 if(attempt_delete_presentities) {
2616 if(delete_offline_presentities(&sub->pres_uri, sub->event) < 0) {
2617 LM_ERR("deleting presentity\n");
/* First cleanup sequence: release the body and watcher list. */
2622 free_notify_body(nbody, sub->event);
2623 free_watcher_list(watchers);
/* Second cleanup sequence additionally aborts the DB transaction. */
2628 free_notify_body(nbody, sub->event);
2629 free_watcher_list(watchers);
2631 if(pa_dbf.abort_transaction) {
2632 if(pa_dbf.abort_transaction(pa_db) < 0)
2633 LM_ERR("in abort_transaction\n");
2635 *end_transaction = 0;
2640 int process_dialogs(int round, int presence_winfo)
2642 db_key_t query_cols[3], result_cols[20], update_cols[4];
2643 db_val_t query_vals[3], update_vals[4], *values, *dvalues;
2644 db_op_t query_ops[2];
2645 db_row_t *rows, *drows;
2646 db1_res_t *dialog_list = NULL, *dialog = NULL;
2647 int n_query_cols = 0, n_result_cols = 0, n_update_cols = 0;
2648 int callid_col, to_tag_col, from_tag_col;
2649 int pres_uri_col, tuser_col, tdomain_col, fuser_col, fdomain_col;
2650 int wuser_col, wdomain_col, sockinfo_col, lcontact_col, contact_col;
2651 int rroute_col, event_id_col, reason_col, event_col, lcseq_col;
2652 int rcseq_col, status_col, version_col, updated_winfo_col, expires_col;
2653 int flags_col, user_agent_col;
2654 int i, notify_sent = 0, cached_updated_winfo, ret = -1;
2655 int end_transaction = 0;
2657 str ev_sname, winfo = str_init("presence.winfo");
2658 int now = (int)time(NULL);
2660 db_query_f query_fn = pa_dbf.query_lock ? pa_dbf.query_lock : pa_dbf.query;
2662 query_cols[n_query_cols] = &str_updated_col;
2663 query_vals[n_query_cols].type = DB1_INT;
2664 query_vals[n_query_cols].nul = 0;
2665 query_vals[n_query_cols].val.int_val = round;
2666 query_ops[n_query_cols] = OP_EQ;
2669 query_cols[n_query_cols] = &str_event_col;
2670 query_vals[n_query_cols].type = DB1_STR;
2671 query_vals[n_query_cols].nul = 0;
2672 query_vals[n_query_cols].val.str_val = winfo;
2673 query_ops[n_query_cols] = presence_winfo ? OP_EQ : OP_NEQ;
2676 result_cols[pres_uri_col = n_result_cols++] = &str_presentity_uri_col;
2677 result_cols[callid_col = n_result_cols++] = &str_callid_col;
2678 result_cols[to_tag_col = n_result_cols++] = &str_to_tag_col;
2679 result_cols[from_tag_col = n_result_cols++] = &str_from_tag_col;
2680 result_cols[event_col = n_result_cols++] = &str_event_col;
2682 update_cols[n_update_cols] = &str_updated_col;
2683 update_vals[n_update_cols].type = DB1_INT;
2684 update_vals[n_update_cols].nul = 0;
2685 update_vals[n_update_cols].val.int_val = NO_UPDATE_TYPE;
2688 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2689 LM_ERR("use table failed\n");
2693 if(pa_dbf.start_transaction) {
2694 if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
2695 LM_ERR("in start_transaction\n");
2700 /* Step 1: Find active_watchers that require notification */
2701 if(query_fn(pa_db, query_cols, query_ops, query_vals, result_cols,
2702 n_query_cols, n_result_cols, 0, &dialog_list)
2704 LM_ERR("in sql query\n");
2707 if(dialog_list == NULL) {
2708 LM_ERR("bad result\n");
2712 if(dialog_list->n <= 0)
2715 /* Step 2: Update the records so they are not notified again */
2716 if(pa_dbf.update(pa_db, query_cols, query_ops, query_vals, update_cols,
2717 update_vals, n_query_cols, n_update_cols)
2719 LM_ERR("in sql update\n");
2723 if(pa_dbf.end_transaction) {
2724 if(pa_dbf.end_transaction(pa_db) < 0) {
2725 LM_ERR("in end_transaction\n");
2730 /* Step 3: Notify each watcher we found */
2731 rows = RES_ROWS(dialog_list);
2732 for(i = 0; i < RES_ROW_N(dialog_list); i++) {
2736 memset(&sub, 0, sizeof(subs_t));
2737 values = ROW_VALUES(&rows[i]);
2739 EXTRACT_STRING(sub.pres_uri, VAL_STRING(&values[pres_uri_col]));
2740 EXTRACT_STRING(sub.callid, VAL_STRING(&values[callid_col]));
2741 EXTRACT_STRING(sub.to_tag, VAL_STRING(&values[to_tag_col]));
2742 EXTRACT_STRING(sub.from_tag, VAL_STRING(&values[from_tag_col]));
2743 EXTRACT_STRING(ev_sname, VAL_STRING(&values[event_col]));
2744 sub.event = contains_event(&ev_sname, NULL);
2745 if(sub.event == NULL) {
2746 LM_ERR("event not found and set to NULL\n");
2750 query_cols[n_query_cols] = &str_callid_col;
2751 query_vals[n_query_cols].type = DB1_STR;
2752 query_vals[n_query_cols].nul = 0;
2753 query_vals[n_query_cols].val.str_val = sub.callid;
2756 query_cols[n_query_cols] = &str_to_tag_col;
2757 query_vals[n_query_cols].type = DB1_STR;
2758 query_vals[n_query_cols].nul = 0;
2759 query_vals[n_query_cols].val.str_val = sub.to_tag;
2762 query_cols[n_query_cols] = &str_from_tag_col;
2763 query_vals[n_query_cols].type = DB1_STR;
2764 query_vals[n_query_cols].nul = 0;
2765 query_vals[n_query_cols].val.str_val = sub.from_tag;
2768 result_cols[tuser_col = n_result_cols++] = &str_to_user_col;
2769 result_cols[tdomain_col = n_result_cols++] = &str_to_domain_col;
2770 result_cols[fuser_col = n_result_cols++] = &str_from_user_col;
2771 result_cols[fdomain_col = n_result_cols++] = &str_from_domain_col;
2772 result_cols[wuser_col = n_result_cols++] = &str_watcher_username_col;
2773 result_cols[wdomain_col = n_result_cols++] = &str_watcher_domain_col;
2774 result_cols[sockinfo_col = n_result_cols++] = &str_socket_info_col;
2775 result_cols[lcontact_col = n_result_cols++] = &str_local_contact_col;
2776 result_cols[contact_col = n_result_cols++] = &str_contact_col;
2777 result_cols[rroute_col = n_result_cols++] = &str_record_route_col;
2778 result_cols[event_id_col = n_result_cols++] = &str_event_id_col;
2779 result_cols[reason_col = n_result_cols++] = &str_reason_col;
2780 result_cols[lcseq_col = n_result_cols++] = &str_local_cseq_col;
2781 result_cols[rcseq_col = n_result_cols++] = &str_remote_cseq_col;
2782 result_cols[status_col = n_result_cols++] = &str_status_col;
2783 result_cols[version_col = n_result_cols++] = &str_version_col;
2784 result_cols[updated_winfo_col = n_result_cols++] =
2785 &str_updated_winfo_col;
2786 result_cols[expires_col = n_result_cols++] = &str_expires_col;
2787 result_cols[flags_col = n_result_cols++] = &str_flags_col;
2788 result_cols[user_agent_col = n_result_cols++] = &str_user_agent_col;
2790 /* Need to redo this here as we might have switched to the
2791 presentity table during a previous iteration. */
2792 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2793 LM_ERR("use table failed\n");
2797 if(pa_dbf.start_transaction) {
2798 if(pa_dbf.start_transaction(pa_db, pres_db_table_lock) < 0) {
2799 LM_ERR("in start_transaction\n");
2803 end_transaction = 1;
2805 if(query_fn(pa_db, query_cols, 0, query_vals, result_cols, n_query_cols,
2806 n_result_cols, 0, &dialog)
2808 LM_ERR("in sql query\n");
2812 if(dialog == NULL) {
2813 LM_ERR("bad result\n");
2817 if(dialog->n <= 0) {
2818 LM_INFO("record not found - this may be observed in multi-server "
2820 if(cleanup_missing_dialog(&sub) < 0)
2821 LM_ERR("cleaning up after missing record\n");
2826 LM_ERR("multiple records found for %.*s, ci : %.*s, tt : %.*s, ft "
2827 ": %.*s, ev : %.*s\n",
2828 sub.pres_uri.len, sub.pres_uri.s, sub.callid.len,
2829 sub.callid.s, sub.to_tag.len, sub.to_tag.s,
2830 sub.from_tag.len, sub.from_tag.s, ev_sname.len, ev_sname.s);
2834 drows = RES_ROWS(dialog);
2835 dvalues = ROW_VALUES(drows);
2837 EXTRACT_STRING(sub.to_user, VAL_STRING(&dvalues[tuser_col]));
2838 EXTRACT_STRING(sub.to_domain, VAL_STRING(&dvalues[tdomain_col]));
2839 EXTRACT_STRING(sub.from_user, VAL_STRING(&dvalues[fuser_col]));
2840 EXTRACT_STRING(sub.from_domain, VAL_STRING(&dvalues[fdomain_col]));
2841 EXTRACT_STRING(sub.watcher_user, VAL_STRING(&dvalues[wuser_col]));
2842 EXTRACT_STRING(sub.watcher_domain, VAL_STRING(&dvalues[wdomain_col]));
2843 EXTRACT_STRING(sub.sockinfo_str, VAL_STRING(&dvalues[sockinfo_col]));
2844 EXTRACT_STRING(sub.local_contact, VAL_STRING(&dvalues[lcontact_col]));
2845 EXTRACT_STRING(sub.contact, VAL_STRING(&dvalues[contact_col]));
2846 EXTRACT_STRING(sub.record_route, VAL_STRING(&dvalues[rroute_col]));
2847 EXTRACT_STRING(sub.event_id, VAL_STRING(&dvalues[event_id_col]));
2848 EXTRACT_STRING(sub.reason, VAL_STRING(&dvalues[reason_col]));
2849 EXTRACT_STRING(sub.user_agent, VAL_STRING(&dvalues[user_agent_col]));
2851 sub.local_cseq = VAL_INT(&dvalues[lcseq_col]) + 1;
2852 sub.remote_cseq = VAL_INT(&dvalues[rcseq_col]);
2853 sub.status = VAL_INT(&dvalues[status_col]);
2854 sub.version = VAL_INT(&dvalues[version_col]) + 1;
2855 cached_updated_winfo = sub.updated_winfo =
2856 VAL_INT(&dvalues[updated_winfo_col]);
2858 if(VAL_INT(&dvalues[expires_col]) > now + pres_expires_offset)
2859 sub.expires = VAL_INT(&dvalues[expires_col]) - now;
2862 sub.flags = VAL_INT(&dvalues[flags_col]);
2864 sub.updated = round;
2866 if((notify_sent = notifier_notify(&sub, &updated, &end_transaction))
2868 LM_ERR("sending NOTIFY request\n");
2870 if(cleanup_missing_dialog(&sub) < 0)
2871 LM_ERR("cleaning up after error sending NOTIFY"
2874 /* remove the dialog and continue */
2878 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2879 LM_ERR("use table failed\n");
2883 if((sub.expires > 0 && sub.status != TERMINATED_STATUS) || updated) {
2884 if(sub.updated_winfo != cached_updated_winfo) {
2885 update_cols[n_update_cols] = &str_updated_winfo_col;
2886 update_vals[n_update_cols].type = DB1_INT;
2887 update_vals[n_update_cols].nul = 0;
2888 update_vals[n_update_cols].val.int_val = sub.updated_winfo;
2893 update_cols[n_update_cols] = &str_updated_col;
2894 update_vals[n_update_cols].type = DB1_INT;
2895 update_vals[n_update_cols].nul = 0;
2896 update_vals[n_update_cols].val.int_val = round;
2901 update_cols[n_update_cols] = &str_local_cseq_col;
2902 update_vals[n_update_cols].type = DB1_INT;
2903 update_vals[n_update_cols].nul = 0;
2904 update_vals[n_update_cols].val.int_val = sub.local_cseq;
2907 update_cols[n_update_cols] = &str_version_col;
2908 update_vals[n_update_cols].type = DB1_INT;
2909 update_vals[n_update_cols].nul = 0;
2910 update_vals[n_update_cols].val.int_val = sub.version;
2914 if(n_update_cols > 0) {
2915 if(pa_dbf.update(pa_db, query_cols, 0, query_vals, update_cols,
2916 update_vals, n_query_cols, n_update_cols)
2918 LM_ERR("in sql update\n");
2923 } else if(notify_sent) {
2925 if(pa_dbf.use_table(pa_db, &active_watchers_table) < 0) {
2926 LM_ERR("use table failed\n");
2930 if(pa_dbf.delete(pa_db, query_cols, 0, query_vals, n_query_cols)
2932 LM_ERR("in sql delete");
2938 if(pa_dbf.end_transaction && end_transaction) {
2939 if(pa_dbf.end_transaction(pa_db) < 0) {
2940 LM_ERR("in end_transaction\n");
2945 pa_dbf.free_result(pa_db, dialog);
2953 pa_dbf.free_result(pa_db, dialog_list);
2955 pa_dbf.free_result(pa_db, dialog);
2957 if(pa_dbf.abort_transaction) {
2958 if(pa_dbf.abort_transaction(pa_db) < 0)
2959 LM_ERR("in abort_transaction\n");
2965 void pres_timer_send_notify(unsigned int ticks, void *param)
2967 int process_num = *((int *)param);
2969 subset + (pres_waitn_time * pres_notifier_poll_rate * process_num);
2971 if(++subset > (pres_waitn_time * pres_notifier_poll_rate) - 1)
2974 if(process_dialogs(round, 0) < 0) {
2975 LM_ERR("Handling non presence.winfo dialogs\n");
2978 if(process_dialogs(round, 1) < 0) {
2979 LM_ERR("Handling presence.winfo dialogs\n");